Add support for face detection

Nicolas Mowen 2024-10-23 11:01:11 -06:00
parent fccfbd6959
commit 3a570e21d2
4 changed files with 112 additions and 42 deletions

View File

@@ -8,6 +8,9 @@ class EventsSubLabelBody(BaseModel):
     subLabelScore: Optional[float] = Field(
         title="Score for sub label", default=None, gt=0.0, le=1.0
     )
+    camera: Optional[str] = Field(
+        title="Camera this object is detected on.", default=None
+    )


 class EventsDescriptionBody(BaseModel):
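Note: the new optional camera field lets callers target a camera's in-progress tracked object when no Event row exists yet (see the set_sub_label changes below). A minimal sketch of the body model with this field, assuming pydantic v2; the plain subLabel line stands in for the full definition, and the example values are hypothetical:

from typing import Optional

from pydantic import BaseModel, Field


class EventsSubLabelBody(BaseModel):
    subLabel: str  # abbreviated; the real field carries its own constraints
    subLabelScore: Optional[float] = Field(
        title="Score for sub label", default=None, gt=0.0, le=1.0
    )
    camera: Optional[str] = Field(
        title="Camera this object is detected on.", default=None
    )


body = EventsSubLabelBody(subLabel="john", subLabelScore=0.85, camera="front_door")
print(body.model_dump())  # camera defaults to None when omitted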

View File

@@ -890,38 +890,54 @@ def set_sub_label(
     try:
         event: Event = Event.get(Event.id == event_id)
     except DoesNotExist:
+        if not body.camera:
+            return JSONResponse(
+                content=(
+                    {
+                        "success": False,
+                        "message": "Event "
+                        + event_id
+                        + " not found and camera is not provided.",
+                    }
+                ),
+                status_code=404,
+            )
+
+        event = None
+
+    tracked_obj: TrackedObject = request.app.detected_frames_processor.camera_states[
+        event.camera if event else body.camera
+    ].tracked_objects.get(event_id)
+
+    if not event and not tracked_obj:
         return JSONResponse(
-            content=({"success": False, "message": "Event " + event_id + " not found"}),
+            content=(
+                {"success": False, "message": "Event " + event_id + " not found."}
+            ),
             status_code=404,
         )

     new_sub_label = body.subLabel
     new_score = body.subLabelScore

-    if not event.end_time:
-        # update tracked object
-        tracked_obj: TrackedObject = (
-            request.app.detected_frames_processor.camera_states[
-                event.camera
-            ].tracked_objects.get(event.id)
-        )
-
-        if tracked_obj:
-            tracked_obj.obj_data["sub_label"] = (new_sub_label, new_score)
+    if tracked_obj:
+        tracked_obj.obj_data["sub_label"] = (new_sub_label, new_score)

     # update timeline items
     Timeline.update(
         data=Timeline.data.update({"sub_label": (new_sub_label, new_score)})
     ).where(Timeline.source_id == event_id).execute()

-    event.sub_label = new_sub_label
+    if event:
+        event.sub_label = new_sub_label

-    if new_score:
-        data = event.data
-        data["sub_label_score"] = new_score
-        event.data = data
+        if new_score:
+            data = event.data
+            data["sub_label_score"] = new_score
+            event.data = data
+
+        event.save()

-    event.save()
     return JSONResponse(
         content=(
             {
@@ -1015,7 +1031,7 @@ def regenerate_description(
         content=(
             {
                 "success": False,
-                "message": "Semantic Search and Generative AI must be enabled to regenerate a description",
+                "message": "Semantic search and generative AI are not enabled",
             }
         ),
         status_code=400,

View File

@@ -129,7 +129,8 @@ class Embeddings:
             model_name="facenet",
             model_file="facenet.onnx",
             download_urls={
-                "facenet.onnx": "https://github.com/NicolasSM-001/faceNet.onnx-/raw/refs/heads/main/faceNet.onnx"
+                "facenet.onnx": "https://github.com/NicolasSM-001/faceNet.onnx-/raw/refs/heads/main/faceNet.onnx",
+                "facedet.onnx": "https://github.com/opencv/opencv_zoo/raw/refs/heads/main/models/face_detection_yunet/face_detection_yunet_2023mar_int8.onnx",
             },
             model_size="large",
             model_type=ModelTypeEnum.face,

View File

@@ -72,6 +72,19 @@ class EmbeddingMaintainer(threading.Thread):
         self.tracked_events: dict[str, list[any]] = {}
         self.genai_client = get_genai_client(config.genai)

+    @property
+    def face_detector(self) -> cv2.FaceDetectorYN:
+        # Lazily create the classifier.
+        if "face_detector" not in self.__dict__:
+            self.__dict__["face_detector"] = cv2.FaceDetectorYN.create(
+                "/config/model_cache/facenet/facedet.onnx",
+                config="",
+                input_size=(320, 320),
+                score_threshold=0.8,
+                nms_threshold=0.3,
+            )
+        return self.__dict__["face_detector"]
+
     def run(self) -> None:
         """Maintain a SQLite-vec database for semantic search."""
         while not self.stop_event.is_set():
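The face_detector property memoizes through self.__dict__ so the YuNet model loads only on first access, which spares deployments that never need dedicated face detection. functools.cached_property (Python 3.8+) expresses the same pattern more idiomatically; a sketch using the same path and parameters as the hunk above:

from functools import cached_property

import cv2


class LazyDetector:
    @cached_property
    def face_detector(self) -> cv2.FaceDetectorYN:
        # Created once on first access, then cached on the instance.
        return cv2.FaceDetectorYN.create(
            "/config/model_cache/facenet/facedet.onnx",
            config="",
            input_size=(320, 320),
            score_threshold=0.8,
            nms_threshold=0.3,
        )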
@@ -277,7 +290,7 @@ class EmbeddingMaintainer(threading.Thread):
             if event_id:
                 self.handle_regenerate_description(event_id, source)

-    def _search_face(self, query_embedding: bytes) -> list:
+    def _search_face(self, query_embedding: bytes) -> list[tuple[str, float]]:
         """Search for the face most closely matching the embedding."""
         sql_query = f"""
             SELECT
@@ -309,8 +322,38 @@ class EmbeddingMaintainer(threading.Thread):
         face: Optional[dict[str, any]] = None

         if self.requires_face_detection:
-            # TODO run cv2 face detection
-            pass
+            logger.debug("Running manual face detection.")
+            person_box = obj_data.get("box")
+
+            if not person_box:
+                return None
+
+            rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
+            left, top, right, bottom = person_box
+            person = rgb[top:bottom, left:right]
+            self.face_detector.setInputSize((right - left, bottom - top))
+            faces = self.face_detector.detect(person)
+
+            if faces[1] is None:
+                logger.debug("Detected no faces for person object.")
+                return
+
+            face = None
+
+            for _, potential_face in enumerate(faces[1]):
+                raw_bbox = potential_face[0:4].astype(np.int8)
+                x = max(raw_bbox[0], 0)
+                y = max(raw_bbox[1], 0)
+                w = raw_bbox[2]
+                h = raw_bbox[3]
+                bbox = (x, y, x + w, y + h)
+
+                if face is None or area(bbox) > area(face):
+                    face = bbox
+
+            face_frame = person[face[1] : face[3], face[0] : face[2]]
+            face_frame = cv2.cvtColor(face_frame, cv2.COLOR_RGB2BGR)
         else:
             # don't run for object without attributes
             if not obj_data.get("current_attributes"):
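For reference, FaceDetectorYN.detect() returns (retval, faces), where faces is an N x 15 float32 array: x, y, w, h, ten landmark coordinates, then a confidence score per detection (OpenCV's documented YuNet layout, not something this diff defines). A standalone sketch of the largest-face selection above; it casts with plain int, since a narrow dtype such as np.int8 would wrap coordinates above 127:

from typing import Optional

import cv2
import numpy as np


def area(box: tuple) -> int:
    # Same role as Frigate's area() helper: box is (x1, y1, x2, y2).
    return (box[2] - box[0]) * (box[3] - box[1])


def largest_face(detector: cv2.FaceDetectorYN, bgr: np.ndarray) -> Optional[tuple]:
    h, w = bgr.shape[:2]
    detector.setInputSize((w, h))
    _, faces = detector.detect(bgr)

    if faces is None:
        return None

    best = None
    for row in faces:
        # Clamp the top-left corner and convert width/height to corners.
        x, y, fw, fh = (int(v) for v in row[:4])
        x, y = max(x, 0), max(y, 0)
        box = (x, y, x + fw, y + fh)
        if best is None or area(box) > area(best):
            best = box
    return best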
@@ -325,22 +368,22 @@ class EmbeddingMaintainer(threading.Thread):
                 if face is None or attr.get("score", 0.0) > face.get("score", 0.0):
                     face = attr

-        # no faces detected in this frame
-        if not face:
-            return
-
-        face_box = face.get("box")
-
-        # check that face is valid
-        if (
-            not face_box
-            or area(face_box) < self.config.semantic_search.face_recognition.min_area
-        ):
-            logger.debug(f"Invalid face box {face}")
-            return
-
-        face_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
-        face_frame = face_frame[face_box[1] : face_box[3], face_box[0] : face_box[2]]
+            # no faces detected in this frame
+            if not face:
+                return
+
+            face_box = face.get("box")
+
+            # check that face is valid
+            if not face_box or area(face_box) < self.config.face_recognition.min_area:
+                logger.debug(f"Invalid face box {face}")
+                return
+
+            face_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
+            face_frame = face_frame[
+                face_box[1] : face_box[3], face_box[0] : face_box[2]
+            ]

         ret, jpg = cv2.imencode(
             ".webp", face_frame, [int(cv2.IMWRITE_WEBP_QUALITY), 100]
         )
@@ -355,6 +398,7 @@ class EmbeddingMaintainer(threading.Thread):
         logger.debug(f"Detected best faces for person as: {best_faces}")

         if not best_faces or len(best_faces) < REQUIRED_FACES:
+            logger.debug(f"{len(best_faces)} < {REQUIRED_FACES} min required faces.")
             return

         sub_label = str(best_faces[0][0]).split("-")[0]
@@ -363,28 +407,34 @@ class EmbeddingMaintainer(threading.Thread):
         for face in best_faces:
             score = 1.0 - face[1]

-            if face[0] != sub_label:
+            if face[0].split("-")[0] != sub_label:
                 logger.debug("Detected multiple faces, result is not valid.")
                 return None

             avg_score += score

-        avg_score = avg_score / REQUIRED_FACES
+        avg_score = round(avg_score / REQUIRED_FACES, 2)

-        if avg_score < self.config.semantic_search.face_recognition.threshold or (
+        if avg_score < self.config.face_recognition.threshold or (
             id in self.detected_faces and avg_score <= self.detected_faces[id]
         ):
             logger.debug(
-                "Detected face does not score higher than threshold / previous face."
+                f"Recognized face score {avg_score} is less than threshold ({self.config.face_recognition.threshold}) / previous face score ({self.detected_faces.get(id)})."
             )
             return None

-        self.detected_faces[id] = avg_score
-        requests.post(
+        resp = requests.post(
             f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label",
-            json={"subLabel": sub_label, "subLabelScore": avg_score},
+            json={
+                "camera": obj_data.get("camera"),
+                "subLabel": sub_label,
+                "subLabelScore": avg_score,
+            },
         )

+        if resp.status_code == 200:
+            self.detected_faces[id] = avg_score
+
     def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]:
         """Return jpg thumbnail of a region of the frame."""
         frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)
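To make the recognition scoring concrete: _search_face returns (id, distance) pairs where a smaller distance is a better match, face ids are stored as "<name>-<suffix>", and the sub label is accepted only when all of the top REQUIRED_FACES results agree on the name. A worked sketch with hypothetical values (REQUIRED_FACES = 2 is assumed here):

REQUIRED_FACES = 2  # assumed; the real constant is defined in the maintainer
best_faces = [("john-1", 0.12), ("john-3", 0.18)]  # hypothetical search results

sub_label = str(best_faces[0][0]).split("-")[0]
avg_score = 0.0

for face_id, distance in best_faces:
    score = 1.0 - distance  # convert distance to a similarity score

    # All matches must agree on the same name, else the result is rejected.
    if face_id.split("-")[0] != sub_label:
        raise SystemExit("Detected multiple faces, result is not valid.")

    avg_score += score

avg_score = round(avg_score / REQUIRED_FACES, 2)
print(sub_label, avg_score)  # -> john 0.85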