Add support face recognition based on existing faces

This commit is contained in:
Nicolas Mowen 2024-10-21 12:22:07 -06:00
parent 6b2ffc4c06
commit bd95f1d270
4 changed files with 37 additions and 27 deletions

View File

@ -9,11 +9,8 @@ __all__ = ["FaceRecognitionConfig", "SemanticSearchConfig"]
class FaceRecognitionConfig(FrigateBaseModel):
enabled: bool = Field(default=False, title="Enable face recognition.")
reindex: Optional[bool] = Field(
default=False, title="Reindex all detections on startup."
)
model_size: str = Field(
default="small", title="The size of the embeddings model used."
threshold: float = Field(
default=0.8, title="Face similarity score required to be considered a match."
)

View File

@ -5,8 +5,9 @@ DEFAULT_DB_PATH = f"{CONFIG_DIR}/frigate.db"
MODEL_CACHE_DIR = f"{CONFIG_DIR}/model_cache"
BASE_DIR = "/media/frigate"
CLIPS_DIR = f"{BASE_DIR}/clips"
RECORD_DIR = f"{BASE_DIR}/recordings"
EXPORT_DIR = f"{BASE_DIR}/exports"
FACE_DIR = f"{CLIPS_DIR}/faces"
RECORD_DIR = f"{BASE_DIR}/recordings"
BIRDSEYE_PIPE = "/tmp/cache/birdseye"
CACHE_DIR = "/tmp/cache"
FRIGATE_LOCALHOST = "http://127.0.0.1:5000"

View File

@ -14,6 +14,7 @@ from frigate.comms.inter_process import InterProcessRequestor
from frigate.config.semantic_search import SemanticSearchConfig
from frigate.const import (
CONFIG_DIR,
FACE_DIR,
UPDATE_EMBEDDINGS_REINDEX_PROGRESS,
UPDATE_MODEL_STATE,
)
@ -227,6 +228,16 @@ class Embeddings:
random.choices(string.ascii_lowercase + string.digits, k=6)
)
id = f"{label}-{rand_id}"
# write face to library
folder = os.path.join(FACE_DIR, label)
file = os.path.join(folder, f"{id}.jpg")
os.makedirs(folder, exist_ok=True)
# save face image
with open(file, "wb") as output:
output.write(thumbnail)
self.db.execute_sql(
"""
INSERT OR REPLACE INTO vec_faces(id, face_embedding)

View File

@ -9,6 +9,7 @@ from typing import Optional
import cv2
import numpy as np
import requests
from peewee import DoesNotExist
from playhouse.sqliteq import SqliteQueueDatabase
@ -20,7 +21,7 @@ from frigate.comms.event_metadata_updater import (
from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscriber
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.const import CLIPS_DIR, UPDATE_EVENT_DESCRIPTION
from frigate.const import CLIPS_DIR, FRIGATE_LOCALHOST, UPDATE_EVENT_DESCRIPTION
from frigate.events.types import EventTypeEnum
from frigate.genai import get_genai_client
from frigate.models import Event
@ -273,9 +274,7 @@ class EmbeddingMaintainer(threading.Thread):
WHERE face_embedding MATCH ?
AND k = 10 ORDER BY distance
"""
logger.info("doing a search")
results = self.embeddings.db.execute_sql(sql_query, [query_embedding]).fetchall()
logger.info(f"the search results are {results}")
return self.embeddings.db.execute_sql(sql_query, [query_embedding]).fetchall()
def _process_face(self, obj_data: dict[str, any], frame: np.ndarray) -> None:
"""Look for faces in image."""
@ -287,7 +286,7 @@ class EmbeddingMaintainer(threading.Thread):
if obj_data.get("sub_label"):
return
face = None
face: Optional[dict[str, any]] = None
if self.requires_face_detection:
# TODO run cv2 face detection
@ -297,8 +296,8 @@ class EmbeddingMaintainer(threading.Thread):
if not obj_data.get("current_attributes"):
return
for attr in obj_data.get("current_attributes", []):
logger.info(f"attribute is {attr}")
attributes: list[dict[str, any]] = obj_data.get("current_attributes", [])
for attr in attributes:
if attr.get("label") != "face":
continue
@ -306,8 +305,6 @@ class EmbeddingMaintainer(threading.Thread):
face = attr
# no faces detected in this frame
logger.info(f"face is detected as {face}")
if not face:
return
@ -318,25 +315,29 @@ class EmbeddingMaintainer(threading.Thread):
face_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
face_frame = face_frame[face_box[1] : face_box[3], face_box[0] : face_box[2]]
ret, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 100])
ret, jpg = cv2.imencode(".jpg", face_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 100])
if not ret:
return
# if face_frame is not None:
# cv2.imwrite(
# "/media/frigate/face_crop.webp",
# face_frame,
# [int(cv2.IMWRITE_WEBP_QUALITY), 60],
# )
embedding = self.embeddings.embed_face("nick", jpg.tobytes(), upsert=False)
embedding = self.embeddings.embed_face("unknown", jpg.tobytes(), upsert=False)
query_embedding = serialize(embedding)
best_faces = self._search_face(query_embedding)
logger.debug(f"Detected best faces for person as: {best_faces}")
# TODO compare embedding to faces in embeddings DB to fine cosine similarity
# TODO check against threshold and min score to see if best face qualifies
# TODO update tracked object
if not best_faces:
return
sub_label = str(best_faces[0][0]).split("-")[0]
score = 1.0 - best_faces[0][1]
if score < self.config.semantic_search.face_recognition.threshold:
return None
requests.post(
f"{FRIGATE_LOCALHOST}/api/events/{obj_data['id']}/sub_label",
json={"subLabel": sub_label, "subLabelScore": score},
)
def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]:
"""Return jpg thumbnail of a region of the frame."""