From f341960b23286abb3cfd6d9a932d1687ff658d4f Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Wed, 26 Mar 2025 18:35:17 -0600 Subject: [PATCH] Run tflite facenet model --- frigate/data_processing/common/face/model.py | 191 +++++++++++-------- frigate/data_processing/real_time/face.py | 4 +- frigate/embeddings/onnx/face_embedding.py | 114 ++++++++++- 3 files changed, 221 insertions(+), 88 deletions(-) diff --git a/frigate/data_processing/common/face/model.py b/frigate/data_processing/common/face/model.py index 1af934c5d..c0bbe1ed6 100644 --- a/frigate/data_processing/common/face/model.py +++ b/frigate/data_processing/common/face/model.py @@ -10,7 +10,7 @@ from scipy import stats from frigate.config import FrigateConfig from frigate.const import MODEL_CACHE_DIR -from frigate.embeddings.onnx.face_embedding import ArcfaceEmbedding +from frigate.embeddings.onnx.face_embedding import ArcfaceEmbedding, FaceNetEmbedding logger = logging.getLogger(__name__) @@ -124,83 +124,144 @@ class FaceRecognizer(ABC): return 1.0 -class LBPHRecognizer(FaceRecognizer): +def similarity_to_confidence( + cosine_similarity: float, median=0.3, range_width=0.6, slope_factor=12 +): + """ + Default sigmoid function to map cosine similarity to confidence. + + Args: + cosine_similarity (float): The input cosine similarity. + median (float): Assumed median of cosine similarity distribution. + range_width (float): Assumed range of cosine similarity distribution (90th percentile - 10th percentile). + slope_factor (float): Adjusts the steepness of the curve. + + Returns: + float: The confidence score. + """ + + # Calculate slope and bias + slope = slope_factor / range_width + bias = median + + # Calculate confidence + confidence = 1 / (1 + np.exp(-slope * (cosine_similarity - bias))) + return confidence + + +class FaceNetRecognizer(FaceRecognizer): def __init__(self, config: FrigateConfig): super().__init__(config) - self.label_map: dict[int, str] = {} - self.recognizer: cv2.face.LBPHFaceRecognizer | None = None + self.mean_embs: dict[int, np.ndarray] = {} + self.face_embedder: FaceNetEmbedding = FaceNetEmbedding() + self.model_builder_queue: queue.Queue | None = None def clear(self) -> None: - self.face_recognizer = None - self.label_map = {} + self.mean_embs = {} + + def run_build_task(self) -> None: + self.model_builder_queue = queue.Queue() + + def build_model(): + face_embeddings_map: dict[str, list[np.ndarray]] = {} + idx = 0 + + dir = "/media/frigate/clips/faces" + for name in os.listdir(dir): + if name == "train": + continue + + face_folder = os.path.join(dir, name) + + if not os.path.isdir(face_folder): + continue + + face_embeddings_map[name] = [] + for image in os.listdir(face_folder): + img = cv2.imread(os.path.join(face_folder, image)) + + if img is None: + continue + + img = self.align_face(img, img.shape[1], img.shape[0]) + emb = self.face_embedder([img])[0].squeeze() + face_embeddings_map[name].append(emb) + + idx += 1 + + self.model_builder_queue.put(face_embeddings_map) + + thread = threading.Thread(target=build_model, daemon=True) + thread.start() def build(self): if not self.landmark_detector: self.init_landmark_detector() return None - labels = [] - faces = [] - idx = 0 - - dir = "/media/frigate/clips/faces" - for name in os.listdir(dir): - if name == "train": - continue - - face_folder = os.path.join(dir, name) - - if not os.path.isdir(face_folder): - continue - - self.label_map[idx] = name - for image in os.listdir(face_folder): - img = cv2.imread(os.path.join(face_folder, image)) - - if img is None: - continue - - img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) - img = self.align_face(img, img.shape[1], img.shape[0]) - faces.append(img) - labels.append(idx) - - idx += 1 - - if not faces: + if self.model_builder_queue is not None: + try: + face_embeddings_map: dict[str, list[np.ndarray]] = ( + self.model_builder_queue.get(timeout=0.1) + ) + self.model_builder_queue = None + except queue.Empty: + return + else: + self.run_build_task() return - self.recognizer: cv2.face.LBPHFaceRecognizer = ( - cv2.face.LBPHFaceRecognizer_create(radius=2, threshold=400) - ) - self.recognizer.train(faces, np.array(labels)) + if not face_embeddings_map: + return - def classify(self, face_image: np.ndarray) -> tuple[str, float] | None: + for name, embs in face_embeddings_map.items(): + if embs: + self.mean_embs[name] = stats.trim_mean(embs, 0.15) + + logger.debug("Finished building ArcFace model") + + def classify(self, face_image): if not self.landmark_detector: return None - if not self.label_map or not self.recognizer: + if not self.mean_embs: self.build() - if not self.recognizer: + if not self.mean_embs: return None # face recognition is best run on grayscale images - img = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY) # get blur factor before aligning face - blur_factor = self.get_blur_factor(img) - logger.debug(f"face detected with bluriness {blur_factor}") + blur_factor = self.get_blur_factor(face_image) + logger.debug(f"face detected with blurriness {blur_factor}") # align face and run recognition - img = self.align_face(img, img.shape[1], img.shape[0]) - index, distance = self.recognizer.predict(img) + img = self.align_face(face_image, face_image.shape[1], face_image.shape[0]) + embedding = self.face_embedder([img])[0].squeeze() - if index == -1: - return None + score = 0 + label = "" - score = (1.0 - (distance / 1000)) * blur_factor - return self.label_map[index], round(score, 2) + for name, mean_emb in self.mean_embs.items(): + dot_product = np.dot(embedding, mean_emb) + magnitude_A = np.linalg.norm(embedding) + magnitude_B = np.linalg.norm(mean_emb) + + cosine_similarity = dot_product / (magnitude_A * magnitude_B) + confidence = similarity_to_confidence( + cosine_similarity, median=0.5, range_width=0.6 + ) + + print(f"got {cosine_similarity} -> {confidence} :: {name}") + + if confidence > score: + score = confidence + label = name + + print("===========================") + + return label, round(score * blur_factor, 2) class ArcFaceRecognizer(FaceRecognizer): @@ -274,30 +335,6 @@ class ArcFaceRecognizer(FaceRecognizer): logger.debug("Finished building ArcFace model") - def similarity_to_confidence( - self, cosine_similarity: float, median=0.3, range_width=0.6, slope_factor=12 - ): - """ - Default sigmoid function to map cosine similarity to confidence. - - Args: - cosine_similarity (float): The input cosine similarity. - median (float): Assumed median of cosine similarity distribution. - range_width (float): Assumed range of cosine similarity distribution (90th percentile - 10th percentile). - slope_factor (float): Adjusts the steepness of the curve. - - Returns: - float: The confidence score. - """ - - # Calculate slope and bias - slope = slope_factor / range_width - bias = median - - # Calculate confidence - confidence = 1 / (1 + np.exp(-slope * (cosine_similarity - bias))) - return confidence - def classify(self, face_image): if not self.landmark_detector: return None @@ -312,7 +349,7 @@ class ArcFaceRecognizer(FaceRecognizer): # get blur factor before aligning face blur_factor = self.get_blur_factor(face_image) - logger.debug(f"face detected with bluriness {blur_factor}") + logger.debug(f"face detected with blurriness {blur_factor}") # align face and run recognition img = self.align_face(face_image, face_image.shape[1], face_image.shape[0]) @@ -327,7 +364,7 @@ class ArcFaceRecognizer(FaceRecognizer): magnitude_B = np.linalg.norm(mean_emb) cosine_similarity = dot_product / (magnitude_A * magnitude_B) - confidence = self.similarity_to_confidence(cosine_similarity) + confidence = similarity_to_confidence(cosine_similarity) if confidence > score: score = confidence diff --git a/frigate/data_processing/real_time/face.py b/frigate/data_processing/real_time/face.py index 9b479a527..0d9a5d437 100644 --- a/frigate/data_processing/real_time/face.py +++ b/frigate/data_processing/real_time/face.py @@ -21,8 +21,8 @@ from frigate.config import FrigateConfig from frigate.const import FACE_DIR, MODEL_CACHE_DIR from frigate.data_processing.common.face.model import ( ArcFaceRecognizer, + FaceNetRecognizer, FaceRecognizer, - LBPHRecognizer, ) from frigate.util.image import area @@ -78,7 +78,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi): self.label_map: dict[int, str] = {} if self.face_config.model_size == "small": - self.recognizer = LBPHRecognizer(self.config) + self.recognizer = FaceNetRecognizer(self.config) else: self.recognizer = ArcFaceRecognizer(self.config) diff --git a/frigate/embeddings/onnx/face_embedding.py b/frigate/embeddings/onnx/face_embedding.py index 0b808f716..a2a3f1a8b 100644 --- a/frigate/embeddings/onnx/face_embedding.py +++ b/frigate/embeddings/onnx/face_embedding.py @@ -11,9 +11,105 @@ from frigate.util.downloader import ModelDownloader from .base_embedding import BaseEmbedding from .runner import ONNXModelRunner +try: + from tflite_runtime.interpreter import Interpreter +except ModuleNotFoundError: + from tensorflow.lite.python.interpreter import Interpreter + logger = logging.getLogger(__name__) -FACE_EMBEDDING_SIZE = 112 +ARCFACE_INPUT_SIZE = 112 +FACENET_INPUT_SIZE = 160 + + +class FaceNetEmbedding(BaseEmbedding): + def __init__( + self, + device: str = "AUTO", + ): + super().__init__( + model_name="facedet", + model_file="facenet.tflite", + download_urls={ + "facenet.tflite": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/arcface.onnx", + }, + ) + self.device = device + self.download_path = os.path.join(MODEL_CACHE_DIR, self.model_name) + self.tokenizer = None + self.feature_extractor = None + self.runner = None + files_names = list(self.download_urls.keys()) + + if not all( + os.path.exists(os.path.join(self.download_path, n)) for n in files_names + ): + logger.debug(f"starting model download for {self.model_name}") + self.downloader = ModelDownloader( + model_name=self.model_name, + download_path=self.download_path, + file_names=files_names, + download_func=self._download_model, + ) + self.downloader.ensure_model_files() + else: + self.downloader = None + self._load_model_and_utils() + logger.debug(f"models are already downloaded for {self.model_name}") + + def _load_model_and_utils(self): + if self.runner is None: + if self.downloader: + self.downloader.wait_for_download() + + self.runner = Interpreter( + model_path=os.path.join(MODEL_CACHE_DIR, "facedet/facenet.tflite"), + num_threads=2, + ) + self.runner.allocate_tensors() + self.tensor_input_details = self.runner.get_input_details() + self.tensor_output_details = self.runner.get_output_details() + + def _preprocess_inputs(self, raw_inputs): + pil = self._process_image(raw_inputs[0]) + + # handle images larger than input size + width, height = pil.size + if width != FACENET_INPUT_SIZE or height != FACENET_INPUT_SIZE: + if width > height: + new_height = int(((height / width) * FACENET_INPUT_SIZE) // 4 * 4) + pil = pil.resize((FACENET_INPUT_SIZE, new_height)) + else: + new_width = int(((width / height) * FACENET_INPUT_SIZE) // 4 * 4) + pil = pil.resize((new_width, FACENET_INPUT_SIZE)) + + og = np.array(pil).astype(np.float32) + + # Image must be FACE_EMBEDDING_SIZExFACE_EMBEDDING_SIZE + og_h, og_w, channels = og.shape + frame = np.zeros( + (FACENET_INPUT_SIZE, FACENET_INPUT_SIZE, channels), dtype=np.float32 + ) + + # compute center offset + x_center = (FACENET_INPUT_SIZE - og_w) // 2 + y_center = (FACENET_INPUT_SIZE - og_h) // 2 + + # copy img image into center of result image + frame[y_center : y_center + og_h, x_center : x_center + og_w] = og + + # run facenet normalization + frame = (frame / 127.5) - 1.0 + + frame = np.expand_dims(frame, axis=0) + return frame + + def __call__(self, inputs): + self._load_model_and_utils() + processed = self._preprocess_inputs(inputs) + self.runner.set_tensor(self.tensor_input_details[0]["index"], processed) + self.runner.invoke() + return self.runner.get_tensor(self.tensor_output_details[0]["index"]) class ArcfaceEmbedding(BaseEmbedding): @@ -66,25 +162,25 @@ class ArcfaceEmbedding(BaseEmbedding): # handle images larger than input size width, height = pil.size - if width != FACE_EMBEDDING_SIZE or height != FACE_EMBEDDING_SIZE: + if width != ARCFACE_INPUT_SIZE or height != ARCFACE_INPUT_SIZE: if width > height: - new_height = int(((height / width) * FACE_EMBEDDING_SIZE) // 4 * 4) - pil = pil.resize((FACE_EMBEDDING_SIZE, new_height)) + new_height = int(((height / width) * ARCFACE_INPUT_SIZE) // 4 * 4) + pil = pil.resize((ARCFACE_INPUT_SIZE, new_height)) else: - new_width = int(((width / height) * FACE_EMBEDDING_SIZE) // 4 * 4) - pil = pil.resize((new_width, FACE_EMBEDDING_SIZE)) + new_width = int(((width / height) * ARCFACE_INPUT_SIZE) // 4 * 4) + pil = pil.resize((new_width, ARCFACE_INPUT_SIZE)) og = np.array(pil).astype(np.float32) # Image must be FACE_EMBEDDING_SIZExFACE_EMBEDDING_SIZE og_h, og_w, channels = og.shape frame = np.zeros( - (FACE_EMBEDDING_SIZE, FACE_EMBEDDING_SIZE, channels), dtype=np.float32 + (ARCFACE_INPUT_SIZE, ARCFACE_INPUT_SIZE, channels), dtype=np.float32 ) # compute center offset - x_center = (FACE_EMBEDDING_SIZE - og_w) // 2 - y_center = (FACE_EMBEDDING_SIZE - og_h) // 2 + x_center = (ARCFACE_INPUT_SIZE - og_w) // 2 + y_center = (ARCFACE_INPUT_SIZE - og_h) // 2 # copy img image into center of result image frame[y_center : y_center + og_h, x_center : x_center + og_w] = og