Implement best practices

This commit is contained in:
Weitheng Haw 2025-01-28 09:24:39 +00:00
parent 70f46fae28
commit aa5f7721b6
2 changed files with 105 additions and 54 deletions

View File

@ -24,6 +24,10 @@ logger = logging.getLogger(__name__)
MIN_MATCHING_FACES = 2 MIN_MATCHING_FACES = 2
MIN_FACE_SCORE = 0.8
NMS_THRESHOLD = 0.3
FACE_INPUT_SIZE = (320, 320)
FACE_QUALITY = 100
class FaceProcessor(RealTimeProcessorApi): class FaceProcessor(RealTimeProcessorApi):
@ -358,49 +362,83 @@ class FaceProcessor(RealTimeProcessorApi):
if topic == EmbeddingsRequestEnum.clear_face_classifier.value: if topic == EmbeddingsRequestEnum.clear_face_classifier.value:
self.__clear_classifier() self.__clear_classifier()
elif topic == EmbeddingsRequestEnum.register_face.value: elif topic == EmbeddingsRequestEnum.register_face.value:
rand_id = "".join( face_name = request_data.get("face_name")
random.choices(string.ascii_lowercase + string.digits, k=6) if not self.__validate_face_name(face_name):
) return {
label = request_data["face_name"] "message": "Invalid face name",
id = f"{label}-{rand_id}" "success": False,
}
if request_data.get("cropped"):
thumbnail = request_data["image"] try:
else: rand_id = "".join(
img = cv2.imdecode( random.choices(string.ascii_lowercase + string.digits, k=6)
np.frombuffer(
base64.b64decode(request_data["image"]), dtype=np.uint8
),
cv2.IMREAD_COLOR,
) )
face_box = self.__detect_face(img) id = f"{face_name}-{rand_id}"
if not face_box: if request_data.get("cropped"):
return { thumbnail = request_data["image"]
"message": "No face was detected.", else:
"success": False, img = cv2.imdecode(
} np.frombuffer(
base64.b64decode(request_data["image"]), dtype=np.uint8
),
cv2.IMREAD_COLOR,
)
face_box = self.__detect_face(img)
face = img[face_box[1] : face_box[3], face_box[0] : face_box[2]] if not face_box:
_, thumbnail = cv2.imencode( return {
".webp", face, [int(cv2.IMWRITE_WEBP_QUALITY), 100] "message": "No face was detected.",
) "success": False,
}
# write face to library face = img[face_box[1] : face_box[3], face_box[0] : face_box[2]]
folder = os.path.join(FACE_DIR, label) _, thumbnail = cv2.imencode(
file = os.path.join(folder, f"{id}.webp") ".webp", face, [int(cv2.IMWRITE_WEBP_QUALITY), 100]
os.makedirs(folder, exist_ok=True) )
# save face image # write face to library
with open(file, "wb") as output: folder = os.path.join(FACE_DIR, face_name)
output.write(thumbnail.tobytes()) file = os.path.join(folder, f"{id}.webp")
os.makedirs(folder, exist_ok=True)
self.__clear_classifier() # save face image
return { with open(file, "wb") as output:
"message": "Successfully registered face.", output.write(thumbnail.tobytes())
"success": True,
} self.__clear_classifier()
return {
"message": "Successfully registered face.",
"success": True,
}
except cv2.error as e:
return {
"message": f"Failed to process image: {str(e)}",
"success": False,
}
except Exception as e:
logger.error(f"Unexpected error registering face: {str(e)}")
return {
"message": "Internal server error",
"success": False,
}
def expire_object(self, object_id: str): def expire_object(self, object_id: str):
if object_id in self.detected_faces: if object_id in self.detected_faces:
self.detected_faces.pop(object_id) self.detected_faces.pop(object_id)
def __validate_face_name(self, name: str) -> bool:
"""Validate face name meets requirements."""
if not name or not isinstance(name, str):
return False
# Add any other validation rules (e.g., no special chars)
return True
def cleanup(self):
"""Cleanup resources when shutting down."""
if self.face_detector:
self.face_detector = None
if self.landmark_detector:
self.landmark_detector = None
if self.face_recognizer:
self.face_recognizer = None

View File

@ -116,29 +116,38 @@ export default function FaceLibrary() {
); );
const [newFaceDialog, setNewFaceDialog] = useState(false); const [newFaceDialog, setNewFaceDialog] = useState(false);
const [isCreatingFace, setIsCreatingFace] = useState(false);
const [newFaceName, setNewFaceName] = useState(""); const [newFaceName, setNewFaceName] = useState("");
const createNewFace = useCallback(() => { const handleKeyPress = (e: React.KeyboardEvent) => {
if (e.key === 'Enter') {
createNewFace();
}
};
const createNewFace = useCallback(async () => {
if (!newFaceName.trim()) { if (!newFaceName.trim()) {
toast.error("Face name cannot be empty", { position: "top-center" }); toast.error("Face name cannot be empty", { position: "top-center" });
return; return;
} }
axios setIsCreatingFace(true);
.post(`/faces/${newFaceName}`) try {
.then((resp) => { const resp = await axios.post(`/faces/${newFaceName}`);
if (resp.status == 200) { if (resp.status === 200) {
setNewFaceDialog(false); setNewFaceDialog(false);
setNewFaceName(""); setNewFaceName("");
refreshFaces(); refreshFaces();
toast.success("Successfully created new face", { position: "top-center" }); toast.success("Successfully created new face", { position: "top-center" });
} }
}) } catch (error) {
.catch((error) => { toast.error(
toast.error(`Failed to create face: ${error.response?.data?.message || error.message}`, { `Failed to create face: ${error.response?.data?.message || error.message}`,
position: "top-center", { position: "top-center" }
}); );
}); } finally {
setIsCreatingFace(false);
}
}, [newFaceName, refreshFaces]); }, [newFaceName, refreshFaces]);
if (!config) { if (!config) {
@ -159,8 +168,12 @@ export default function FaceLibrary() {
placeholder="Enter face name" placeholder="Enter face name"
value={newFaceName} value={newFaceName}
onChange={(e) => setNewFaceName(e.target.value)} onChange={(e) => setNewFaceName(e.target.value)}
onKeyPress={handleKeyPress}
disabled={isCreatingFace}
/> />
<Button onClick={createNewFace}>Create</Button> <Button onClick={createNewFace} disabled={isCreatingFace}>
{isCreatingFace ? "Creating..." : "Create"}
</Button>
</div> </div>
</DialogContent> </DialogContent>
</Dialog> </Dialog>