Standardize request handling

This commit is contained in:
Nicolas Mowen 2025-01-10 07:48:59 -07:00
parent 4586e4781f
commit c78d859379
4 changed files with 110 additions and 81 deletions

View File

@ -39,6 +39,12 @@ def get_faces():
@router.post("/faces/{name}")
async def register_face(request: Request, name: str, file: UploadFile):
if not request.app.frigate_config.face_recognition.enabled:
return JSONResponse(
status_code=400,
content={"message": "Face recognition is not enabled.", "success": False},
)
context: EmbeddingsContext = request.app.embeddings
result = context.register_face(name, await file.read())
return JSONResponse(
@ -48,7 +54,13 @@ async def register_face(request: Request, name: str, file: UploadFile):
@router.post("/faces/train/{name}/classify")
def train_face(name: str, body: dict = None):
def train_face(request: Request, name: str, body: dict = None):
if not request.app.frigate_config.face_recognition.enabled:
return JSONResponse(
status_code=400,
content={"message": "Face recognition is not enabled.", "success": False},
)
json: dict[str, any] = body or {}
training_file = os.path.join(
FACE_DIR, f"train/{sanitize_filename(json.get('training_file', ''))}"
@ -82,6 +94,12 @@ def train_face(name: str, body: dict = None):
@router.post("/faces/{name}/delete")
def deregister_faces(request: Request, name: str, body: dict = None):
if not request.app.frigate_config.face_recognition.enabled:
return JSONResponse(
status_code=400,
content={"message": "Face recognition is not enabled.", "success": False},
)
json: dict[str, any] = body or {}
list_of_ids = json.get("ids", "")

View File

@ -4,9 +4,7 @@ import base64
import datetime
import logging
import os
import random
import re
import string
import threading
from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path
@ -28,7 +26,6 @@ from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.const import (
CLIPS_DIR,
FACE_DIR,
FRIGATE_LOCALHOST,
UPDATE_EVENT_DESCRIPTION,
)
@ -37,6 +34,7 @@ from frigate.events.types import EventTypeEnum
from frigate.genai import get_genai_client
from frigate.models import Event
from frigate.postprocessing.face_processor import FaceProcessor
from frigate.postprocessing.processor_api import ProcessorApi
from frigate.types import TrackedObjectUpdateTypesEnum
from frigate.util.builtin import serialize
from frigate.util.image import SharedMemoryFrameManager, area, calculate_region
@ -75,13 +73,10 @@ class EmbeddingMaintainer(threading.Thread):
)
self.embeddings_responder = EmbeddingsResponder()
self.frame_manager = SharedMemoryFrameManager()
self.processors: list[ProcessorApi] = []
# set face recognition conditions
self.face_processor = (
FaceProcessor(self.config.face_recognition, db)
if self.config.face_recognition.enabled
else None
)
if self.config.face_recognition.enabled:
self.processors.append(FaceProcessor(self.config.face_recognition, db))
# create communication for updating event descriptions
self.requestor = InterProcessRequestor()
@ -139,55 +134,12 @@ class EmbeddingMaintainer(threading.Thread):
self.embeddings.embed_description("", data, upsert=False),
pack=False,
)
elif topic == EmbeddingsRequestEnum.register_face.value:
if not self.face_recognition_enabled:
return {
"message": "Face recognition is not enabled.",
"success": False,
}
rand_id = "".join(
random.choices(string.ascii_lowercase + string.digits, k=6)
)
label = data["face_name"]
id = f"{label}-{rand_id}"
if data.get("cropped"):
pass
else:
img = cv2.imdecode(
np.frombuffer(
base64.b64decode(data["image"]), dtype=np.uint8
),
cv2.IMREAD_COLOR,
)
face_box = self._detect_face(img)
for processor in self.processors:
resp = processor.handle_request(data)
if not face_box:
return {
"message": "No face was detected.",
"success": False,
}
face = img[face_box[1] : face_box[3], face_box[0] : face_box[2]]
ret, thumbnail = cv2.imencode(
".webp", face, [int(cv2.IMWRITE_WEBP_QUALITY), 100]
)
# write face to library
folder = os.path.join(FACE_DIR, label)
file = os.path.join(folder, f"{id}.webp")
os.makedirs(folder, exist_ok=True)
# save face image
with open(file, "wb") as output:
output.write(thumbnail.tobytes())
self.face_classifier.clear_classifier()
return {
"message": "Successfully registered face.",
"success": True,
}
if resp is not None:
return resp
except Exception as e:
logger.error(f"Unable to handle embeddings request {e}")

View File

