Remove opencv and use facenet for small model

This commit is contained in:
Nicolas Mowen 2025-03-26 11:07:17 -06:00
parent e6936c177b
commit dcdbec7577
5 changed files with 289 additions and 150 deletions

View File

@ -34,7 +34,6 @@ unidecode == 1.3.*
# Image Manipulation # Image Manipulation
numpy == 1.26.* numpy == 1.26.*
opencv-python-headless == 4.11.0.* opencv-python-headless == 4.11.0.*
opencv-contrib-python == 4.11.0.*
scipy == 1.14.* scipy == 1.14.*
# OpenVino & ONNX # OpenVino & ONNX
openvino == 2024.4.* openvino == 2024.4.*

View File

@ -10,7 +10,7 @@ from scipy import stats
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.const import MODEL_CACHE_DIR from frigate.const import MODEL_CACHE_DIR
from frigate.embeddings.onnx.facenet import ArcfaceEmbedding from frigate.embeddings.onnx.face_embedding import ArcfaceEmbedding, FaceNetEmbedding
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -124,83 +124,142 @@ class FaceRecognizer(ABC):
return 1.0 return 1.0
class LBPHRecognizer(FaceRecognizer): class FaceNetRecognizer(FaceRecognizer):
def __init__(self, config: FrigateConfig): def __init__(self, config: FrigateConfig):
super().__init__(config) super().__init__(config)
self.label_map: dict[int, str] = {} self.mean_embs: dict[int, np.ndarray] = {}
self.recognizer: cv2.face.LBPHFaceRecognizer | None = None self.face_embedder: FaceNetEmbedding = FaceNetEmbedding()
self.model_builder_queue: queue.Queue | None = None
def clear(self) -> None: def clear(self) -> None:
self.face_recognizer = None self.mean_embs = {}
self.label_map = {}
def run_build_task(self) -> None:
self.model_builder_queue = queue.Queue()
def build_model():
face_embeddings_map: dict[str, list[np.ndarray]] = {}
idx = 0
dir = "/media/frigate/clips/faces"
for name in os.listdir(dir):
if name == "train":
continue
face_folder = os.path.join(dir, name)
if not os.path.isdir(face_folder):
continue
face_embeddings_map[name] = []
for image in os.listdir(face_folder):
img = cv2.imread(os.path.join(face_folder, image))
if img is None:
continue
img = self.align_face(img, img.shape[1], img.shape[0])
emb = self.face_embedder([img])[0].squeeze()
face_embeddings_map[name].append(emb)
idx += 1
self.model_builder_queue.put(face_embeddings_map)
thread = threading.Thread(target=build_model, daemon=True)
thread.start()
def build(self): def build(self):
if not self.landmark_detector: if not self.landmark_detector:
self.init_landmark_detector() self.init_landmark_detector()
return None return None
labels = [] if self.model_builder_queue is not None:
faces = [] try:
idx = 0 face_embeddings_map: dict[str, list[np.ndarray]] = (
self.model_builder_queue.get(timeout=0.1)
dir = "/media/frigate/clips/faces" )
for name in os.listdir(dir): self.model_builder_queue = None
if name == "train": except queue.Empty:
continue return
else:
face_folder = os.path.join(dir, name) self.run_build_task()
if not os.path.isdir(face_folder):
continue
self.label_map[idx] = name
for image in os.listdir(face_folder):
img = cv2.imread(os.path.join(face_folder, image))
if img is None:
continue
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
img = self.align_face(img, img.shape[1], img.shape[0])
faces.append(img)
labels.append(idx)
idx += 1
if not faces:
return return
self.recognizer: cv2.face.LBPHFaceRecognizer = ( if not face_embeddings_map:
cv2.face.LBPHFaceRecognizer_create(radius=2, threshold=400) return
)
self.recognizer.train(faces, np.array(labels))
def classify(self, face_image: np.ndarray) -> tuple[str, float] | None: for name, embs in face_embeddings_map.items():
if embs:
self.mean_embs[name] = stats.trim_mean(embs, 0.3)
logger.debug("Finished building FaceNet model")
def similarity_to_confidence(
self, cosine_similarity: float, median=0.3, range_width=0.6, slope_factor=12
):
"""
Default sigmoid function to map cosine similarity to confidence.
Args:
cosine_similarity (float): The input cosine similarity.
median (float): Assumed median of cosine similarity distribution.
range_width (float): Assumed range of cosine similarity distribution (90th percentile - 10th percentile).
slope_factor (float): Adjusts the steepness of the curve.
Returns:
float: The confidence score.
"""
# Calculate slope and bias
slope = slope_factor / range_width
bias = median
# Calculate confidence
confidence = 1 / (1 + np.exp(-slope * (cosine_similarity - bias)))
return confidence
def classify(self, face_image):
if not self.landmark_detector: if not self.landmark_detector:
return None return None
if not self.label_map or not self.recognizer: if not self.mean_embs:
self.build() self.build()
if not self.recognizer: if not self.mean_embs:
return None return None
# face recognition is best run on grayscale images # face recognition is best run on grayscale images
img = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)
# get blur factor before aligning face # get blur factor before aligning face
blur_factor = self.get_blur_factor(img) blur_factor = self.get_blur_factor(face_image)
logger.debug(f"face detected with bluriness {blur_factor}") logger.debug(f"face detected with bluriness {blur_factor}")
# align face and run recognition # align face and run recognition
img = self.align_face(img, img.shape[1], img.shape[0]) img = self.align_face(face_image, face_image.shape[1], face_image.shape[0])
index, distance = self.recognizer.predict(img) embedding = self.face_embedder([img])[0].squeeze()
if index == -1: score = 0
label = ""
for name, mean_emb in self.mean_embs.items():
dot_product = np.dot(embedding, mean_emb)
magnitude_A = np.linalg.norm(embedding)
magnitude_B = np.linalg.norm(mean_emb)
cosine_similarity = dot_product / (magnitude_A * magnitude_B)
confidence = self.similarity_to_confidence(
cosine_similarity, median=0.7, range_width=0.6, slope_factor=10
)
if cosine_similarity > score:
score = confidence
label = name
if score < 0.4:
return None return None
score = (1.0 - (distance / 1000)) * blur_factor return label, round(score * blur_factor, 2)
return self.label_map[index], round(score, 2)
class ArcFaceRecognizer(FaceRecognizer): class ArcFaceRecognizer(FaceRecognizer):

