mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-05-08 06:25:27 +03:00
Compare commits
8 Commits
7d7df9eb57
...
6dc81a6a0c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6dc81a6a0c | ||
|
|
434ef358a2 | ||
|
|
fe269b77b8 | ||
|
|
0f3dd097ec | ||
|
|
2a4d7e4766 | ||
|
|
46415ffeb5 | ||
|
|
e35ab0b8a1 | ||
|
|
837373547d |
7
Makefile
7
Makefile
@ -21,6 +21,13 @@ local: version
|
||||
--tag frigate:latest \
|
||||
--load
|
||||
|
||||
localh10: version
|
||||
docker buildx build --target=frigate --file docker/main/Dockerfile . \
|
||||
--build-arg HAILORT_VERSION=5.1.1 \
|
||||
--build-arg HAILORT_GIT_REPO=mathieu-d/hailort \
|
||||
--tag frigate:latest \
|
||||
--load
|
||||
|
||||
debug: version
|
||||
docker buildx build --target=frigate --file docker/main/Dockerfile . \
|
||||
--build-arg DEBUG=true \
|
||||
|
||||
@ -12,6 +12,11 @@ services:
|
||||
build:
|
||||
context: .
|
||||
dockerfile: docker/main/Dockerfile
|
||||
# Use args to specify hailort version and location
|
||||
# args:
|
||||
# HAILORT_VERSION: "5.1.1"
|
||||
# HAILORT_GIT_REPO: "mathieu-d/hailort"
|
||||
|
||||
# Use target devcontainer-trt for TensorRT dev
|
||||
target: devcontainer
|
||||
cache_from:
|
||||
@ -29,6 +34,7 @@ services:
|
||||
# devices:
|
||||
# - /dev/bus/usb:/dev/bus/usb # Uncomment for Google Coral USB
|
||||
# - /dev/dri:/dev/dri # for intel hwaccel, needs to be updated for your hardware
|
||||
|
||||
volumes:
|
||||
- .:/workspace/frigate:cached
|
||||
- ./web/dist:/opt/frigate/web:cached
|
||||
|
||||
7
docker/hailo10h/user_installation.sh
Normal file
7
docker/hailo10h/user_installation.sh
Normal file
@ -0,0 +1,7 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Update package list and install hailo driver version 5.1.1 for Hailo-10H
|
||||
sudo apt update
|
||||
sudo apt install -y hailo-h10-all=5.1.1
|
||||
|
||||
|
||||
@ -157,6 +157,8 @@ FROM base AS wheels
|
||||
ARG DEBIAN_FRONTEND
|
||||
ARG TARGETARCH
|
||||
ARG DEBUG=false
|
||||
ARG HAILORT_VERSION=4.21.0
|
||||
ARG HAILORT_GIT_REPO=frigate-nvr/hailort
|
||||
|
||||
# Use a separate container to build wheels to prevent build dependencies in final image
|
||||
RUN apt-get -qq update \
|
||||
|
||||
@ -2,13 +2,11 @@
|
||||
|
||||
set -euxo pipefail
|
||||
|
||||
hailo_version="4.21.0"
|
||||
|
||||
if [[ "${TARGETARCH}" == "amd64" ]]; then
|
||||
arch="x86_64"
|
||||
elif [[ "${TARGETARCH}" == "arm64" ]]; then
|
||||
arch="aarch64"
|
||||
fi
|
||||
|
||||
wget -qO- "https://github.com/frigate-nvr/hailort/releases/download/v${hailo_version}/hailort-debian12-${TARGETARCH}.tar.gz" | tar -C / -xzf -
|
||||
wget -P /wheels/ "https://github.com/frigate-nvr/hailort/releases/download/v${hailo_version}/hailort-${hailo_version}-cp311-cp311-linux_${arch}.whl"
|
||||
wget -qO- "https://github.com/${HAILORT_GIT_REPO}/releases/download/v${HAILORT_VERSION}/hailort-debian12-${TARGETARCH}.tar.gz" | tar -C / -xzf -
|
||||
wget -P /wheels/ "https://github.com/${HAILORT_GIT_REPO}/releases/download/v${HAILORT_VERSION}/hailort-${HAILORT_VERSION}-cp311-cp311-linux_${arch}.whl"
|
||||
|
||||
@ -133,6 +133,61 @@ class FaceRecognizer(ABC):
|
||||
return 0.0
|
||||
|
||||
|
||||
def build_class_mean(
|
||||
embs: list[np.ndarray],
|
||||
trim: float = 0.15,
|
||||
outlier_threshold: float = 0.30,
|
||||
min_keep_frac: float = 0.7,
|
||||
max_iters: int = 3,
|
||||
) -> np.ndarray:
|
||||
"""Build a class-mean embedding with two-layer outlier protection.
|
||||
|
||||
Layer 1 (iterative, vector-wise): drop whole embeddings whose cosine
|
||||
similarity to the current class mean is below ``outlier_threshold``.
|
||||
Catches mislabeled or corrupted training samples (wrong face in the
|
||||
folder, full-frame screenshots, extreme crops) that per-dimension
|
||||
trimming cannot detect.
|
||||
|
||||
Layer 2 (per-dimension): ``scipy.stats.trim_mean`` on the retained set
|
||||
to smooth per-component noise (lighting, expression, alignment jitter).
|
||||
|
||||
Collections with fewer than 5 images bypass outlier rejection — too few
|
||||
samples to establish a reliable class center.
|
||||
"""
|
||||
arr = np.stack(embs, axis=0)
|
||||
|
||||
if len(arr) < 5:
|
||||
return np.asarray(stats.trim_mean(arr, trim, axis=0))
|
||||
|
||||
keep = np.ones(len(arr), dtype=bool)
|
||||
floor = max(5, int(np.ceil(min_keep_frac * len(arr))))
|
||||
|
||||
for _ in range(max_iters):
|
||||
mean = stats.trim_mean(arr[keep], trim, axis=0)
|
||||
m_norm = mean / (np.linalg.norm(mean) + 1e-9)
|
||||
e_norms = arr / (np.linalg.norm(arr, axis=1, keepdims=True) + 1e-9)
|
||||
cos = e_norms @ m_norm
|
||||
new_keep = cos >= outlier_threshold
|
||||
|
||||
if new_keep.sum() < floor:
|
||||
top = np.argsort(-cos)[:floor]
|
||||
new_keep = np.zeros(len(arr), dtype=bool)
|
||||
new_keep[top] = True
|
||||
|
||||
if np.array_equal(new_keep, keep):
|
||||
break
|
||||
keep = new_keep
|
||||
|
||||
dropped = int((~keep).sum())
|
||||
|
||||
if dropped:
|
||||
logger.debug(
|
||||
f"Vector-wise outlier filter dropped {dropped}/{len(arr)} embeddings"
|
||||
)
|
||||
|
||||
return np.asarray(stats.trim_mean(arr[keep], trim, axis=0))
|
||||
|
||||
|
||||
def similarity_to_confidence(
|
||||
cosine_similarity: float,
|
||||
median: float = 0.3,
|
||||
@ -229,7 +284,7 @@ class FaceNetRecognizer(FaceRecognizer):
|
||||
|
||||
for name, embs in face_embeddings_map.items():
|
||||
if embs:
|
||||
self.mean_embs[name] = stats.trim_mean(embs, 0.15)
|
||||
self.mean_embs[name] = build_class_mean(embs)
|
||||
|
||||
logger.debug("Finished building ArcFace model")
|
||||
|
||||
@ -340,7 +395,7 @@ class ArcFaceRecognizer(FaceRecognizer):
|
||||
|
||||
for name, embs in face_embeddings_map.items():
|
||||
if embs:
|
||||
self.mean_embs[name] = stats.trim_mean(embs, 0.15)
|
||||
self.mean_embs[name] = build_class_mean(embs)
|
||||
|
||||
logger.debug("Finished building ArcFace model")
|
||||
|
||||
|
||||
415
frigate/detectors/plugins/hailo10h.py
Executable file
415
frigate/detectors/plugins/hailo10h.py
Executable file
@ -0,0 +1,415 @@
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import threading
|
||||
import urllib.request
|
||||
from functools import partial
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from pydantic import ConfigDict, Field
|
||||
from typing_extensions import Literal
|
||||
|
||||
from frigate.const import MODEL_CACHE_DIR
|
||||
from frigate.detectors.detection_api import DetectionApi
|
||||
from frigate.detectors.detector_config import (
|
||||
BaseDetectorConfig,
|
||||
)
|
||||
from frigate.object_detection.util import RequestStore, ResponseStore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ----------------- Utility Functions ----------------- #
|
||||
|
||||
|
||||
def preprocess_tensor(image: np.ndarray, model_w: int, model_h: int) -> np.ndarray:
|
||||
"""
|
||||
Resize an image with unchanged aspect ratio using padding.
|
||||
Assumes input image shape is (H, W, 3).
|
||||
"""
|
||||
if image.ndim == 4 and image.shape[0] == 1:
|
||||
image = image[0]
|
||||
|
||||
h, w = image.shape[:2]
|
||||
scale = min(model_w / w, model_h / h)
|
||||
new_w, new_h = int(w * scale), int(h * scale)
|
||||
resized_image = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_CUBIC)
|
||||
padded_image = np.full((model_h, model_w, 3), 114, dtype=image.dtype)
|
||||
x_offset = (model_w - new_w) // 2
|
||||
y_offset = (model_h - new_h) // 2
|
||||
padded_image[y_offset : y_offset + new_h, x_offset : x_offset + new_w] = (
|
||||
resized_image
|
||||
)
|
||||
return padded_image
|
||||
|
||||
|
||||
# ----------------- Global Constants ----------------- #
|
||||
DETECTOR_KEY = "hailo10h"
|
||||
ARCH = None
|
||||
H10H_DEFAULT_MODEL = "yolov6n.hef"
|
||||
H10H_DEFAULT_URL = "https://hailo-model-zoo.s3.eu-west-2.amazonaws.com/ModelZoo/Compiled/v5.2.0/hailo10h/yolov6n.hef"
|
||||
|
||||
|
||||
def detect_hailo_arch():
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["hailortcli", "fw-control", "identify"], capture_output=True, text=True
|
||||
)
|
||||
if result.returncode != 0:
|
||||
logger.error(f"Inference error: {result.stderr}")
|
||||
return None
|
||||
for line in result.stdout.split("\n"):
|
||||
if "Device Architecture" in line:
|
||||
if "HAILO10H" in line:
|
||||
return "hailo10h"
|
||||
logger.error("Inference error: Could not determine Hailo architecture.")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Inference error: {e}")
|
||||
return None
|
||||
|
||||
|
||||
# ----------------- HailoAsyncInference Class ----------------- #
|
||||
class HailoAsyncInference:
|
||||
def __init__(
|
||||
self,
|
||||
hef_path: str,
|
||||
input_store: RequestStore,
|
||||
output_store: ResponseStore,
|
||||
batch_size: int = 1,
|
||||
input_type: Optional[str] = None,
|
||||
output_type: Optional[Dict[str, str]] = None,
|
||||
send_original_frame: bool = False,
|
||||
) -> None:
|
||||
# when importing hailo it activates the driver
|
||||
# which leaves processes running even though it may not be used.
|
||||
try:
|
||||
from hailo_platform import (
|
||||
HEF,
|
||||
FormatType,
|
||||
HailoSchedulingAlgorithm,
|
||||
VDevice,
|
||||
)
|
||||
except ModuleNotFoundError:
|
||||
pass
|
||||
|
||||
self.input_store = input_store
|
||||
self.output_store = output_store
|
||||
|
||||
params = VDevice.create_params()
|
||||
params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN
|
||||
|
||||
self.hef = HEF(hef_path)
|
||||
self.target = VDevice(params)
|
||||
self.infer_model = self.target.create_infer_model(hef_path)
|
||||
self.infer_model.set_batch_size(batch_size)
|
||||
|
||||
if input_type is not None:
|
||||
self.infer_model.input().set_format_type(getattr(FormatType, input_type))
|
||||
|
||||
if output_type is not None:
|
||||
for output_name, output_type in output_type.items():
|
||||
self.infer_model.output(output_name).set_format_type(
|
||||
getattr(FormatType, output_type)
|
||||
)
|
||||
|
||||
self.output_type = output_type
|
||||
self.send_original_frame = send_original_frame
|
||||
|
||||
def callback(
|
||||
self,
|
||||
completion_info,
|
||||
bindings_list: List,
|
||||
input_batch: List,
|
||||
request_ids: List[int],
|
||||
):
|
||||
if completion_info.exception:
|
||||
logger.error(f"Inference error: {completion_info.exception}")
|
||||
else:
|
||||
for i, bindings in enumerate(bindings_list):
|
||||
if len(bindings._output_names) == 1:
|
||||
result = bindings.output().get_buffer()
|
||||
else:
|
||||
result = {
|
||||
name: np.expand_dims(bindings.output(name).get_buffer(), axis=0)
|
||||
for name in bindings._output_names
|
||||
}
|
||||
self.output_store.put(request_ids[i], (input_batch[i], result))
|
||||
|
||||
def _create_bindings(self, configured_infer_model) -> object:
|
||||
if self.output_type is None:
|
||||
output_buffers = {
|
||||
output_info.name: np.empty(
|
||||
self.infer_model.output(output_info.name).shape,
|
||||
dtype=getattr(
|
||||
np, str(output_info.format.type).split(".")[1].lower()
|
||||
),
|
||||
)
|
||||
for output_info in self.hef.get_output_vstream_infos()
|
||||
}
|
||||
else:
|
||||
output_buffers = {
|
||||
name: np.empty(
|
||||
self.infer_model.output(name).shape,
|
||||
dtype=getattr(np, self.output_type[name].lower()),
|
||||
)
|
||||
for name in self.output_type
|
||||
}
|
||||
return configured_infer_model.create_bindings(output_buffers=output_buffers)
|
||||
|
||||
def get_input_shape(self) -> Tuple[int, ...]:
|
||||
return self.hef.get_input_vstream_infos()[0].shape
|
||||
|
||||
def run(self) -> None:
|
||||
job = None
|
||||
with self.infer_model.configure() as configured_infer_model:
|
||||
while True:
|
||||
batch_data = self.input_store.get()
|
||||
|
||||
if batch_data is None:
|
||||
break
|
||||
|
||||
request_id, frame_data = batch_data
|
||||
preprocessed_batch = [frame_data]
|
||||
request_ids = [request_id]
|
||||
input_batch = preprocessed_batch # non-send_original_frame mode
|
||||
|
||||
bindings_list = []
|
||||
for frame in preprocessed_batch:
|
||||
bindings = self._create_bindings(configured_infer_model)
|
||||
bindings.input().set_buffer(np.array(frame))
|
||||
bindings_list.append(bindings)
|
||||
configured_infer_model.wait_for_async_ready(timeout_ms=10000)
|
||||
job = configured_infer_model.run_async(
|
||||
bindings_list,
|
||||
partial(
|
||||
self.callback,
|
||||
input_batch=input_batch,
|
||||
request_ids=request_ids,
|
||||
bindings_list=bindings_list,
|
||||
),
|
||||
)
|
||||
|
||||
if job is not None:
|
||||
job.wait(100)
|
||||
|
||||
|
||||
# ----------------- HailoDetector Class ----------------- #
|
||||
class HailoDetector(DetectionApi):
|
||||
type_key = DETECTOR_KEY
|
||||
|
||||
def __init__(self, detector_config: "HailoDetectorConfig"):
|
||||
global ARCH
|
||||
ARCH = detect_hailo_arch()
|
||||
self.cache_dir = MODEL_CACHE_DIR
|
||||
self.device_type = detector_config.device
|
||||
self.model_height = (
|
||||
detector_config.model.height
|
||||
if hasattr(detector_config.model, "height")
|
||||
else None
|
||||
)
|
||||
self.model_width = (
|
||||
detector_config.model.width
|
||||
if hasattr(detector_config.model, "width")
|
||||
else None
|
||||
)
|
||||
self.model_type = (
|
||||
detector_config.model.model_type
|
||||
if hasattr(detector_config.model, "model_type")
|
||||
else None
|
||||
)
|
||||
self.tensor_format = (
|
||||
detector_config.model.input_tensor
|
||||
if hasattr(detector_config.model, "input_tensor")
|
||||
else None
|
||||
)
|
||||
self.pixel_format = (
|
||||
detector_config.model.input_pixel_format
|
||||
if hasattr(detector_config.model, "input_pixel_format")
|
||||
else None
|
||||
)
|
||||
self.input_dtype = (
|
||||
detector_config.model.input_dtype
|
||||
if hasattr(detector_config.model, "input_dtype")
|
||||
else None
|
||||
)
|
||||
self.output_type = "FLOAT32"
|
||||
self.set_path_and_url(detector_config.model.path)
|
||||
self.working_model_path = self.check_and_prepare()
|
||||
|
||||
self.batch_size = 1
|
||||
self.input_store = RequestStore()
|
||||
self.response_store = ResponseStore()
|
||||
|
||||
try:
|
||||
logger.debug(f"[INIT] Loading HEF model from {self.working_model_path}")
|
||||
self.inference_engine = HailoAsyncInference(
|
||||
self.working_model_path,
|
||||
self.input_store,
|
||||
self.response_store,
|
||||
self.batch_size,
|
||||
)
|
||||
self.input_shape = self.inference_engine.get_input_shape()
|
||||
logger.debug(f"[INIT] Model input shape: {self.input_shape}")
|
||||
self.inference_thread = threading.Thread(
|
||||
target=self.inference_engine.run, daemon=True
|
||||
)
|
||||
self.inference_thread.start()
|
||||
except Exception as e:
|
||||
logger.error(f"[INIT] Failed to initialize HailoAsyncInference: {e}")
|
||||
raise
|
||||
|
||||
def set_path_and_url(self, path: str = None):
|
||||
if not path:
|
||||
self.model_path = None
|
||||
self.url = None
|
||||
return
|
||||
if self.is_url(path):
|
||||
self.url = path
|
||||
self.model_path = None
|
||||
else:
|
||||
self.model_path = path
|
||||
self.url = None
|
||||
|
||||
def is_url(self, url: str) -> bool:
|
||||
return (
|
||||
url.startswith("http://")
|
||||
or url.startswith("https://")
|
||||
or url.startswith("www.")
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def extract_model_name(path: str = None, url: str = None) -> str:
|
||||
if path and path.endswith(".hef"):
|
||||
return os.path.basename(path)
|
||||
elif url and url.endswith(".hef"):
|
||||
return os.path.basename(url)
|
||||
else:
|
||||
return H10H_DEFAULT_MODEL
|
||||
|
||||
@staticmethod
|
||||
def download_model(url: str, destination: str):
|
||||
if not url.endswith(".hef"):
|
||||
raise ValueError("Invalid model URL. Only .hef files are supported.")
|
||||
try:
|
||||
urllib.request.urlretrieve(url, destination)
|
||||
logger.debug(f"Downloaded model to {destination}")
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to download model from {url}: {str(e)}")
|
||||
|
||||
def check_and_prepare(self) -> str:
|
||||
if not os.path.exists(self.cache_dir):
|
||||
os.makedirs(self.cache_dir)
|
||||
model_name = self.extract_model_name(self.model_path, self.url)
|
||||
cached_model_path = os.path.join(self.cache_dir, model_name)
|
||||
if not self.model_path and not self.url:
|
||||
if os.path.exists(cached_model_path):
|
||||
logger.debug(f"Model found in cache: {cached_model_path}")
|
||||
return cached_model_path
|
||||
else:
|
||||
logger.debug(f"Downloading default model: {model_name}")
|
||||
self.download_model(H10H_DEFAULT_URL, cached_model_path)
|
||||
|
||||
elif self.url:
|
||||
logger.debug(f"Downloading model from URL: {self.url}")
|
||||
self.download_model(self.url, cached_model_path)
|
||||
elif self.model_path:
|
||||
if os.path.exists(self.model_path):
|
||||
logger.debug(f"Using existing model at: {self.model_path}")
|
||||
return self.model_path
|
||||
else:
|
||||
raise FileNotFoundError(f"Model file not found at: {self.model_path}")
|
||||
return cached_model_path
|
||||
|
||||
def detect_raw(self, tensor_input):
|
||||
tensor_input = self.preprocess(tensor_input)
|
||||
|
||||
if isinstance(tensor_input, np.ndarray) and len(tensor_input.shape) == 3:
|
||||
tensor_input = np.expand_dims(tensor_input, axis=0)
|
||||
|
||||
request_id = self.input_store.put(tensor_input)
|
||||
|
||||
try:
|
||||
_, infer_results = self.response_store.get(request_id, timeout=1.0)
|
||||
except TimeoutError:
|
||||
logger.error(
|
||||
f"Timeout waiting for inference results for request {request_id}"
|
||||
)
|
||||
|
||||
if not self.inference_thread.is_alive():
|
||||
raise RuntimeError(
|
||||
"HailoRT inference thread has stopped, restart required."
|
||||
)
|
||||
|
||||
return np.zeros((20, 6), dtype=np.float32)
|
||||
|
||||
if isinstance(infer_results, list) and len(infer_results) == 1:
|
||||
infer_results = infer_results[0]
|
||||
|
||||
threshold = 0.4
|
||||
all_detections = []
|
||||
for class_id, detection_set in enumerate(infer_results):
|
||||
if not isinstance(detection_set, np.ndarray) or detection_set.size == 0:
|
||||
continue
|
||||
for det in detection_set:
|
||||
if det.shape[0] < 5:
|
||||
continue
|
||||
score = float(det[4])
|
||||
if score < threshold:
|
||||
continue
|
||||
all_detections.append([class_id, score, det[0], det[1], det[2], det[3]])
|
||||
|
||||
if len(all_detections) == 0:
|
||||
detections_array = np.zeros((20, 6), dtype=np.float32)
|
||||
else:
|
||||
detections_array = np.array(all_detections, dtype=np.float32)
|
||||
if detections_array.shape[0] > 20:
|
||||
detections_array = detections_array[:20, :]
|
||||
elif detections_array.shape[0] < 20:
|
||||
pad = np.zeros((20 - detections_array.shape[0], 6), dtype=np.float32)
|
||||
detections_array = np.vstack((detections_array, pad))
|
||||
|
||||
return detections_array
|
||||
|
||||
def preprocess(self, image):
|
||||
if isinstance(image, np.ndarray):
|
||||
processed = preprocess_tensor(
|
||||
image, self.input_shape[1], self.input_shape[0]
|
||||
)
|
||||
return np.expand_dims(processed, axis=0)
|
||||
else:
|
||||
raise ValueError("Unsupported image format for preprocessing")
|
||||
|
||||
def close(self):
|
||||
"""Properly shuts down the inference engine and releases the VDevice."""
|
||||
logger.debug("[CLOSE] Closing HailoDetector")
|
||||
try:
|
||||
if hasattr(self, "inference_engine"):
|
||||
if hasattr(self.inference_engine, "target"):
|
||||
self.inference_engine.target.release()
|
||||
logger.debug("Hailo VDevice released successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to close Hailo device: {e}")
|
||||
raise
|
||||
|
||||
def __del__(self):
|
||||
"""Destructor to ensure cleanup when the object is deleted."""
|
||||
self.close()
|
||||
|
||||
|
||||
# ----------------- HailoDetectorConfig Class ----------------- #
|
||||
class HailoDetectorConfig(BaseDetectorConfig):
|
||||
"""Hailo10H detector using HEF models and the HailoRT SDK for inference on Hailo hardware."""
|
||||
|
||||
model_config = ConfigDict(
|
||||
title="Hailo-10H",
|
||||
)
|
||||
|
||||
type: Literal[DETECTOR_KEY]
|
||||
device: str = Field(
|
||||
default="PCIe",
|
||||
title="Device Type",
|
||||
description="The device to use for Hailo inference (e.g. 'PCIe', 'M.2').",
|
||||
)
|
||||
@ -123,6 +123,15 @@ def get_detector_temperature(
|
||||
if index < len(hailo_device_names):
|
||||
device_name = hailo_device_names[index]
|
||||
return hailo_temps[device_name]
|
||||
elif detector_type == "hailo10h":
|
||||
# Get temperatures for Hailo devices
|
||||
hailo_temps = get_hailo_temps()
|
||||
if hailo_temps:
|
||||
hailo_device_names = sorted(hailo_temps.keys())
|
||||
index = detector_index_by_type.get("hailo10h", 0)
|
||||
if index < len(hailo_device_names):
|
||||
device_name = hailo_device_names[index]
|
||||
return hailo_temps[device_name]
|
||||
elif detector_type == "rknn":
|
||||
# Rockchip temperatures are handled by the GPU / NPU stats
|
||||
# as there are not detector specific temperatures
|
||||
|
||||
376
testing-scripts/analyze_recording_keyframes.py
Normal file
376
testing-scripts/analyze_recording_keyframes.py
Normal file
@ -0,0 +1,376 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Analyze keyframe and timestamp structure of Frigate recording segments.
|
||||
|
||||
This is a diagnostic tool for investigating seek precision / GOP behavior on
|
||||
recorded segments. It does not modify anything.
|
||||
|
||||
ffprobe is only available inside the Frigate container, at
|
||||
/usr/lib/ffmpeg/$DEFAULT_FFMPEG_VERSION/bin/ffprobe
|
||||
This script auto-resolves that path from the DEFAULT_FFMPEG_VERSION env var
|
||||
(or falls back to scanning /usr/lib/ffmpeg/*/bin/ffprobe). Pass --ffprobe to
|
||||
override if needed.
|
||||
|
||||
All recording segments on the filesystem are in UTC. The --timestamp flag
|
||||
expects a UTC Unix timestamp.
|
||||
|
||||
Typical use:
|
||||
# Inside the Frigate container (or wherever recordings are mounted)
|
||||
python3 analyze_recording_keyframes.py <camera_name>
|
||||
|
||||
# Analyze 10 most recent segments
|
||||
python3 analyze_recording_keyframes.py <camera_name> --count 10
|
||||
|
||||
# Locate the segment that contains a specific UTC Unix timestamp and
|
||||
# show it plus surrounding segments
|
||||
python3 analyze_recording_keyframes.py <camera> --timestamp 1713471234.567
|
||||
|
||||
# Custom recordings directory
|
||||
python3 analyze_recording_keyframes.py <camera> --recordings-dir /media/frigate/recordings
|
||||
|
||||
# Override the ffprobe path explicitly
|
||||
python3 analyze_recording_keyframes.py <camera> --ffprobe /usr/lib/ffmpeg/7.0/bin/ffprobe
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import datetime
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from statistics import mean, median, stdev
|
||||
|
||||
|
||||
def resolve_ffprobe_path(override: str | None) -> str:
|
||||
"""Resolve the ffprobe binary path.
|
||||
|
||||
Inside the Frigate container, ffprobe lives at
|
||||
/usr/lib/ffmpeg/{DEFAULT_FFMPEG_VERSION}/bin/ffprobe — the exact version
|
||||
depends on the image build and is exposed as an env var.
|
||||
"""
|
||||
if override:
|
||||
return override
|
||||
version = os.environ.get("DEFAULT_FFMPEG_VERSION", "")
|
||||
if version:
|
||||
path = f"/usr/lib/ffmpeg/{version}/bin/ffprobe"
|
||||
if Path(path).is_file():
|
||||
return path
|
||||
# Fall back to scanning the Frigate ffmpeg install root.
|
||||
for candidate in sorted(Path("/usr/lib/ffmpeg").glob("*/bin/ffprobe")):
|
||||
if candidate.is_file():
|
||||
return str(candidate)
|
||||
print(
|
||||
"Could not locate ffprobe. Pass --ffprobe <path> or set "
|
||||
"DEFAULT_FFMPEG_VERSION.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def find_recent_segments(recordings_dir: Path, camera: str, count: int) -> list[Path]:
|
||||
"""Return the N most recent .mp4 segments for the given camera.
|
||||
|
||||
Expected layout: <recordings_dir>/<YYYY-MM-DD>/<HH>/<camera>/<MM>.<SS>.mp4
|
||||
"""
|
||||
pattern = f"*/*/{camera}/*.mp4"
|
||||
segments = sorted(recordings_dir.glob(pattern))
|
||||
return segments[-count:]
|
||||
|
||||
|
||||
def find_segments_near_timestamp(
|
||||
recordings_dir: Path, camera: str, target_ts: float, count: int
|
||||
) -> tuple[list[Path], Path | None]:
|
||||
"""Return `count` segments centered on the one containing `target_ts`.
|
||||
|
||||
Also returns the specific segment that should contain the timestamp, so
|
||||
callers can highlight it in output.
|
||||
"""
|
||||
pattern = f"*/*/{camera}/*.mp4"
|
||||
with_ts: list[tuple[float, Path]] = []
|
||||
for seg in sorted(recordings_dir.glob(pattern)):
|
||||
ts = filename_to_timestamp(seg)
|
||||
if ts is not None:
|
||||
with_ts.append((ts, seg))
|
||||
|
||||
if not with_ts:
|
||||
return [], None
|
||||
|
||||
# Largest filename_ts that is <= target_ts — that's the segment that
|
||||
# should contain the timestamp (Frigate catalogs segments by filename).
|
||||
target_idx = -1
|
||||
for i, (ts, _) in enumerate(with_ts):
|
||||
if ts <= target_ts:
|
||||
target_idx = i
|
||||
else:
|
||||
break
|
||||
|
||||
if target_idx < 0:
|
||||
# target_ts is before the earliest segment we have — just return the
|
||||
# first `count` segments so the user can see what's available.
|
||||
window = with_ts[:count]
|
||||
return [seg for _, seg in window], None
|
||||
|
||||
half = count // 2
|
||||
start = max(0, target_idx - half)
|
||||
end = min(len(with_ts), start + count)
|
||||
start = max(0, end - count)
|
||||
|
||||
window = with_ts[start:end]
|
||||
return [seg for _, seg in window], with_ts[target_idx][1]
|
||||
|
||||
|
||||
def filename_to_timestamp(segment: Path) -> float | None:
|
||||
"""Parse the wall-clock time from Frigate's segment path layout."""
|
||||
try:
|
||||
date = segment.parent.parent.parent.name # YYYY-MM-DD
|
||||
hour = segment.parent.parent.name # HH
|
||||
mm_ss = segment.stem # MM.SS
|
||||
minute, second = mm_ss.split(".")
|
||||
dt = datetime.datetime.strptime(
|
||||
f"{date} {hour}:{minute}:{second}",
|
||||
"%Y-%m-%d %H:%M:%S",
|
||||
).replace(tzinfo=datetime.timezone.utc)
|
||||
return dt.timestamp()
|
||||
except (ValueError, IndexError):
|
||||
return None
|
||||
|
||||
|
||||
def run_ffprobe(ffprobe: str, args: list[str]) -> dict:
|
||||
"""Run ffprobe and return parsed JSON, or empty dict on failure."""
|
||||
result = subprocess.run(
|
||||
[ffprobe, "-v", "error", *args, "-of", "json"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=False,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
print(f" ffprobe error: {result.stderr.strip()}", file=sys.stderr)
|
||||
return {}
|
||||
try:
|
||||
return json.loads(result.stdout)
|
||||
except json.JSONDecodeError:
|
||||
return {}
|
||||
|
||||
|
||||
def get_format_info(ffprobe: str, segment: Path) -> tuple[dict, dict]:
|
||||
"""Return (format_dict, stream_dict) for the first video stream."""
|
||||
data = run_ffprobe(
|
||||
ffprobe,
|
||||
[
|
||||
"-show_entries",
|
||||
"format=duration,start_time",
|
||||
"-show_entries",
|
||||
"stream=codec_name,profile,r_frame_rate,width,height",
|
||||
"-select_streams",
|
||||
"v:0",
|
||||
str(segment),
|
||||
],
|
||||
)
|
||||
fmt = data.get("format", {})
|
||||
streams = data.get("streams") or [{}]
|
||||
return fmt, streams[0]
|
||||
|
||||
|
||||
def get_video_packets(ffprobe: str, segment: Path) -> list[dict]:
|
||||
"""Return video packets with pts_time and flags."""
|
||||
data = run_ffprobe(
|
||||
ffprobe,
|
||||
[
|
||||
"-select_streams",
|
||||
"v",
|
||||
"-show_entries",
|
||||
"packet=pts_time,dts_time,flags",
|
||||
str(segment),
|
||||
],
|
||||
)
|
||||
return data.get("packets", [])
|
||||
|
||||
|
||||
def analyze(ffprobe: str, segment: Path, highlight: bool = False) -> None:
|
||||
marker = " <-- contains target timestamp" if highlight else ""
|
||||
print(f"\n=== {segment} ==={marker}")
|
||||
|
||||
fmt, stream = get_format_info(ffprobe, segment)
|
||||
duration = float(fmt.get("duration", 0) or 0)
|
||||
start_time = float(fmt.get("start_time", 0) or 0)
|
||||
codec = stream.get("codec_name", "?")
|
||||
profile = stream.get("profile", "?")
|
||||
width = stream.get("width", "?")
|
||||
height = stream.get("height", "?")
|
||||
fps = stream.get("r_frame_rate", "?/1")
|
||||
|
||||
filename_ts = filename_to_timestamp(segment)
|
||||
filename_iso = (
|
||||
datetime.datetime.fromtimestamp(
|
||||
filename_ts, tz=datetime.timezone.utc
|
||||
).isoformat()
|
||||
if filename_ts is not None
|
||||
else "?"
|
||||
)
|
||||
|
||||
print(f" Codec: {codec} ({profile}) {width}x{height} {fps}")
|
||||
print(f" Filename time: {filename_ts} ({filename_iso})")
|
||||
print(f" Format duration: {duration:.3f}s")
|
||||
print(f" Format start: {start_time:.3f}s (PTS offset of first packet)")
|
||||
|
||||
packets = get_video_packets(ffprobe, segment)
|
||||
if not packets:
|
||||
print(" (no video packets)")
|
||||
return
|
||||
|
||||
keyframe_times: list[float] = []
|
||||
first_pts: float | None = None
|
||||
last_pts: float | None = None
|
||||
|
||||
for pkt in packets:
|
||||
pts_str = pkt.get("pts_time")
|
||||
if pts_str is None or pts_str == "N/A":
|
||||
continue
|
||||
pts = float(pts_str)
|
||||
if first_pts is None:
|
||||
first_pts = pts
|
||||
last_pts = pts
|
||||
if "K" in pkt.get("flags", ""):
|
||||
keyframe_times.append(pts)
|
||||
|
||||
total_packets = len(packets)
|
||||
kf_count = len(keyframe_times)
|
||||
|
||||
print(f" Video packets: {total_packets}")
|
||||
print(f" Keyframes: {kf_count}")
|
||||
if first_pts is not None and last_pts is not None:
|
||||
print(
|
||||
f" Packet PTS: first={first_pts:.3f}s last={last_pts:.3f}s "
|
||||
f"span={last_pts - first_pts:.3f}s"
|
||||
)
|
||||
|
||||
if keyframe_times:
|
||||
print(
|
||||
f" Keyframe PTS: first={keyframe_times[0]:.3f}s "
|
||||
f"last={keyframe_times[-1]:.3f}s"
|
||||
)
|
||||
formatted = ", ".join(f"{t:.3f}" for t in keyframe_times)
|
||||
print(f" Keyframe times: [{formatted}]")
|
||||
|
||||
if len(keyframe_times) >= 2:
|
||||
gaps = [b - a for a, b in zip(keyframe_times, keyframe_times[1:])]
|
||||
avg_fps_estimate = (
|
||||
total_packets / (last_pts - first_pts)
|
||||
if last_pts and first_pts is not None and last_pts > first_pts
|
||||
else 0
|
||||
)
|
||||
print(
|
||||
f" GOP gaps (s): min={min(gaps):.3f} max={max(gaps):.3f} "
|
||||
f"mean={mean(gaps):.3f} median={median(gaps):.3f}"
|
||||
)
|
||||
if len(gaps) > 1:
|
||||
print(f" stdev={stdev(gaps):.3f}")
|
||||
print(
|
||||
f" Est. mean GOP: ~{mean(gaps) * avg_fps_estimate:.1f} frames"
|
||||
if avg_fps_estimate
|
||||
else ""
|
||||
)
|
||||
if max(gaps) > 5:
|
||||
print(
|
||||
" !! Max GOP > 5s — consistent with adaptive/smart codec "
|
||||
"(even if 'Smart Codec' is off in the UI, some cameras still "
|
||||
"produce irregular GOPs under specific encoder profiles)"
|
||||
)
|
||||
elif kf_count == 1:
|
||||
print(" !! Only one keyframe in segment — very long GOP")
|
||||
|
||||
# Report how well filename time aligns with first-packet PTS.
|
||||
# (Filename time is what Frigate uses as recording.start_time in the DB.)
|
||||
if filename_ts is not None and first_pts is not None:
|
||||
print(
|
||||
f" Notes: first packet PTS is {first_pts:.3f}s into the file; "
|
||||
f"Frigate treats filename time as PTS=0 for seek math."
|
||||
)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(
|
||||
description=__doc__,
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
)
|
||||
parser.add_argument("camera", help="Camera name (matches the recordings subfolder)")
|
||||
parser.add_argument(
|
||||
"--count",
|
||||
type=int,
|
||||
default=5,
|
||||
help="Number of most recent segments to analyze (default: 5)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--recordings-dir",
|
||||
default="/media/frigate/recordings",
|
||||
help="Path to the recordings directory (default: /media/frigate/recordings)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--ffprobe",
|
||||
default=None,
|
||||
help=(
|
||||
"Full path to the ffprobe binary. Defaults to the Frigate-bundled "
|
||||
"binary at /usr/lib/ffmpeg/$DEFAULT_FFMPEG_VERSION/bin/ffprobe."
|
||||
),
|
||||
)
|
||||
parser.add_argument(
|
||||
"--timestamp",
|
||||
type=float,
|
||||
default=None,
|
||||
help=(
|
||||
"Unix timestamp (UTC seconds, decimals allowed) to locate. The "
|
||||
"script finds the segment that should contain this time and "
|
||||
"analyzes it plus surrounding segments (count controls the "
|
||||
"window). All on-disk segments are stored in UTC, so pass a UTC "
|
||||
"Unix timestamp."
|
||||
),
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
ffprobe = resolve_ffprobe_path(args.ffprobe)
|
||||
|
||||
recordings_dir = Path(args.recordings_dir)
|
||||
if not recordings_dir.is_dir():
|
||||
print(
|
||||
f"Recordings directory not found: {recordings_dir}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
target_segment: Path | None = None
|
||||
if args.timestamp is not None:
|
||||
segments, target_segment = find_segments_near_timestamp(
|
||||
recordings_dir, args.camera, args.timestamp, args.count
|
||||
)
|
||||
target_iso = datetime.datetime.fromtimestamp(
|
||||
args.timestamp, tz=datetime.timezone.utc
|
||||
).isoformat()
|
||||
mode = f"around timestamp {args.timestamp} ({target_iso})"
|
||||
else:
|
||||
segments = find_recent_segments(recordings_dir, args.camera, args.count)
|
||||
mode = "most recent"
|
||||
|
||||
if not segments:
|
||||
print(
|
||||
f"No segments found for camera '{args.camera}' under {recordings_dir}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
if args.timestamp is not None and target_segment is None:
|
||||
print(
|
||||
f"!! Target timestamp {args.timestamp} is before the earliest "
|
||||
f"segment on disk; showing the earliest available segments instead.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
|
||||
print(
|
||||
f"Analyzing {len(segments)} {mode} segment(s) for camera "
|
||||
f"'{args.camera}' under {recordings_dir} (ffprobe: {ffprobe})"
|
||||
)
|
||||
for segment in segments:
|
||||
analyze(ffprobe, segment, highlight=(segment == target_segment))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
783
testing-scripts/face_dataset.py
Normal file
783
testing-scripts/face_dataset.py
Normal file
@ -0,0 +1,783 @@
|
||||
"""
|
||||
Face recognition investigation script.
|
||||
|
||||
Standalone replica of Frigate's ArcFace pipeline (see
|
||||
frigate/data_processing/common/face/model.py and
|
||||
frigate/embeddings/onnx/face_embedding.py) for analyzing a face collection
|
||||
outside the running service. Useful for:
|
||||
|
||||
- Diagnosing why a person's collection produces false positives
|
||||
- Finding outlier/contaminating training images
|
||||
- Inspecting the effect of the shipped vector-wise outlier filter
|
||||
|
||||
Layout:
|
||||
- Core pipeline: LandmarkAligner, ArcFaceEmbedder, arcface_preprocess,
|
||||
similarity_to_confidence, blur_reduction — all mirroring the production
|
||||
code exactly
|
||||
- Default run: summarize positive and negative sets against a baseline
|
||||
trim_mean class representation
|
||||
- Optional diagnostics (flags): vector-outlier filter behavior, degenerate
|
||||
"tiny crop" embedding clustering, and multi-identity contamination
|
||||
|
||||
Usage:
|
||||
python3 face_investigate.py \\
|
||||
--positive <positive_folder> \\
|
||||
--negative <negative_folder> \\
|
||||
[--model-cache /path/to/model_cache] \\
|
||||
[--vector-outlier] [--degenerate] [--contamination]
|
||||
|
||||
The positive folder should contain training images for a single identity
|
||||
(same layout as FACE_DIR/<name>/*.webp). The negative folder should contain
|
||||
runtime crops to test against — a mix of true matches and misfires.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from typing import Iterable
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import onnxruntime as ort
|
||||
from PIL import Image
|
||||
from scipy import stats
|
||||
|
||||
ARCFACE_INPUT_SIZE = 112
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Replicated Frigate pipeline
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _process_image_frigate(image: np.ndarray) -> Image.Image:
|
||||
"""Mirror BaseEmbedding._process_image for an ndarray input.
|
||||
|
||||
NOTE: Frigate passes the output of `cv2.imread` (BGR) directly in. PIL's
|
||||
`Image.fromarray` does NOT reorder channels, so the embedder effectively
|
||||
receives a BGR-ordered tensor. We replicate that faithfully here. (Tested
|
||||
— swapping to RGB produces near-identical embeddings; this model is
|
||||
robust to channel order.)
|
||||
"""
|
||||
return Image.fromarray(image)
|
||||
|
||||
|
||||
def arcface_preprocess(image_bgr: np.ndarray) -> np.ndarray:
|
||||
"""Mirror ArcfaceEmbedding._preprocess_inputs."""
|
||||
pil = _process_image_frigate(image_bgr)
|
||||
|
||||
width, height = pil.size
|
||||
if width != ARCFACE_INPUT_SIZE or height != ARCFACE_INPUT_SIZE:
|
||||
if width > height:
|
||||
new_height = int(((height / width) * ARCFACE_INPUT_SIZE) // 4 * 4)
|
||||
pil = pil.resize((ARCFACE_INPUT_SIZE, new_height))
|
||||
else:
|
||||
new_width = int(((width / height) * ARCFACE_INPUT_SIZE) // 4 * 4)
|
||||
pil = pil.resize((new_width, ARCFACE_INPUT_SIZE))
|
||||
|
||||
og = np.array(pil).astype(np.float32)
|
||||
og_h, og_w, channels = og.shape
|
||||
|
||||
frame = np.zeros(
|
||||
(ARCFACE_INPUT_SIZE, ARCFACE_INPUT_SIZE, channels), dtype=np.float32
|
||||
)
|
||||
x_center = (ARCFACE_INPUT_SIZE - og_w) // 2
|
||||
y_center = (ARCFACE_INPUT_SIZE - og_h) // 2
|
||||
frame[y_center : y_center + og_h, x_center : x_center + og_w] = og
|
||||
|
||||
frame = (frame / 127.5) - 1.0
|
||||
frame = np.transpose(frame, (2, 0, 1))
|
||||
frame = np.expand_dims(frame, axis=0)
|
||||
return frame
|
||||
|
||||
|
||||
class LandmarkAligner:
|
||||
"""Mirror FaceRecognizer.align_face."""
|
||||
|
||||
def __init__(self, landmark_model_path: str):
|
||||
if not os.path.exists(landmark_model_path):
|
||||
raise FileNotFoundError(landmark_model_path)
|
||||
self.detector = cv2.face.createFacemarkLBF()
|
||||
self.detector.loadModel(landmark_model_path)
|
||||
|
||||
def align(
|
||||
self, image: np.ndarray, out_w: int, out_h: int
|
||||
) -> tuple[np.ndarray, dict]:
|
||||
land_image = (
|
||||
cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if image.ndim == 3 else image
|
||||
)
|
||||
_, lands = self.detector.fit(
|
||||
land_image, np.array([(0, 0, land_image.shape[1], land_image.shape[0])])
|
||||
)
|
||||
landmarks = lands[0][0]
|
||||
|
||||
leftEyePts = landmarks[42:48]
|
||||
rightEyePts = landmarks[36:42]
|
||||
leftEyeCenter = leftEyePts.mean(axis=0).astype("int")
|
||||
rightEyeCenter = rightEyePts.mean(axis=0).astype("int")
|
||||
|
||||
dY = rightEyeCenter[1] - leftEyeCenter[1]
|
||||
dX = rightEyeCenter[0] - leftEyeCenter[0]
|
||||
angle = np.degrees(np.arctan2(dY, dX)) - 180
|
||||
dist = float(np.sqrt((dX**2) + (dY**2)))
|
||||
|
||||
desiredRightEyeX = 1.0 - 0.35
|
||||
desiredDist = (desiredRightEyeX - 0.35) * out_w
|
||||
scale = desiredDist / dist if dist > 0 else 1.0
|
||||
|
||||
eyesCenter = (
|
||||
int((leftEyeCenter[0] + rightEyeCenter[0]) // 2),
|
||||
int((leftEyeCenter[1] + rightEyeCenter[1]) // 2),
|
||||
)
|
||||
M = cv2.getRotationMatrix2D(eyesCenter, angle, scale)
|
||||
tX = out_w * 0.5
|
||||
tY = out_h * 0.35
|
||||
M[0, 2] += tX - eyesCenter[0]
|
||||
M[1, 2] += tY - eyesCenter[1]
|
||||
|
||||
aligned = cv2.warpAffine(
|
||||
image, M, (out_w, out_h), flags=cv2.INTER_CUBIC
|
||||
)
|
||||
info = dict(
|
||||
angle=float(angle),
|
||||
eye_dist_px=dist,
|
||||
scale=float(scale),
|
||||
landmarks=landmarks,
|
||||
)
|
||||
return aligned, info
|
||||
|
||||
|
||||
class ArcFaceEmbedder:
|
||||
def __init__(self, model_path: str):
|
||||
self.session = ort.InferenceSession(
|
||||
model_path, providers=["CPUExecutionProvider"]
|
||||
)
|
||||
self.input_name = self.session.get_inputs()[0].name
|
||||
|
||||
def embed(self, image_bgr: np.ndarray) -> np.ndarray:
|
||||
tensor = arcface_preprocess(image_bgr)
|
||||
out = self.session.run(None, {self.input_name: tensor})[0]
|
||||
return out.squeeze()
|
||||
|
||||
|
||||
def similarity_to_confidence(
|
||||
cos_sim: float,
|
||||
median: float = 0.3,
|
||||
range_width: float = 0.6,
|
||||
slope_factor: float = 12,
|
||||
) -> float:
|
||||
slope = slope_factor / range_width
|
||||
return float(1.0 / (1.0 + np.exp(-slope * (cos_sim - median))))
|
||||
|
||||
|
||||
def laplacian_variance(image: np.ndarray) -> float:
|
||||
return float(cv2.Laplacian(image, cv2.CV_64F).var())
|
||||
|
||||
|
||||
def blur_reduction(variance: float) -> float:
|
||||
if variance < 120:
|
||||
return 0.06
|
||||
elif variance < 160:
|
||||
return 0.04
|
||||
elif variance < 200:
|
||||
return 0.02
|
||||
elif variance < 250:
|
||||
return 0.01
|
||||
return 0.0
|
||||
|
||||
|
||||
def cosine(a: np.ndarray, b: np.ndarray) -> float:
|
||||
denom = np.linalg.norm(a) * np.linalg.norm(b)
|
||||
if denom == 0:
|
||||
return 0.0
|
||||
return float(np.dot(a, b) / denom)
|
||||
|
||||
|
||||
def l2(v: np.ndarray) -> np.ndarray:
|
||||
return v / (np.linalg.norm(v) + 1e-9)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Sample loading
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class FaceSample:
|
||||
path: str
|
||||
shape: tuple[int, int]
|
||||
embedding: np.ndarray
|
||||
blur_var: float
|
||||
align_info: dict
|
||||
|
||||
|
||||
def load_folder(
|
||||
folder: str, aligner: LandmarkAligner, embedder: ArcFaceEmbedder
|
||||
) -> list[FaceSample]:
|
||||
samples: list[FaceSample] = []
|
||||
names = sorted(os.listdir(folder))
|
||||
for name in names:
|
||||
if name.startswith("."):
|
||||
continue
|
||||
path = os.path.join(folder, name)
|
||||
if not os.path.isfile(path):
|
||||
continue
|
||||
img = cv2.imread(path)
|
||||
if img is None:
|
||||
print(f" [skip unreadable] {name}")
|
||||
continue
|
||||
aligned, info = aligner.align(img, img.shape[1], img.shape[0])
|
||||
emb = embedder.embed(aligned)
|
||||
samples.append(
|
||||
FaceSample(
|
||||
path=path,
|
||||
shape=(img.shape[1], img.shape[0]),
|
||||
embedding=emb,
|
||||
blur_var=laplacian_variance(img),
|
||||
align_info=info,
|
||||
)
|
||||
)
|
||||
return samples
|
||||
|
||||
|
||||
def trimmed_mean(embs: Iterable[np.ndarray], trim: float = 0.15) -> np.ndarray:
|
||||
arr = np.stack(list(embs), axis=0)
|
||||
return stats.trim_mean(arr, trim, axis=0)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Baseline analyses (always run)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def summarize_positive(samples: list[FaceSample], mean_emb: np.ndarray) -> None:
|
||||
"""Summary of training set: per-sample cos to class mean, intra-class stats.
|
||||
|
||||
Outliers with cos far below the rest are likely degrading the mean —
|
||||
they'd be the first candidates the shipped vector-outlier filter drops.
|
||||
"""
|
||||
print("\n" + "=" * 78)
|
||||
print(f"POSITIVE SET ANALYSIS ({len(samples)} images)")
|
||||
print("=" * 78)
|
||||
|
||||
rows = []
|
||||
for s in samples:
|
||||
cs = cosine(s.embedding, mean_emb)
|
||||
conf = similarity_to_confidence(cs)
|
||||
red = blur_reduction(s.blur_var)
|
||||
rows.append(
|
||||
dict(
|
||||
name=os.path.basename(s.path),
|
||||
shape=f"{s.shape[0]}x{s.shape[1]}",
|
||||
eye_px=s.align_info["eye_dist_px"],
|
||||
angle=s.align_info["angle"] + 180,
|
||||
blur=s.blur_var,
|
||||
cos=cs,
|
||||
conf=conf,
|
||||
red=red,
|
||||
adj_conf=max(0.0, conf - red),
|
||||
)
|
||||
)
|
||||
|
||||
rows.sort(key=lambda r: r["cos"])
|
||||
sims = np.array([r["cos"] for r in rows])
|
||||
print(
|
||||
f"\nCosine-to-trimmed-mean: mean={sims.mean():.3f} std={sims.std():.3f} "
|
||||
f"min={sims.min():.3f} max={sims.max():.3f}"
|
||||
)
|
||||
|
||||
print("\n-- Worst matches (bottom 10, most likely hurting the mean) --")
|
||||
print(
|
||||
f"{'cos':>6} {'conf':>6} {'blur':>7} {'eyes':>6} "
|
||||
f"{'angle':>6} {'shape':>9} name"
|
||||
)
|
||||
for r in rows[:10]:
|
||||
print(
|
||||
f"{r['cos']:6.3f} {r['conf']:6.3f} {r['blur']:7.1f} "
|
||||
f"{r['eye_px']:6.1f} {r['angle']:6.1f} {r['shape']:>9} {r['name']}"
|
||||
)
|
||||
|
||||
print("\n-- Best matches (top 5) --")
|
||||
for r in rows[-5:][::-1]:
|
||||
print(
|
||||
f"{r['cos']:6.3f} {r['conf']:6.3f} {r['blur']:7.1f} "
|
||||
f"{r['eye_px']:6.1f} {r['angle']:6.1f} {r['shape']:>9} {r['name']}"
|
||||
)
|
||||
|
||||
# Pairwise analysis — flags embeddings poorly correlated with the rest
|
||||
print("\n-- Pairwise intra-class similarity (mean cos vs. other positives) --")
|
||||
embs = np.stack([s.embedding for s in samples], axis=0)
|
||||
norms = embs / (np.linalg.norm(embs, axis=1, keepdims=True) + 1e-9)
|
||||
sim_matrix = norms @ norms.T
|
||||
np.fill_diagonal(sim_matrix, np.nan)
|
||||
mean_pairwise = np.nanmean(sim_matrix, axis=1)
|
||||
names = [os.path.basename(s.path) for s in samples]
|
||||
ordered = sorted(zip(names, mean_pairwise), key=lambda t: t[1])
|
||||
print(f"{'mean_cos':>9} name")
|
||||
for nm, mp in ordered[:10]:
|
||||
print(f"{mp:9.3f} {nm}")
|
||||
print(f"\n overall mean pairwise cos: {np.nanmean(sim_matrix):.3f}")
|
||||
print(f" median pairwise cos: {np.nanmedian(sim_matrix):.3f}")
|
||||
|
||||
|
||||
def summarize_negative(
|
||||
neg_samples: list[FaceSample],
|
||||
mean_emb: np.ndarray,
|
||||
pos_samples: list[FaceSample],
|
||||
) -> None:
|
||||
"""Score each negative against the class mean, then show its top-3
|
||||
nearest positives. High-scoring negatives that match specific outlier
|
||||
positives hint at training-set contamination.
|
||||
"""
|
||||
print("\n" + "=" * 78)
|
||||
print(f"NEGATIVE SET ANALYSIS ({len(neg_samples)} images)")
|
||||
print("=" * 78)
|
||||
print(
|
||||
f"\n{'cos':>6} {'conf':>6} {'red':>5} {'adj':>5} "
|
||||
f"{'blur':>7} {'eyes':>6} {'shape':>9} name"
|
||||
)
|
||||
for s in neg_samples:
|
||||
cs = cosine(s.embedding, mean_emb)
|
||||
conf = similarity_to_confidence(cs)
|
||||
red = blur_reduction(s.blur_var)
|
||||
print(
|
||||
f"{cs:6.3f} {conf:6.3f} {red:5.2f} {max(0, conf - red):5.2f} "
|
||||
f"{s.blur_var:7.1f} {s.align_info['eye_dist_px']:6.1f} "
|
||||
f"{s.shape[0]}x{s.shape[1]:<5} {os.path.basename(s.path)}"
|
||||
)
|
||||
|
||||
print("\n-- For each negative, top-3 most similar positives --")
|
||||
pos_embs = np.stack([p.embedding for p in pos_samples])
|
||||
pos_norm = pos_embs / (np.linalg.norm(pos_embs, axis=1, keepdims=True) + 1e-9)
|
||||
for s in neg_samples:
|
||||
v = s.embedding / (np.linalg.norm(s.embedding) + 1e-9)
|
||||
sims = pos_norm @ v
|
||||
idx = np.argsort(-sims)[:3]
|
||||
print(f"\n {os.path.basename(s.path)}:")
|
||||
for i in idx:
|
||||
print(
|
||||
f" {sims[i]:6.3f} {os.path.basename(pos_samples[i].path)} "
|
||||
f"blur={pos_samples[i].blur_var:.1f} "
|
||||
f"eyes={pos_samples[i].align_info['eye_dist_px']:.1f}"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Optional diagnostics
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def vector_outlier_test(
|
||||
pos: list[FaceSample], neg: list[FaceSample], base_trim: float = 0.15
|
||||
) -> None:
|
||||
"""Measure the shipped vector-wise outlier filter at various thresholds.
|
||||
|
||||
The production filter at `build_class_mean` in
|
||||
frigate/data_processing/common/face/model.py uses T=0.30. This test
|
||||
sweeps T so you can see which images would be dropped on a new collection
|
||||
and how that affects the negative scores.
|
||||
|
||||
Algorithm: iteratively recompute trim_mean on the kept set, drop any
|
||||
embedding with cos < T to that mean, repeat until converged. Floor at
|
||||
50% of the collection to avoid collapse.
|
||||
"""
|
||||
print("\n" + "=" * 78)
|
||||
print("VECTOR-WISE OUTLIER PRE-FILTER — layered on trim_mean(0.15)")
|
||||
print("=" * 78)
|
||||
|
||||
all_embs = np.stack([s.embedding for s in pos])
|
||||
|
||||
def iterative_mean(
|
||||
embs: np.ndarray,
|
||||
threshold: float,
|
||||
iters: int = 3,
|
||||
min_keep_frac: float = 0.5,
|
||||
) -> tuple[np.ndarray, np.ndarray]:
|
||||
keep = np.ones(len(embs), dtype=bool)
|
||||
floor = max(5, int(np.ceil(min_keep_frac * len(embs))))
|
||||
for _ in range(iters):
|
||||
m = stats.trim_mean(embs[keep], base_trim, axis=0)
|
||||
m_norm = m / (np.linalg.norm(m) + 1e-9)
|
||||
e_norms = embs / (np.linalg.norm(embs, axis=1, keepdims=True) + 1e-9)
|
||||
cos_to_mean = e_norms @ m_norm
|
||||
new_keep = cos_to_mean >= threshold
|
||||
if new_keep.sum() < floor:
|
||||
top_idx = np.argsort(-cos_to_mean)[:floor]
|
||||
new_keep = np.zeros_like(new_keep)
|
||||
new_keep[top_idx] = True
|
||||
if np.array_equal(new_keep, keep):
|
||||
break
|
||||
keep = new_keep
|
||||
final = stats.trim_mean(embs[keep], base_trim, axis=0)
|
||||
return final, keep
|
||||
|
||||
provisional = stats.trim_mean(all_embs, base_trim, axis=0)
|
||||
p_norm = provisional / (np.linalg.norm(provisional) + 1e-9)
|
||||
e_norms_all = all_embs / (np.linalg.norm(all_embs, axis=1, keepdims=True) + 1e-9)
|
||||
cos_to_prov = e_norms_all @ p_norm
|
||||
print("\nDistribution of cos(positive, provisional trim_mean):")
|
||||
print(
|
||||
f" min={cos_to_prov.min():.3f} p10={np.percentile(cos_to_prov, 10):.3f} "
|
||||
f"p25={np.percentile(cos_to_prov, 25):.3f} "
|
||||
f"median={np.median(cos_to_prov):.3f} "
|
||||
f"p75={np.percentile(cos_to_prov, 75):.3f} max={cos_to_prov.max():.3f}"
|
||||
)
|
||||
|
||||
baseline_mean = stats.trim_mean(all_embs, base_trim, axis=0)
|
||||
baseline_pos = np.array([cosine(p.embedding, baseline_mean) for p in pos])
|
||||
baseline_neg = (
|
||||
np.array([cosine(n.embedding, baseline_mean) for n in neg])
|
||||
if neg
|
||||
else np.array([])
|
||||
)
|
||||
baseline_conf_neg = np.array(
|
||||
[similarity_to_confidence(c) for c in baseline_neg]
|
||||
)
|
||||
|
||||
print(
|
||||
f"\nBaseline (trim_mean only, {len(pos)} images):"
|
||||
f"\n pos cos min={baseline_pos.min():.3f} "
|
||||
f"mean={baseline_pos.mean():.3f} max={baseline_pos.max():.3f}"
|
||||
)
|
||||
if len(neg):
|
||||
print(
|
||||
f" neg cos min={baseline_neg.min():.3f} "
|
||||
f"mean={baseline_neg.mean():.3f} max={baseline_neg.max():.3f}"
|
||||
)
|
||||
print(
|
||||
f" neg conf min={baseline_conf_neg.min():.3f} "
|
||||
f"mean={baseline_conf_neg.mean():.3f} max={baseline_conf_neg.max():.3f}"
|
||||
)
|
||||
print(
|
||||
f" margin (pos.min - neg.max): "
|
||||
f"{baseline_pos.min() - baseline_neg.max():+.3f}"
|
||||
)
|
||||
|
||||
print("\nIterative (refine mean → drop vectors with cos<T → repeat):")
|
||||
print(
|
||||
f"\n{'T':>5} {'kept':>6} {'pos min':>7} {'pos mean':>8} "
|
||||
f"{'neg max':>7} {'neg mean':>8} {'neg conf.max':>12} {'margin':>7}"
|
||||
)
|
||||
for T in [0.15, 0.20, 0.25, 0.28, 0.30, 0.33, 0.36, 0.40]:
|
||||
mean, keep = iterative_mean(all_embs, T)
|
||||
pos_sims = np.array([cosine(p.embedding, mean) for p in pos])
|
||||
neg_sims = (
|
||||
np.array([cosine(n.embedding, mean) for n in neg])
|
||||
if neg
|
||||
else np.array([])
|
||||
)
|
||||
neg_conf = np.array([similarity_to_confidence(c) for c in neg_sims])
|
||||
margin = pos_sims.min() - (neg_sims.max() if len(neg_sims) else 0)
|
||||
print(
|
||||
f"{T:5.2f} {int(keep.sum()):>3}/{len(pos):<2} "
|
||||
f"{pos_sims.min():7.3f} {pos_sims.mean():8.3f} "
|
||||
f"{neg_sims.max() if len(neg_sims) else float('nan'):7.3f} "
|
||||
f"{neg_sims.mean() if len(neg_sims) else float('nan'):8.3f} "
|
||||
f"{neg_conf.max() if len(neg_conf) else float('nan'):12.3f} "
|
||||
f"{margin:+7.3f}"
|
||||
)
|
||||
|
||||
# Show which images get dropped at the shipped threshold + neighbors
|
||||
for T_show in (0.25, 0.30, 0.33):
|
||||
_, keep = iterative_mean(all_embs, T_show)
|
||||
print(
|
||||
f"\nAt T={T_show}, the {int((~keep).sum())} dropped positives are:"
|
||||
)
|
||||
final_mean = stats.trim_mean(all_embs[keep], base_trim, axis=0)
|
||||
m_n = final_mean / (np.linalg.norm(final_mean) + 1e-9)
|
||||
for i, (p, k) in enumerate(zip(pos, keep)):
|
||||
if not k:
|
||||
e_n = p.embedding / (np.linalg.norm(p.embedding) + 1e-9)
|
||||
cos_final = float(e_n @ m_n)
|
||||
print(
|
||||
f" cos_to_clean_mean={cos_final:6.3f} "
|
||||
f"shape={p.shape[0]}x{p.shape[1]} "
|
||||
f"eyes={p.align_info['eye_dist_px']:6.1f} "
|
||||
f"blur={p.blur_var:7.1f} "
|
||||
f"{os.path.basename(p.path)}"
|
||||
)
|
||||
|
||||
|
||||
def degenerate_embedding_test(
|
||||
pos: list[FaceSample], neg: list[FaceSample]
|
||||
) -> None:
|
||||
"""Detect whether negatives and low-quality positives share a degenerate
|
||||
'tiny/noisy face' region of the embedding space.
|
||||
|
||||
Signal: if neg-to-neg cos is higher than pos-to-pos cos, the negatives
|
||||
aren't really per-identity embeddings — they're dominated by upsample /
|
||||
low-resolution artifacts that all map to a similar corner of embedding
|
||||
space regardless of who the face belongs to.
|
||||
|
||||
Also rebuilds the mean using only high-intra-similarity positives to
|
||||
show whether a cleaner training set separates the negatives.
|
||||
"""
|
||||
print("\n" + "=" * 78)
|
||||
print("DEGENERATE-EMBEDDING TEST")
|
||||
print("=" * 78)
|
||||
|
||||
pos_embs = np.stack([l2(s.embedding) for s in pos])
|
||||
neg_embs = np.stack([l2(s.embedding) for s in neg])
|
||||
|
||||
nn = neg_embs @ neg_embs.T
|
||||
np.fill_diagonal(nn, np.nan)
|
||||
pp = pos_embs @ pos_embs.T
|
||||
np.fill_diagonal(pp, np.nan)
|
||||
pn = pos_embs @ neg_embs.T
|
||||
|
||||
print(
|
||||
f"\n neg<->neg mean cos : {np.nanmean(nn):.3f} "
|
||||
f"(how tightly negatives cluster together)"
|
||||
)
|
||||
print(
|
||||
f" pos<->pos mean cos : {np.nanmean(pp):.3f} "
|
||||
f"(how tightly positives cluster)"
|
||||
)
|
||||
print(
|
||||
f" pos<->neg mean cos : {pn.mean():.3f} "
|
||||
f"(cross-class — should be low for a clean class)"
|
||||
)
|
||||
if np.nanmean(nn) > np.nanmean(pp):
|
||||
print(
|
||||
"\n >> neg<->neg > pos<->pos: negatives cluster more tightly than\n"
|
||||
" positives. This is the degenerate-embedding signature —\n"
|
||||
" upsampled tiny crops share a common 'face-like blob' region\n"
|
||||
" regardless of identity."
|
||||
)
|
||||
|
||||
mean_intra = np.nanmean(pp, axis=1)
|
||||
for thresh in (0.30, 0.33, 0.36):
|
||||
keep = mean_intra >= thresh
|
||||
if keep.sum() < 5:
|
||||
continue
|
||||
clean_embs = [pos[i].embedding for i in range(len(pos)) if keep[i]]
|
||||
clean_mean = stats.trim_mean(np.stack(clean_embs), 0.15, axis=0)
|
||||
neg_scores = np.array([cosine(n.embedding, clean_mean) for n in neg])
|
||||
neg_confs = np.array([similarity_to_confidence(c) for c in neg_scores])
|
||||
pos_scores = np.array(
|
||||
[
|
||||
cosine(pos[i].embedding, clean_mean)
|
||||
for i in range(len(pos))
|
||||
if keep[i]
|
||||
]
|
||||
)
|
||||
print(
|
||||
f"\n mean_intra >= {thresh}: keeping {int(keep.sum())}/{len(pos)} positives"
|
||||
)
|
||||
print(
|
||||
f" pos cos vs mean : min={pos_scores.min():.3f} "
|
||||
f"mean={pos_scores.mean():.3f} max={pos_scores.max():.3f}"
|
||||
)
|
||||
print(
|
||||
f" neg cos vs mean : min={neg_scores.min():.3f} "
|
||||
f"mean={neg_scores.mean():.3f} max={neg_scores.max():.3f}"
|
||||
)
|
||||
print(
|
||||
f" neg conf : min={neg_confs.min():.3f} "
|
||||
f"mean={neg_confs.mean():.3f} max={neg_confs.max():.3f}"
|
||||
)
|
||||
print(
|
||||
f" margin (pos.min - neg.max): "
|
||||
f"{pos_scores.min() - neg_scores.max():+.3f}"
|
||||
)
|
||||
|
||||
|
||||
def contamination_analysis(
|
||||
pos: list[FaceSample], neg: list[FaceSample]
|
||||
) -> None:
|
||||
"""Check whether the positive collection contains a second identity.
|
||||
|
||||
Two signals:
|
||||
(a) Per-positive: if an image is closer to at least one negative than
|
||||
to the rest of the positive class, it's likely a mislabeled face.
|
||||
(b) 2-means split of the positive embeddings: if one cluster center
|
||||
lands close to the negative mean, that cluster is a contaminating
|
||||
sub-identity that's pulling the class mean toward the negatives.
|
||||
"""
|
||||
print("\n" + "=" * 78)
|
||||
print("CONTAMINATION ANALYSIS")
|
||||
print("=" * 78)
|
||||
|
||||
pos_embs = np.stack([l2(s.embedding) for s in pos])
|
||||
neg_embs = np.stack([l2(s.embedding) for s in neg])
|
||||
pos_names = [os.path.basename(s.path) for s in pos]
|
||||
|
||||
pos_pos = pos_embs @ pos_embs.T
|
||||
np.fill_diagonal(pos_pos, np.nan)
|
||||
pos_neg = pos_embs @ neg_embs.T
|
||||
|
||||
mean_intra = np.nanmean(pos_pos, axis=1)
|
||||
max_to_neg = pos_neg.max(axis=1)
|
||||
mean_to_neg = pos_neg.mean(axis=1)
|
||||
|
||||
print(
|
||||
"\nPositives closer to a negative than to their own class avg"
|
||||
"\n(these are candidates for mislabeled images):"
|
||||
)
|
||||
print(
|
||||
f"\n{'max_neg':>7} {'mean_neg':>8} {'mean_intra':>10} "
|
||||
f"{'delta':>6} name"
|
||||
)
|
||||
rows = list(zip(pos_names, max_to_neg, mean_to_neg, mean_intra))
|
||||
rows.sort(key=lambda r: -(r[1] - r[3]))
|
||||
for nm, mxn, mnn, mi in rows[:15]:
|
||||
delta = mxn - mi
|
||||
marker = " <<" if delta > 0 else ""
|
||||
print(f"{mxn:7.3f} {mnn:8.3f} {mi:10.3f} {delta:6.3f} {nm}{marker}")
|
||||
|
||||
# 2-means in cosine space (no sklearn dependency).
|
||||
print("\n2-means split of positive embeddings (cosine space):")
|
||||
rng = np.random.default_rng(0)
|
||||
best = None
|
||||
for _ in range(5):
|
||||
idx = rng.choice(len(pos_embs), 2, replace=False)
|
||||
centers = pos_embs[idx].copy()
|
||||
for _ in range(50):
|
||||
sims = pos_embs @ centers.T
|
||||
labels = np.argmax(sims, axis=1)
|
||||
new_centers = np.stack(
|
||||
[
|
||||
l2(pos_embs[labels == k].mean(axis=0))
|
||||
if np.any(labels == k)
|
||||
else centers[k]
|
||||
for k in range(2)
|
||||
]
|
||||
)
|
||||
if np.allclose(new_centers, centers):
|
||||
break
|
||||
centers = new_centers
|
||||
tight = float(np.mean([sims[i, labels[i]] for i in range(len(labels))]))
|
||||
if best is None or tight > best[0]:
|
||||
best = (tight, labels.copy(), centers.copy())
|
||||
|
||||
_, labels, centers = best
|
||||
sizes = [int((labels == k).sum()) for k in range(2)]
|
||||
neg_mean = l2(neg_embs.mean(axis=0))
|
||||
print(
|
||||
f" cluster 0: size={sizes[0]:>2} "
|
||||
f"center<->other_center_cos={float(centers[0] @ centers[1]):.3f} "
|
||||
f"center<->neg_mean_cos={float(centers[0] @ neg_mean):.3f}"
|
||||
)
|
||||
print(
|
||||
f" cluster 1: size={sizes[1]:>2} "
|
||||
f"center<->neg_mean_cos={float(centers[1] @ neg_mean):.3f}"
|
||||
)
|
||||
|
||||
neg_aligned = 0 if centers[0] @ neg_mean > centers[1] @ neg_mean else 1
|
||||
print(
|
||||
f"\n cluster {neg_aligned} is more similar to the negatives — "
|
||||
f"its members are the contamination candidates:"
|
||||
)
|
||||
for i, lbl in enumerate(labels):
|
||||
if lbl == neg_aligned:
|
||||
print(
|
||||
f" max_to_neg={max_to_neg[i]:.3f} "
|
||||
f"mean_intra={mean_intra[i]:.3f} {pos_names[i]}"
|
||||
)
|
||||
|
||||
keep_mask = labels != neg_aligned
|
||||
if keep_mask.sum() >= 3:
|
||||
clean_embs = [pos[i].embedding for i in range(len(pos)) if keep_mask[i]]
|
||||
clean_mean = stats.trim_mean(np.stack(clean_embs), 0.15, axis=0)
|
||||
print(
|
||||
f"\n Rebuilding class mean from the OTHER cluster "
|
||||
f"({keep_mask.sum()} images):"
|
||||
)
|
||||
print(f" {'cos':>6} {'conf':>6} name")
|
||||
for n in neg:
|
||||
cs = cosine(n.embedding, clean_mean)
|
||||
cf = similarity_to_confidence(cs)
|
||||
print(f" {cs:6.3f} {cf:6.3f} {os.path.basename(n.path)}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def main() -> int:
|
||||
ap = argparse.ArgumentParser(
|
||||
description="Analyze a face recognition collection outside Frigate.",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog=__doc__,
|
||||
)
|
||||
ap.add_argument("--positive", required=True, help="Training folder for one identity")
|
||||
ap.add_argument(
|
||||
"--negative",
|
||||
default=None,
|
||||
help="Runtime-crop folder to score against (optional)",
|
||||
)
|
||||
ap.add_argument(
|
||||
"--model-cache",
|
||||
default="/config/model_cache",
|
||||
help="Directory containing facedet/arcface.onnx and facedet/landmarkdet.yaml",
|
||||
)
|
||||
ap.add_argument(
|
||||
"--trim",
|
||||
type=float,
|
||||
default=0.15,
|
||||
help="trim_mean proportion (Frigate uses 0.15)",
|
||||
)
|
||||
ap.add_argument(
|
||||
"--vector-outlier",
|
||||
action="store_true",
|
||||
help="Sweep the vector-wise outlier filter threshold",
|
||||
)
|
||||
ap.add_argument(
|
||||
"--degenerate",
|
||||
action="store_true",
|
||||
help="Test whether negatives share a degenerate embedding region",
|
||||
)
|
||||
ap.add_argument(
|
||||
"--contamination",
|
||||
action="store_true",
|
||||
help="Check whether the positive folder contains a second identity",
|
||||
)
|
||||
args = ap.parse_args()
|
||||
|
||||
arcface_path = os.path.join(args.model_cache, "facedet", "arcface.onnx")
|
||||
landmark_path = os.path.join(args.model_cache, "facedet", "landmarkdet.yaml")
|
||||
for p in (arcface_path, landmark_path):
|
||||
if not os.path.exists(p):
|
||||
print(f"ERROR: model file not found: {p}")
|
||||
return 1
|
||||
|
||||
print(f"Loading ArcFace from {arcface_path}")
|
||||
embedder = ArcFaceEmbedder(arcface_path)
|
||||
print(f"Loading landmark model from {landmark_path}")
|
||||
aligner = LandmarkAligner(landmark_path)
|
||||
|
||||
print(f"\nLoading positives from {args.positive} ...")
|
||||
pos = load_folder(args.positive, aligner, embedder)
|
||||
print(f" {len(pos)} positives loaded")
|
||||
|
||||
neg: list[FaceSample] = []
|
||||
if args.negative:
|
||||
print(f"\nLoading negatives from {args.negative} ...")
|
||||
neg = load_folder(args.negative, aligner, embedder)
|
||||
print(f" {len(neg)} negatives loaded")
|
||||
|
||||
if not pos:
|
||||
print("no positive samples — aborting")
|
||||
return 1
|
||||
|
||||
mean_emb = trimmed_mean([s.embedding for s in pos], trim=args.trim)
|
||||
summarize_positive(pos, mean_emb)
|
||||
if neg:
|
||||
summarize_negative(neg, mean_emb, pos)
|
||||
|
||||
if args.vector_outlier:
|
||||
vector_outlier_test(pos, neg, args.trim)
|
||||
if args.degenerate and neg:
|
||||
degenerate_embedding_test(pos, neg)
|
||||
if args.contamination and neg:
|
||||
contamination_analysis(pos, neg)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@ -397,6 +397,14 @@
|
||||
"description": "The device to use for Hailo inference (e.g. 'PCIe', 'M.2')."
|
||||
}
|
||||
},
|
||||
"hailo10h": {
|
||||
"label": "Hailo-10H",
|
||||
"description": "Hailo-10H detector using HEF models and the HailoRT SDK for inference on Hailo hardware.",
|
||||
"device": {
|
||||
"label": "Device Type",
|
||||
"description": "The device to use for Hailo inference (e.g. 'PCIe', 'M.2')."
|
||||
}
|
||||
},
|
||||
"memryx": {
|
||||
"label": "MemryX",
|
||||
"description": "MemryX MX3 detector that runs compiled DFP models on MemryX accelerators.",
|
||||
|
||||
Loading…
Reference in New Issue
Block a user