Reconfigure updates processing to handle face

This commit is contained in:
Nicolas Mowen 2024-10-21 10:46:43 -06:00
parent 819e6e4d40
commit ed243846cc

View File

@ -58,10 +58,17 @@ class EmbeddingMaintainer(threading.Thread):
) )
self.embeddings_responder = EmbeddingsResponder() self.embeddings_responder = EmbeddingsResponder()
self.frame_manager = SharedMemoryFrameManager() self.frame_manager = SharedMemoryFrameManager()
# set face recognition conditions
self.face_recognition_enabled = (
self.config.semantic_search.face_recognition.enabled
)
self.requires_face_detection = "face" not in self.config.model.all_attributes
# create communication for updating event descriptions # create communication for updating event descriptions
self.requestor = InterProcessRequestor() self.requestor = InterProcessRequestor()
self.stop_event = stop_event self.stop_event = stop_event
self.tracked_events = {} self.tracked_events: dict[str, list[any]] = {}
self.genai_client = get_genai_client(config.genai) self.genai_client = get_genai_client(config.genai)
def run(self) -> None: def run(self) -> None:
@ -119,24 +126,30 @@ class EmbeddingMaintainer(threading.Thread):
return return
camera_config = self.config.cameras[camera] camera_config = self.config.cameras[camera]
# no need to save our own thumbnails if genai is not enabled
# or if the object has become stationary
if (
not camera_config.genai.enabled
or self.genai_client is None
or data["stationary"]
):
return
if data["id"] not in self.tracked_events: # no need to process updated objects if face recognition and genai are disabled
self.tracked_events[data["id"]] = [] if not camera_config.genai.enabled and not self.face_recognition_enabled:
return
# Create our own thumbnail based on the bounding box and the frame time # Create our own thumbnail based on the bounding box and the frame time
try: try:
frame_id = f"{camera}{data['frame_time']}" frame_id = f"{camera}{data['frame_time']}"
yuv_frame = self.frame_manager.get(frame_id, camera_config.frame_shape_yuv) yuv_frame = self.frame_manager.get(frame_id, camera_config.frame_shape_yuv)
except FileNotFoundError:
pass
if yuv_frame is None:
return
if self.face_recognition_enabled:
self._process_face(data, yuv_frame)
# no need to save our own thumbnails if genai is not enabled
# or if the object has become stationary
if self.genai_client is not None and not data["stationary"]:
if data["id"] not in self.tracked_events:
self.tracked_events[data["id"]] = []
if yuv_frame is not None:
data["thumbnail"] = self._create_thumbnail(yuv_frame, data["box"]) data["thumbnail"] = self._create_thumbnail(yuv_frame, data["box"])
# Limit the number of thumbnails saved # Limit the number of thumbnails saved
@ -147,8 +160,6 @@ class EmbeddingMaintainer(threading.Thread):
self.tracked_events[data["id"]].append(data) self.tracked_events[data["id"]].append(data)
self.frame_manager.close(frame_id) self.frame_manager.close(frame_id)
except FileNotFoundError:
pass
def _process_finalized(self) -> None: def _process_finalized(self) -> None:
"""Process the end of an event.""" """Process the end of an event."""
@ -252,6 +263,37 @@ class EmbeddingMaintainer(threading.Thread):
if event_id: if event_id:
self.handle_regenerate_description(event_id, source) self.handle_regenerate_description(event_id, source)
def _process_face(self, obj_data: dict[str, any], frame: np.ndarray) -> None:
"""Look for faces in image."""
# don't overwrite sub label for objects that have one
if obj_data.get("sub_label"):
return
face = None
if self.requires_face_detection:
# TODO run cv2 face detection
pass
else:
for attr in obj_data.get("current_attributes", []):
if attr.get("label") != "face":
continue
if face is None or attr.get("score", 0.0) > face.get("score", 0.0):
face = attr
# no faces detected in this frame
logger.info(f"face is detected as {face}")
if not face:
return
# TODO crop frame to face box
# TODO run embedding on face box
# TODO compare embedding to faces in embeddings DB to fine cosine similarity
# TODO check against threshold and min score to see if best face qualifies
# TODO update tracked object
def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]: def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]:
"""Return jpg thumbnail of a region of the frame.""" """Return jpg thumbnail of a region of the frame."""
frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420) frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)