Define postprocessing api and move face processing to fit

This commit is contained in:
Nicolas Mowen 2025-01-10 07:36:09 -07:00
parent 8efdd8487a
commit 4586e4781f
4 changed files with 372 additions and 341 deletions

View File

@ -36,10 +36,10 @@ from frigate.embeddings.lpr.lpr import LicensePlateRecognition
from frigate.events.types import EventTypeEnum from frigate.events.types import EventTypeEnum
from frigate.genai import get_genai_client from frigate.genai import get_genai_client
from frigate.models import Event from frigate.models import Event
from frigate.postprocessing.face_processor import FaceProcessor
from frigate.types import TrackedObjectUpdateTypesEnum from frigate.types import TrackedObjectUpdateTypesEnum
from frigate.util.builtin import serialize from frigate.util.builtin import serialize
from frigate.util.image import SharedMemoryFrameManager, area, calculate_region from frigate.util.image import SharedMemoryFrameManager, area, calculate_region
from frigate.util.model import FaceClassificationModel
from .embeddings import Embeddings from .embeddings import Embeddings
from .types import EmbeddingsMetrics from .types import EmbeddingsMetrics
@ -77,12 +77,9 @@ class EmbeddingMaintainer(threading.Thread):
self.frame_manager = SharedMemoryFrameManager() self.frame_manager = SharedMemoryFrameManager()
# set face recognition conditions # set face recognition conditions
self.face_recognition_enabled = self.config.face_recognition.enabled self.face_processor = (
self.requires_face_detection = "face" not in self.config.objects.all_objects FaceProcessor(self.config.face_recognition, db)
self.detected_faces: dict[str, float] = {} if self.config.face_recognition.enabled
self.face_classifier = (
FaceClassificationModel(self.config.face_recognition, db)
if self.face_recognition_enabled
else None else None
) )
@ -213,7 +210,7 @@ class EmbeddingMaintainer(threading.Thread):
# no need to process updated objects if face recognition, lpr, genai are disabled # no need to process updated objects if face recognition, lpr, genai are disabled
if ( if (
not camera_config.genai.enabled not camera_config.genai.enabled
and not self.face_recognition_enabled and not self.face_processor
and not self.lpr_config.enabled and not self.lpr_config.enabled
): ):
return return
@ -232,9 +229,9 @@ class EmbeddingMaintainer(threading.Thread):
) )
return return
if self.face_recognition_enabled: if self.face_processor:
start = datetime.datetime.now().timestamp() start = datetime.datetime.now().timestamp()
processed = self._process_face(data, yuv_frame) processed = self.face_processor.process_frame(data, yuv_frame)
if processed: if processed:
duration = datetime.datetime.now().timestamp() - start duration = datetime.datetime.now().timestamp() - start
@ -408,150 +405,6 @@ class EmbeddingMaintainer(threading.Thread):
if event_id: if event_id:
self.handle_regenerate_description(event_id, source) self.handle_regenerate_description(event_id, source)
def _detect_face(self, input: np.ndarray) -> tuple[int, int, int, int]:
"""Detect faces in input image."""
faces = self.face_classifier.detect_faces(input)
if faces is None or faces[1] is None:
return None
face = None
for _, potential_face in enumerate(faces[1]):
raw_bbox = potential_face[0:4].astype(np.uint16)
x: int = max(raw_bbox[0], 0)
y: int = max(raw_bbox[1], 0)
w: int = raw_bbox[2]
h: int = raw_bbox[3]
bbox = (x, y, x + w, y + h)
if face is None or area(bbox) > area(face):
face = bbox
return face
def _process_face(self, obj_data: dict[str, any], frame: np.ndarray) -> bool:
"""Look for faces in image."""
id = obj_data["id"]
# don't run for non person objects
if obj_data.get("label") != "person":
logger.debug("Not a processing face for non person object.")
return False
# don't overwrite sub label for objects that have a sub label
# that is not a face
if obj_data.get("sub_label") and id not in self.detected_faces:
logger.debug(
f"Not processing face due to existing sub label: {obj_data.get('sub_label')}."
)
return False
face: Optional[dict[str, any]] = None
if self.requires_face_detection:
logger.debug("Running manual face detection.")
person_box = obj_data.get("box")
if not person_box:
return False
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
left, top, right, bottom = person_box
person = rgb[top:bottom, left:right]
face_box = self._detect_face(person)
if not face_box:
logger.debug("Detected no faces for person object.")
return False
face_frame = person[
max(0, face_box[1]) : min(frame.shape[0], face_box[3]),
max(0, face_box[0]) : min(frame.shape[1], face_box[2]),
]
face_frame = cv2.cvtColor(face_frame, cv2.COLOR_RGB2BGR)
else:
# don't run for object without attributes
if not obj_data.get("current_attributes"):
logger.debug("No attributes to parse.")
return False
attributes: list[dict[str, any]] = obj_data.get("current_attributes", [])
for attr in attributes:
if attr.get("label") != "face":
continue
if face is None or attr.get("score", 0.0) > face.get("score", 0.0):
face = attr
# no faces detected in this frame
if not face:
return False
face_box = face.get("box")
# check that face is valid
if not face_box or area(face_box) < self.config.face_recognition.min_area:
logger.debug(f"Invalid face box {face}")
return False
face_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
face_frame = face_frame[
max(0, face_box[1]) : min(frame.shape[0], face_box[3]),
max(0, face_box[0]) : min(frame.shape[1], face_box[2]),
]
res = self.face_classifier.classify_face(face_frame)
if not res:
return False
sub_label, score = res
# calculate the overall face score as the probability * area of face
# this will help to reduce false positives from small side-angle faces
# if a large front-on face image may have scored slightly lower but
# is more likely to be accurate due to the larger face area
face_score = round(score * face_frame.shape[0] * face_frame.shape[1], 2)
logger.debug(
f"Detected best face for person as: {sub_label} with probability {score} and overall face score {face_score}"
)
if self.config.face_recognition.save_attempts:
# write face to library
folder = os.path.join(FACE_DIR, "train")
file = os.path.join(folder, f"{id}-{sub_label}-{score}-{face_score}.webp")
os.makedirs(folder, exist_ok=True)
cv2.imwrite(file, face_frame)
if score < self.config.face_recognition.threshold:
logger.debug(
f"Recognized face distance {score} is less than threshold {self.config.face_recognition.threshold}"
)
return True
if id in self.detected_faces and face_score <= self.detected_faces[id]:
logger.debug(
f"Recognized face distance {score} and overall score {face_score} is less than previous overall face score ({self.detected_faces.get(id)})."
)
return True
resp = requests.post(
f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label",
json={
"camera": obj_data.get("camera"),
"subLabel": sub_label,
"subLabelScore": score,
},
)
if resp.status_code == 200:
self.detected_faces[id] = face_score
return True
def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]: def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]:
"""Return the dimensions of the input image as [x, y, width, height].""" """Return the dimensions of the input image as [x, y, width, height]."""
height, width = input.shape[:2] height, width = input.shape[:2]