View File

@ -21,8 +21,8 @@ from frigate.config import FrigateConfig
from frigate.const import FACE_DIR, MODEL_CACHE_DIR from frigate.const import FACE_DIR, MODEL_CACHE_DIR
from frigate.data_processing.common.face.model import ( from frigate.data_processing.common.face.model import (
ArcFaceRecognizer, ArcFaceRecognizer,
FaceNetRecognizer,
FaceRecognizer, FaceRecognizer,
LBPHRecognizer,
) )
from frigate.util.image import area from frigate.util.image import area
@ -78,7 +78,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
self.label_map: dict[int, str] = {} self.label_map: dict[int, str] = {}
if self.face_config.model_size == "small": if self.face_config.model_size == "small":
self.recognizer = LBPHRecognizer(self.config) self.recognizer = FaceNetRecognizer(self.config)
else: else:
self.recognizer = ArcFaceRecognizer(self.config) self.recognizer = ArcFaceRecognizer(self.config)

View File

@ -0,0 +1,179 @@
"""Facenet Embeddings."""
import logging
import os
import numpy as np
from frigate.const import MODEL_CACHE_DIR
from frigate.util.downloader import ModelDownloader
from .base_embedding import BaseEmbedding
from .runner import ONNXModelRunner
logger = logging.getLogger(__name__)
ARCFACE_INPUT_SIZE = 112
FACENET_INPUT_SIZE = 160
class FaceNetEmbedding(BaseEmbedding):
    """FaceNet face-embedding model run via ONNX.

    Produces an embedding vector for an aligned face crop. The model
    expects a FACENET_INPUT_SIZE x FACENET_INPUT_SIZE image with pixel
    values scaled to [0, 1]; smaller crops are letterboxed onto a black
    square canvas.
    """

    def __init__(
        self,
        device: str = "AUTO",
    ):
        super().__init__(
            model_name="facedet",
            model_file="facenet.onnx",
            download_urls={
                # Fix: this URL previously pointed at arcface.onnx, so the
                # wrong model (112x112 ArcFace) was downloaded and saved
                # under the facenet.onnx name, breaking 160x160 inference.
                "facenet.onnx": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/facenet.onnx",
            },
        )
        self.device = device
        self.download_path = os.path.join(MODEL_CACHE_DIR, self.model_name)
        self.tokenizer = None
        self.feature_extractor = None
        self.runner = None
        files_names = list(self.download_urls.keys())

        # download model files if any are missing; otherwise load immediately
        if not all(
            os.path.exists(os.path.join(self.download_path, n)) for n in files_names
        ):
            logger.debug(f"starting model download for {self.model_name}")
            self.downloader = ModelDownloader(
                model_name=self.model_name,
                download_path=self.download_path,
                file_names=files_names,
                download_func=self._download_model,
            )
            self.downloader.ensure_model_files()
        else:
            self.downloader = None
            self._load_model_and_utils()
            logger.debug(f"models are already downloaded for {self.model_name}")

    def _load_model_and_utils(self):
        """Create the ONNX runner once the model file is on disk."""
        if self.runner is None:
            if self.downloader:
                self.downloader.wait_for_download()

            self.runner = ONNXModelRunner(
                os.path.join(self.download_path, self.model_file),
                self.device,
            )

    def _preprocess_inputs(self, raw_inputs):
        """Resize and letterbox a face crop to the FaceNet input size.

        Returns a single-item list holding the named input tensor shaped
        (1, FACENET_INPUT_SIZE, FACENET_INPUT_SIZE, channels), float32 in [0, 1].
        """
        pil = self._process_image(raw_inputs[0])

        # scale the longer edge to the input size, keeping aspect ratio
        # (shorter edge rounded down to a multiple of 4)
        width, height = pil.size
        if width != FACENET_INPUT_SIZE or height != FACENET_INPUT_SIZE:
            if width > height:
                new_height = int(((height / width) * FACENET_INPUT_SIZE) // 4 * 4)
                pil = pil.resize((FACENET_INPUT_SIZE, new_height))
            else:
                new_width = int(((width / height) * FACENET_INPUT_SIZE) // 4 * 4)
                pil = pil.resize((new_width, FACENET_INPUT_SIZE))

        og = np.array(pil).astype(np.float32)

        # Image must be FACENET_INPUT_SIZExFACENET_INPUT_SIZE
        og_h, og_w, channels = og.shape
        frame = np.zeros(
            (FACENET_INPUT_SIZE, FACENET_INPUT_SIZE, channels), dtype=np.float32
        )

        # compute center offset
        x_center = (FACENET_INPUT_SIZE - og_w) // 2
        y_center = (FACENET_INPUT_SIZE - og_h) // 2

        # copy the resized crop into the center of the padded frame
        frame[y_center : y_center + og_h, x_center : x_center + og_w] = og

        # run facenet normalization: scale pixels into [0, 1]
        frame = frame / 255.0
        frame = np.expand_dims(frame, axis=0)
        return [{"image_input": frame}]
class ArcfaceEmbedding(BaseEmbedding):
    """ArcFace face-embedding model run via ONNX.

    Produces an embedding vector for an aligned face crop. Input crops
    are letterboxed to ARCFACE_INPUT_SIZE x ARCFACE_INPUT_SIZE and
    normalized to [-1, 1] in CHW layout.
    """

    def __init__(
        self,
        device: str = "AUTO",
    ):
        super().__init__(
            model_name="facedet",
            model_file="arcface.onnx",
            download_urls={
                "arcface.onnx": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/arcface.onnx",
            },
        )
        self.device = device
        self.download_path = os.path.join(MODEL_CACHE_DIR, self.model_name)
        self.tokenizer = None
        self.feature_extractor = None
        self.runner = None

        required = list(self.download_urls.keys())
        have_all_files = all(
            os.path.exists(os.path.join(self.download_path, name))
            for name in required
        )

        if have_all_files:
            # everything is cached locally; build the runner right away
            self.downloader = None
            self._load_model_and_utils()
            logger.debug(f"models are already downloaded for {self.model_name}")
        else:
            logger.debug(f"starting model download for {self.model_name}")
            self.downloader = ModelDownloader(
                model_name=self.model_name,
                download_path=self.download_path,
                file_names=required,
                download_func=self._download_model,
            )
            self.downloader.ensure_model_files()

    def _load_model_and_utils(self):
        """Build the ONNX runner lazily, waiting on any in-flight download."""
        if self.runner is not None:
            return

        if self.downloader:
            self.downloader.wait_for_download()

        model_path = os.path.join(self.download_path, self.model_file)
        self.runner = ONNXModelRunner(model_path, self.device)

    def _preprocess_inputs(self, raw_inputs):
        """Letterbox the face crop and apply ArcFace normalization.

        Returns a single-item list holding the named input tensor shaped
        (1, channels, ARCFACE_INPUT_SIZE, ARCFACE_INPUT_SIZE), float32 in [-1, 1].
        """
        pil = self._process_image(raw_inputs[0])
        width, height = pil.size

        # scale the longer edge to the model input size, keeping aspect
        # ratio and rounding the shorter edge down to a multiple of 4
        if (width, height) != (ARCFACE_INPUT_SIZE, ARCFACE_INPUT_SIZE):
            if width > height:
                scaled = int(((height / width) * ARCFACE_INPUT_SIZE) // 4 * 4)
                pil = pil.resize((ARCFACE_INPUT_SIZE, scaled))
            else:
                scaled = int(((width / height) * ARCFACE_INPUT_SIZE) // 4 * 4)
                pil = pil.resize((scaled, ARCFACE_INPUT_SIZE))

        face = np.array(pil).astype(np.float32)
        face_h, face_w, channels = face.shape

        # center the resized crop on a black square canvas
        canvas = np.zeros(
            (ARCFACE_INPUT_SIZE, ARCFACE_INPUT_SIZE, channels), dtype=np.float32
        )
        off_x = (ARCFACE_INPUT_SIZE - face_w) // 2
        off_y = (ARCFACE_INPUT_SIZE - face_h) // 2
        canvas[off_y : off_y + face_h, off_x : off_x + face_w] = face

        # ArcFace normalization: [0, 255] -> [-1, 1], then HWC -> CHW
        tensor = (canvas.astype(np.float32) / 255.0 - 0.5) / 0.5
        tensor = np.transpose(tensor, (2, 0, 1))
        tensor = np.expand_dims(tensor, axis=0)
        return [{"data": tensor}]

View File

@ -1,98 +0,0 @@
"""Facenet Embeddings."""
import logging
import os
import numpy as np
from frigate.const import MODEL_CACHE_DIR
from frigate.util.downloader import ModelDownloader
from .base_embedding import BaseEmbedding
from .runner import ONNXModelRunner
logger = logging.getLogger(__name__)
FACE_EMBEDDING_SIZE = 112
class ArcfaceEmbedding(BaseEmbedding):
    """ArcFace face-embedding model run via ONNX.

    Downloads the model file on first use and lazily constructs the
    ONNX runner once the file is available on disk.
    """
    def __init__(
        self,
        device: str = "AUTO",
    ):
        super().__init__(
            model_name="facedet",
            model_file="arcface.onnx",
            download_urls={
                "arcface.onnx": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/arcface.onnx",
            },
        )
        self.device = device
        self.download_path = os.path.join(MODEL_CACHE_DIR, self.model_name)
        self.tokenizer = None
        self.feature_extractor = None
        self.runner = None
        files_names = list(self.download_urls.keys())
        # download model files if any are missing; otherwise load immediately
        if not all(
            os.path.exists(os.path.join(self.download_path, n)) for n in files_names
        ):
            logger.debug(f"starting model download for {self.model_name}")
            self.downloader = ModelDownloader(
                model_name=self.model_name,
                download_path=self.download_path,
                file_names=files_names,
                download_func=self._download_model,
            )
            self.downloader.ensure_model_files()
        else:
            self.downloader = None
            self._load_model_and_utils()
            logger.debug(f"models are already downloaded for {self.model_name}")
    def _load_model_and_utils(self):
        """Create the ONNX runner once the model file is available on disk."""
        if self.runner is None:
            if self.downloader:
                self.downloader.wait_for_download()
            self.runner = ONNXModelRunner(
                os.path.join(self.download_path, self.model_file),
                self.device,
            )
    def _preprocess_inputs(self, raw_inputs):
        """Letterbox the face crop to FACE_EMBEDDING_SIZE and normalize for ArcFace.

        Returns a one-item list holding the named input tensor shaped
        (1, channels, FACE_EMBEDDING_SIZE, FACE_EMBEDDING_SIZE), float32 in [-1, 1].
        """
        pil = self._process_image(raw_inputs[0])
        # handle images larger than input size
        width, height = pil.size
        if width != FACE_EMBEDDING_SIZE or height != FACE_EMBEDDING_SIZE:
            if width > height:
                # shorter edge rounded down to a multiple of 4
                new_height = int(((height / width) * FACE_EMBEDDING_SIZE) // 4 * 4)
                pil = pil.resize((FACE_EMBEDDING_SIZE, new_height))
            else:
                new_width = int(((width / height) * FACE_EMBEDDING_SIZE) // 4 * 4)
                pil = pil.resize((new_width, FACE_EMBEDDING_SIZE))
        og = np.array(pil).astype(np.float32)
        # Image must be FACE_EMBEDDING_SIZExFACE_EMBEDDING_SIZE
        og_h, og_w, channels = og.shape
        frame = np.zeros(
            (FACE_EMBEDDING_SIZE, FACE_EMBEDDING_SIZE, channels), dtype=np.float32
        )
        # compute center offset
        x_center = (FACE_EMBEDDING_SIZE - og_w) // 2
        y_center = (FACE_EMBEDDING_SIZE - og_h) // 2
        # copy img image into center of result image
        frame[y_center : y_center + og_h, x_center : x_center + og_w] = og
        # run arcface normalization: [0, 255] -> [-1, 1], then HWC -> CHW
        normalized_image = frame.astype(np.float32) / 255.0
        frame = (normalized_image - 0.5) / 0.5
        frame = np.transpose(frame, (2, 0, 1))
        frame = np.expand_dims(frame, axis=0)
        return [{"data": frame}]