@ -1,7 +1,10 @@
"""Handle processing images for face detection and recognition."""
import base64
import logging
import os
import random
import string
from typing import Optional
import cv2
@ -173,20 +176,17 @@ class FaceProcessor(ProcessorApi):
image, M, (output_width, output_height), flags=cv2.INTER_CUBIC
)
def clear_classifier(self) -> None:
def __clear_classifier(self) -> None:
self.face_recognizer = None
self.label_map = {}
def detect_faces(self, input: np.ndarray) -> tuple[int, cv2.typing.MatLike] | None:
def __detect_face(self, input: np.ndarray) -> tuple[int, int, int, int]:
"""Detect faces in input image."""
if not self.face_detector:
return None
self.face_detector.setInputSize((input.shape[1], input.shape[0]))
return self.face_detector.detect(input)
def _detect_face(self, input: np.ndarray) -> tuple[int, int, int, int]:
"""Detect faces in input image."""
faces = self.detect_faces(input)
faces = self.face_detector.detect(input)
if faces is None or faces[1] is None:
return None
@ -206,6 +206,23 @@ class FaceProcessor(ProcessorApi):
return face
def __classify_face(self, face_image: np.ndarray) -> tuple[str, float] | None:
    """Classify a cropped face image against the known label map.

    Args:
        face_image: BGR face crop (converted to grayscale before alignment).

    Returns:
        A ``(label, score)`` tuple with the score rounded to 2 decimals, or
        ``None`` when no landmark detector is configured or the recognizer
        finds no matching label.
    """
    # Alignment requires the landmark detector; without it we cannot classify.
    if not self.landmark_detector:
        return None
    # Lazily (re)build the classifier when the label map has been cleared.
    if not self.label_map:
        self.__build_classifier()
    img = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)
    img = self.__align_face(img, img.shape[1], img.shape[0])
    # NOTE(review): __clear_classifier resets `self.face_recognizer`; confirm
    # `self.recognizer` is the intended attribute here and not a typo.
    index, distance = self.recognizer.predict(img)
    # predict() returns -1 when no known label matches.
    if index == -1:
        return None
    # Convert the recognizer's distance into a 0..1 confidence — assumes
    # distances stay below 1000; TODO confirm against recognizer config.
    score = 1.0 - (distance / 1000)
    return self.label_map[index], round(score, 2)
def process_frame(self, obj_data: dict[str, any], frame: np.ndarray) -> bool:
"""Look for faces in image."""
id = obj_data["id"]
@ -235,7 +252,7 @@ class FaceProcessor(ProcessorApi):
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
left, top, right, bottom = person_box
person = rgb[top:bottom, left:right]
face_box = self._detect_face(person)
face_box = self.__detect_face(person)
if not face_box:
logger.debug("Detected no faces for person object.")
@ -278,7 +295,7 @@ class FaceProcessor(ProcessorApi):
max(0, face_box[0]) : min(frame.shape[1], face_box[2]),
]
res = self.face_classifier.classify_face(face_frame)
res = self.__classify_face(face_frame)
if not res:
return False
@ -328,19 +345,42 @@ class FaceProcessor(ProcessorApi):
return True
def classify_face(self, face_image: np.ndarray) -> tuple[str, float] | None:
if not self.landmark_detector:
return None
def handle_request(self, request_data) -> dict[str, any] | None:
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
label = request_data["face_name"]
id = f"{label}-{rand_id}"
if not self.label_map:
self.__build_classifier()
if request_data.get("cropped"):
thumbnail = request_data["image"]
else:
img = cv2.imdecode(
np.frombuffer(base64.b64decode(request_data["image"]), dtype=np.uint8),
cv2.IMREAD_COLOR,
)
face_box = self.__detect_face(img)
img = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)
img = self.__align_face(img, img.shape[1], img.shape[0])
index, distance = self.recognizer.predict(img)
if not face_box:
return {
"message": "No face was detected.",
"success": False,
}
if index == -1:
return None
face = img[face_box[1] : face_box[3], face_box[0] : face_box[2]]
ret, thumbnail = cv2.imencode(
".webp", face, [int(cv2.IMWRITE_WEBP_QUALITY), 100]
)
score = 1.0 - (distance / 1000)
return self.label_map[index], round(score, 2)
# write face to library
folder = os.path.join(FACE_DIR, label)
file = os.path.join(folder, f"{id}.webp")
os.makedirs(folder, exist_ok=True)
# save face image
with open(file, "wb") as output:
output.write(thumbnail.tobytes())
self.__clear_classifier()
return {
"message": "Successfully registered face.",
"success": True,
}

View File

@ -10,10 +10,29 @@ logger = logging.getLogger(__name__)
class ProcessorApi(ABC):
@abstractmethod
def __init__(self, config: FrigateConfig):
def __init__(self, config: FrigateConfig) -> None:
self.config = config
pass
@abstractmethod
def process_frame(self, obj_data: dict[str, any], frame: np.ndarray):
def process_frame(self, obj_data: dict[str, any], frame: np.ndarray) -> None:
"""Processes the frame with object data.
Args:
obj_data (dict): containing data about focused object in frame.
frame (ndarray): full yuv frame.
Returns:
None.
"""
pass
@abstractmethod
def handle_request(self, request_data: dict[str, any]) -> any | None:
    """Handle metadata requests.

    Args:
        request_data (dict): data describing the requested change to process.

    Returns:
        None if the request was not handled by this processor, otherwise the
        response to return to the caller.
    """
    pass