From ed243846cc75e6ddf7289d6cf0e689825e9077b8 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Mon, 21 Oct 2024 10:46:43 -0600 Subject: [PATCH] Reconfigure updates processing to handle face --- frigate/embeddings/maintainer.py | 88 +++++++++++++++++++++++--------- 1 file changed, 65 insertions(+), 23 deletions(-) diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 1578a0fe3..1262d6fa5 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -58,10 +58,17 @@ class EmbeddingMaintainer(threading.Thread): ) self.embeddings_responder = EmbeddingsResponder() self.frame_manager = SharedMemoryFrameManager() + + # set face recognition conditions + self.face_recognition_enabled = ( + self.config.semantic_search.face_recognition.enabled + ) + self.requires_face_detection = "face" not in self.config.model.all_attributes + # create communication for updating event descriptions self.requestor = InterProcessRequestor() self.stop_event = stop_event - self.tracked_events = {} + self.tracked_events: dict[str, list[any]] = {} self.genai_client = get_genai_client(config.genai) def run(self) -> None: @@ -119,37 +126,41 @@ class EmbeddingMaintainer(threading.Thread): return camera_config = self.config.cameras[camera] - # no need to save our own thumbnails if genai is not enabled - # or if the object has become stationary - if ( - not camera_config.genai.enabled - or self.genai_client is None - or data["stationary"] - ): - return - if data["id"] not in self.tracked_events: - self.tracked_events[data["id"]] = [] + # no need to process updated objects if face recognition and genai are disabled + if not camera_config.genai.enabled and not self.face_recognition_enabled: + return # Create our own thumbnail based on the bounding box and the frame time try: frame_id = f"{camera}{data['frame_time']}" yuv_frame = self.frame_manager.get(frame_id, camera_config.frame_shape_yuv) - - if yuv_frame is not None: - data["thumbnail"] = self._create_thumbnail(yuv_frame, data["box"]) - - # Limit the number of thumbnails saved - if len(self.tracked_events[data["id"]]) >= MAX_THUMBNAILS: - # Always keep the first thumbnail for the event - self.tracked_events[data["id"]].pop(1) - - self.tracked_events[data["id"]].append(data) - - self.frame_manager.close(frame_id) except FileNotFoundError: pass + if yuv_frame is None: + return + + if self.face_recognition_enabled: + self._process_face(data, yuv_frame) + + # no need to save our own thumbnails if genai is not enabled + # or if the object has become stationary + if self.genai_client is not None and not data["stationary"]: + if data["id"] not in self.tracked_events: + self.tracked_events[data["id"]] = [] + + data["thumbnail"] = self._create_thumbnail(yuv_frame, data["box"]) + + # Limit the number of thumbnails saved + if len(self.tracked_events[data["id"]]) >= MAX_THUMBNAILS: + # Always keep the first thumbnail for the event + self.tracked_events[data["id"]].pop(1) + + self.tracked_events[data["id"]].append(data) + + self.frame_manager.close(frame_id) + def _process_finalized(self) -> None: """Process the end of an event.""" while True: @@ -252,6 +263,37 @@ class EmbeddingMaintainer(threading.Thread): if event_id: self.handle_regenerate_description(event_id, source) + def _process_face(self, obj_data: dict[str, any], frame: np.ndarray) -> None: + """Look for faces in image.""" + # don't overwrite sub label for objects that have one + if obj_data.get("sub_label"): + return + + face = None + + if self.requires_face_detection: + # TODO run cv2 face detection + pass + else: + for attr in obj_data.get("current_attributes", []): + if attr.get("label") != "face": + continue + + if face is None or attr.get("score", 0.0) > face.get("score", 0.0): + face = attr + + # no faces detected in this frame + logger.info(f"face is detected as {face}") + + if not face: + return + + # TODO crop frame to face box + # TODO run embedding on face box + # TODO compare embedding to faces in embeddings DB to fine cosine similarity + # TODO check against threshold and min score to see if best face qualifies + # TODO update tracked object + def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]: """Return jpg thumbnail of a region of the frame.""" frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)