View File

@ -0,0 +1,346 @@
"""Handle processing images for face detection and recognition."""
import logging
import os
from typing import Optional
import cv2
import numpy as np
import requests
from playhouse.sqliteq import SqliteQueueDatabase
from frigate.config import FrigateConfig
from frigate.const import FACE_DIR, FRIGATE_LOCALHOST, MODEL_CACHE_DIR
from frigate.util.image import area
from .processor_api import ProcessorApi
logger = logging.getLogger(__name__)
MIN_MATCHING_FACES = 2
class FaceProcessor(ProcessorApi):
def __init__(self, config: FrigateConfig, db: SqliteQueueDatabase):
super().__init__(config)
self.face_config = config.face_recognition
self.db = db
self.face_detector: cv2.FaceDetectorYN = None
self.landmark_detector: cv2.face.FacemarkLBF = None
self.face_recognizer: cv2.face.LBPHFaceRecognizer = None
self.requires_face_detection = "face" not in self.config.objects.all_objects
self.detected_faces: dict[str, float] = {}
download_path = os.path.join(MODEL_CACHE_DIR, "facedet")
self.model_files = {
"facedet.onnx": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/facedet.onnx",
"landmarkdet.yaml": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/landmarkdet.yaml",
}
if not all(
os.path.exists(os.path.join(download_path, n))
for n in self.model_files.keys()
):
# conditionally import ModelDownloader
from frigate.util.downloader import ModelDownloader
self.downloader = ModelDownloader(
model_name="facedet",
download_path=download_path,
file_names=self.model_files.keys(),
download_func=self.__download_models,
complete_func=self.__build_detector,
)
self.downloader.ensure_model_files()
else:
self.__build_detector()
self.label_map: dict[int, str] = {}
self.__build_classifier()
def __download_models(self, path: str) -> None:
try:
file_name = os.path.basename(path)
# conditionally import ModelDownloader
from frigate.util.downloader import ModelDownloader
ModelDownloader.download_from_url(self.model_files[file_name], path)
except Exception as e:
logger.error(f"Failed to download {path}: {e}")
def __build_detector(self) -> None:
self.face_detector = cv2.FaceDetectorYN.create(
"/config/model_cache/facedet/facedet.onnx",
config="",
input_size=(320, 320),
score_threshold=0.8,
nms_threshold=0.3,
)
self.landmark_detector = cv2.face.createFacemarkLBF()
self.landmark_detector.loadModel("/config/model_cache/facedet/landmarkdet.yaml")
def __build_classifier(self) -> None:
if not self.landmark_detector:
return None
labels = []
faces = []
dir = "/media/frigate/clips/faces"
for idx, name in enumerate(os.listdir(dir)):
if name == "train":
continue
face_folder = os.path.join(dir, name)
if not os.path.isdir(face_folder):
continue
self.label_map[idx] = name
for image in os.listdir(face_folder):
img = cv2.imread(os.path.join(face_folder, image))
if img is None:
continue
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
img = self.__align_face(img, img.shape[1], img.shape[0])
faces.append(img)
labels.append(idx)
self.recognizer: cv2.face.LBPHFaceRecognizer = (
cv2.face.LBPHFaceRecognizer_create(
radius=2, threshold=(1 - self.face_config.min_score) * 1000
)
)
self.recognizer.train(faces, np.array(labels))
def __align_face(
self,
image: np.ndarray,
output_width: int,
output_height: int,
) -> np.ndarray:
_, lands = self.landmark_detector.fit(
image, np.array([(0, 0, image.shape[1], image.shape[0])])
)
landmarks: np.ndarray = lands[0][0]
# get landmarks for eyes
leftEyePts = landmarks[42:48]
rightEyePts = landmarks[36:42]
# compute the center of mass for each eye
leftEyeCenter = leftEyePts.mean(axis=0).astype("int")
rightEyeCenter = rightEyePts.mean(axis=0).astype("int")
# compute the angle between the eye centroids
dY = rightEyeCenter[1] - leftEyeCenter[1]
dX = rightEyeCenter[0] - leftEyeCenter[0]
angle = np.degrees(np.arctan2(dY, dX)) - 180
# compute the desired right eye x-coordinate based on the
# desired x-coordinate of the left eye
desiredRightEyeX = 1.0 - 0.35
# determine the scale of the new resulting image by taking
# the ratio of the distance between eyes in the *current*
# image to the ratio of distance between eyes in the
# *desired* image
dist = np.sqrt((dX**2) + (dY**2))
desiredDist = desiredRightEyeX - 0.35
desiredDist *= output_width
scale = desiredDist / dist
# compute center (x, y)-coordinates (i.e., the median point)
# between the two eyes in the input image
# grab the rotation matrix for rotating and scaling the face
eyesCenter = (
int((leftEyeCenter[0] + rightEyeCenter[0]) // 2),
int((leftEyeCenter[1] + rightEyeCenter[1]) // 2),
)
M = cv2.getRotationMatrix2D(eyesCenter, angle, scale)
# update the translation component of the matrix
tX = output_width * 0.5
tY = output_height * 0.35
M[0, 2] += tX - eyesCenter[0]
M[1, 2] += tY - eyesCenter[1]
# apply the affine transformation
return cv2.warpAffine(
image, M, (output_width, output_height), flags=cv2.INTER_CUBIC
)
def clear_classifier(self) -> None:
self.face_recognizer = None
self.label_map = {}
def detect_faces(self, input: np.ndarray) -> tuple[int, cv2.typing.MatLike] | None:
if not self.face_detector:
return None
self.face_detector.setInputSize((input.shape[1], input.shape[0]))
return self.face_detector.detect(input)
def _detect_face(self, input: np.ndarray) -> tuple[int, int, int, int]:
"""Detect faces in input image."""
faces = self.detect_faces(input)
if faces is None or faces[1] is None:
return None
face = None
for _, potential_face in enumerate(faces[1]):
raw_bbox = potential_face[0:4].astype(np.uint16)
x: int = max(raw_bbox[0], 0)
y: int = max(raw_bbox[1], 0)
w: int = raw_bbox[2]
h: int = raw_bbox[3]
bbox = (x, y, x + w, y + h)
if face is None or area(bbox) > area(face):
face = bbox
return face
def process_frame(self, obj_data: dict[str, any], frame: np.ndarray) -> bool:
"""Look for faces in image."""
id = obj_data["id"]
# don't run for non person objects
if obj_data.get("label") != "person":
logger.debug("Not a processing face for non person object.")
return False
# don't overwrite sub label for objects that have a sub label
# that is not a face
if obj_data.get("sub_label") and id not in self.detected_faces:
logger.debug(
f"Not processing face due to existing sub label: {obj_data.get('sub_label')}."
)
return False
face: Optional[dict[str, any]] = None
if self.requires_face_detection:
logger.debug("Running manual face detection.")
person_box = obj_data.get("box")
if not person_box:
return False
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
left, top, right, bottom = person_box
person = rgb[top:bottom, left:right]
face_box = self._detect_face(person)
if not face_box:
logger.debug("Detected no faces for person object.")
return False
face_frame = person[
max(0, face_box[1]) : min(frame.shape[0], face_box[3]),
max(0, face_box[0]) : min(frame.shape[1], face_box[2]),
]
face_frame = cv2.cvtColor(face_frame, cv2.COLOR_RGB2BGR)
else:
# don't run for object without attributes
if not obj_data.get("current_attributes"):
logger.debug("No attributes to parse.")
return False
attributes: list[dict[str, any]] = obj_data.get("current_attributes", [])
for attr in attributes:
if attr.get("label") != "face":
continue
if face is None or attr.get("score", 0.0) > face.get("score", 0.0):
face = attr
# no faces detected in this frame
if not face:
return False
face_box = face.get("box")
# check that face is valid
if not face_box or area(face_box) < self.config.face_recognition.min_area:
logger.debug(f"Invalid face box {face}")
return False
face_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
face_frame = face_frame[
max(0, face_box[1]) : min(frame.shape[0], face_box[3]),
max(0, face_box[0]) : min(frame.shape[1], face_box[2]),
]
res = self.face_classifier.classify_face(face_frame)
if not res:
return False
sub_label, score = res
# calculate the overall face score as the probability * area of face
# this will help to reduce false positives from small side-angle faces
# if a large front-on face image may have scored slightly lower but
# is more likely to be accurate due to the larger face area
face_score = round(score * face_frame.shape[0] * face_frame.shape[1], 2)
logger.debug(
f"Detected best face for person as: {sub_label} with probability {score} and overall face score {face_score}"
)
if self.config.face_recognition.save_attempts:
# write face to library
folder = os.path.join(FACE_DIR, "train")
file = os.path.join(folder, f"{id}-{sub_label}-{score}-{face_score}.webp")
os.makedirs(folder, exist_ok=True)
cv2.imwrite(file, face_frame)
if score < self.config.face_recognition.threshold:
logger.debug(
f"Recognized face distance {score} is less than threshold {self.config.face_recognition.threshold}"
)
return True
if id in self.detected_faces and face_score <= self.detected_faces[id]:
logger.debug(
f"Recognized face distance {score} and overall score {face_score} is less than previous overall face score ({self.detected_faces.get(id)})."
)
return True
resp = requests.post(
f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label",
json={
"camera": obj_data.get("camera"),
"subLabel": sub_label,
"subLabelScore": score,
},
)
if resp.status_code == 200:
self.detected_faces[id] = face_score
return True
def classify_face(self, face_image: np.ndarray) -> tuple[str, float] | None:
if not self.landmark_detector:
return None
if not self.label_map:
self.__build_classifier()
img = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)
img = self.__align_face(img, img.shape[1], img.shape[0])
index, distance = self.recognizer.predict(img)
if index == -1:
return None
score = 1.0 - (distance / 1000)
return self.label_map[index], round(score, 2)

View File

@ -0,0 +1,19 @@
import logging
from abc import ABC, abstractmethod
import numpy as np
from frigate.config import FrigateConfig
logger = logging.getLogger(__name__)
class ProcessorApi(ABC):
@abstractmethod
def __init__(self, config: FrigateConfig):
self.config = config
pass
@abstractmethod
def process_frame(self, obj_data: dict[str, any], frame: np.ndarray):
pass

View File

@ -4,13 +4,7 @@ import logging
import os import os
from typing import Any from typing import Any
import cv2
import numpy as np
import onnxruntime as ort import onnxruntime as ort
from playhouse.sqliteq import SqliteQueueDatabase
from frigate.config.semantic_search import FaceRecognitionConfig
from frigate.const import MODEL_CACHE_DIR
try: try:
import openvino as ov import openvino as ov
@ -21,9 +15,6 @@ except ImportError:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
MIN_MATCHING_FACES = 2
def get_ort_providers( def get_ort_providers(
force_cpu: bool = False, device: str = "AUTO", requires_fp16: bool = False force_cpu: bool = False, device: str = "AUTO", requires_fp16: bool = False
) -> tuple[list[str], list[dict[str, any]]]: ) -> tuple[list[str], list[dict[str, any]]]:
@ -157,181 +148,3 @@ class ONNXModelRunner:
return [infer_request.get_output_tensor().data] return [infer_request.get_output_tensor().data]
elif self.type == "ort": elif self.type == "ort":
return self.ort.run(None, input) return self.ort.run(None, input)
class FaceClassificationModel:
def __init__(self, config: FaceRecognitionConfig, db: SqliteQueueDatabase):
self.config = config
self.db = db
self.face_detector: cv2.FaceDetectorYN = None
self.landmark_detector: cv2.face.FacemarkLBF = None
self.face_recognizer: cv2.face.LBPHFaceRecognizer = None
download_path = os.path.join(MODEL_CACHE_DIR, "facedet")
self.model_files = {
"facedet.onnx": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/facedet.onnx",
"landmarkdet.yaml": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/landmarkdet.yaml",
}
if not all(
os.path.exists(os.path.join(download_path, n))
for n in self.model_files.keys()
):
# conditionally import ModelDownloader
from frigate.util.downloader import ModelDownloader
self.downloader = ModelDownloader(
model_name="facedet",
download_path=download_path,
file_names=self.model_files.keys(),
download_func=self.__download_models,
complete_func=self.__build_detector,
)
self.downloader.ensure_model_files()
else:
self.__build_detector()
self.label_map: dict[int, str] = {}
self.__build_classifier()
def __download_models(self, path: str) -> None:
try:
file_name = os.path.basename(path)
# conditionally import ModelDownloader
from frigate.util.downloader import ModelDownloader
ModelDownloader.download_from_url(self.model_files[file_name], path)
except Exception as e:
logger.error(f"Failed to download {path}: {e}")
def __build_detector(self) -> None:
self.face_detector = cv2.FaceDetectorYN.create(
"/config/model_cache/facedet/facedet.onnx",
config="",
input_size=(320, 320),
score_threshold=0.8,
nms_threshold=0.3,
)
self.landmark_detector = cv2.face.createFacemarkLBF()
self.landmark_detector.loadModel("/config/model_cache/facedet/landmarkdet.yaml")
def __build_classifier(self) -> None:
if not self.landmark_detector:
return None
labels = []
faces = []
dir = "/media/frigate/clips/faces"
for idx, name in enumerate(os.listdir(dir)):
if name == "train":
continue
face_folder = os.path.join(dir, name)
if not os.path.isdir(face_folder):
continue
self.label_map[idx] = name
for image in os.listdir(face_folder):
img = cv2.imread(os.path.join(face_folder, image))
if img is None:
continue
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
img = self.__align_face(img, img.shape[1], img.shape[0])
faces.append(img)
labels.append(idx)
self.recognizer: cv2.face.LBPHFaceRecognizer = (
cv2.face.LBPHFaceRecognizer_create(
radius=2, threshold=(1 - self.config.min_score) * 1000
)
)
self.recognizer.train(faces, np.array(labels))
def __align_face(
self,
image: np.ndarray,
output_width: int,
output_height: int,
) -> np.ndarray:
_, lands = self.landmark_detector.fit(
image, np.array([(0, 0, image.shape[1], image.shape[0])])
)
landmarks = lands[0][0]
# get landmarks for eyes
leftEyePts = landmarks[42:48]
rightEyePts = landmarks[36:42]
# compute the center of mass for each eye
leftEyeCenter = leftEyePts.mean(axis=0).astype("int")
rightEyeCenter = rightEyePts.mean(axis=0).astype("int")
# compute the angle between the eye centroids
dY = rightEyeCenter[1] - leftEyeCenter[1]
dX = rightEyeCenter[0] - leftEyeCenter[0]
angle = np.degrees(np.arctan2(dY, dX)) - 180
# compute the desired right eye x-coordinate based on the
# desired x-coordinate of the left eye
desiredRightEyeX = 1.0 - 0.35
# determine the scale of the new resulting image by taking
# the ratio of the distance between eyes in the *current*
# image to the ratio of distance between eyes in the
# *desired* image
dist = np.sqrt((dX**2) + (dY**2))
desiredDist = desiredRightEyeX - 0.35
desiredDist *= output_width
scale = desiredDist / dist
# compute center (x, y)-coordinates (i.e., the median point)
# between the two eyes in the input image
# grab the rotation matrix for rotating and scaling the face
eyesCenter = (
int((leftEyeCenter[0] + rightEyeCenter[0]) // 2),
int((leftEyeCenter[1] + rightEyeCenter[1]) // 2),
)
M = cv2.getRotationMatrix2D(eyesCenter, angle, scale)
# update the translation component of the matrix
tX = output_width * 0.5
tY = output_height * 0.35
M[0, 2] += tX - eyesCenter[0]
M[1, 2] += tY - eyesCenter[1]
# apply the affine transformation
return cv2.warpAffine(
image, M, (output_width, output_height), flags=cv2.INTER_CUBIC
)
def clear_classifier(self) -> None:
self.face_recognizer = None
self.label_map = {}
def detect_faces(self, input: np.ndarray) -> tuple[int, cv2.typing.MatLike] | None:
if not self.face_detector:
return None
self.face_detector.setInputSize((input.shape[1], input.shape[0]))
return self.face_detector.detect(input)
def classify_face(self, face_image: np.ndarray) -> tuple[str, float] | None:
if not self.landmark_detector:
return None
if not self.label_map:
self.__build_classifier()
img = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)
img = self.__align_face(img, img.shape[1], img.shape[0])
index, distance = self.recognizer.predict(img)
if index == -1:
return None
score = 1.0 - (distance / 1000)
return self.label_map[index], round(score, 2)