Compare commits

..

3 Commits

Author SHA1 Message Date
mathieu-d
6dc81a6a0c
Merge 0f3dd097ec into 434ef358a2 2026-04-24 20:12:23 +02:00
Josh Hawkins
434ef358a2
add analyze keyframes testing script (#22994)
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
2026-04-24 11:42:30 -05:00
Nicolas Mowen
fe269b77b8
Optimize face recognition (#22993)
* Improve mean generation for faces to remove outlier embeddings

* Create testing scripts folder

* Fix mypy
2026-04-24 11:14:28 -05:00
6 changed files with 1216 additions and 2 deletions

View File

@ -133,6 +133,61 @@ class FaceRecognizer(ABC):
return 0.0
def build_class_mean(
embs: list[np.ndarray],
trim: float = 0.15,
outlier_threshold: float = 0.30,
min_keep_frac: float = 0.7,
max_iters: int = 3,
) -> np.ndarray:
"""Build a class-mean embedding with two-layer outlier protection.
Layer 1 (iterative, vector-wise): drop whole embeddings whose cosine
similarity to the current class mean is below ``outlier_threshold``.
Catches mislabeled or corrupted training samples (wrong face in the
folder, full-frame screenshots, extreme crops) that per-dimension
trimming cannot detect.
Layer 2 (per-dimension): ``scipy.stats.trim_mean`` on the retained set
to smooth per-component noise (lighting, expression, alignment jitter).
Collections with fewer than 5 images bypass outlier rejection too few
samples to establish a reliable class center.
"""
arr = np.stack(embs, axis=0)
if len(arr) < 5:
return np.asarray(stats.trim_mean(arr, trim, axis=0))
keep = np.ones(len(arr), dtype=bool)
floor = max(5, int(np.ceil(min_keep_frac * len(arr))))
for _ in range(max_iters):
mean = stats.trim_mean(arr[keep], trim, axis=0)
m_norm = mean / (np.linalg.norm(mean) + 1e-9)
e_norms = arr / (np.linalg.norm(arr, axis=1, keepdims=True) + 1e-9)
cos = e_norms @ m_norm
new_keep = cos >= outlier_threshold
if new_keep.sum() < floor:
top = np.argsort(-cos)[:floor]
new_keep = np.zeros(len(arr), dtype=bool)
new_keep[top] = True
if np.array_equal(new_keep, keep):
break
keep = new_keep
dropped = int((~keep).sum())
if dropped:
logger.debug(
f"Vector-wise outlier filter dropped {dropped}/{len(arr)} embeddings"
)
return np.asarray(stats.trim_mean(arr[keep], trim, axis=0))
def similarity_to_confidence(
cosine_similarity: float,
median: float = 0.3,
@ -229,7 +284,7 @@ class FaceNetRecognizer(FaceRecognizer):
for name, embs in face_embeddings_map.items():
if embs:
self.mean_embs[name] = stats.trim_mean(embs, 0.15)
self.mean_embs[name] = build_class_mean(embs)
logger.debug("Finished building ArcFace model")
@ -340,7 +395,7 @@ class ArcFaceRecognizer(FaceRecognizer):
for name, embs in face_embeddings_map.items():
if embs:
self.mean_embs[name] = stats.trim_mean(embs, 0.15)
self.mean_embs[name] = build_class_mean(embs)
logger.debug("Finished building ArcFace model")

View File

@ -0,0 +1,376 @@
#!/usr/bin/env python3
"""Analyze keyframe and timestamp structure of Frigate recording segments.
This is a diagnostic tool for investigating seek precision / GOP behavior on
recorded segments. It does not modify anything.
ffprobe is only available inside the Frigate container, at
/usr/lib/ffmpeg/$DEFAULT_FFMPEG_VERSION/bin/ffprobe
This script auto-resolves that path from the DEFAULT_FFMPEG_VERSION env var
(or falls back to scanning /usr/lib/ffmpeg/*/bin/ffprobe). Pass --ffprobe to
override if needed.
All recording segments on the filesystem are in UTC. The --timestamp flag
expects a UTC Unix timestamp.
Typical use:
# Inside the Frigate container (or wherever recordings are mounted)
python3 analyze_recording_keyframes.py <camera_name>
# Analyze 10 most recent segments
python3 analyze_recording_keyframes.py <camera_name> --count 10
# Locate the segment that contains a specific UTC Unix timestamp and
# show it plus surrounding segments
python3 analyze_recording_keyframes.py <camera> --timestamp 1713471234.567
# Custom recordings directory
python3 analyze_recording_keyframes.py <camera> --recordings-dir /media/frigate/recordings
# Override the ffprobe path explicitly
python3 analyze_recording_keyframes.py <camera> --ffprobe /usr/lib/ffmpeg/7.0/bin/ffprobe
"""
import argparse
import datetime
import json
import os
import subprocess
import sys
from pathlib import Path
from statistics import mean, median, stdev
def resolve_ffprobe_path(override: str | None) -> str:
"""Resolve the ffprobe binary path.
Inside the Frigate container, ffprobe lives at
/usr/lib/ffmpeg/{DEFAULT_FFMPEG_VERSION}/bin/ffprobe the exact version
depends on the image build and is exposed as an env var.
"""
if override:
return override
version = os.environ.get("DEFAULT_FFMPEG_VERSION", "")
if version:
path = f"/usr/lib/ffmpeg/{version}/bin/ffprobe"
if Path(path).is_file():
return path
# Fall back to scanning the Frigate ffmpeg install root.
for candidate in sorted(Path("/usr/lib/ffmpeg").glob("*/bin/ffprobe")):
if candidate.is_file():
return str(candidate)
print(
"Could not locate ffprobe. Pass --ffprobe <path> or set "
"DEFAULT_FFMPEG_VERSION.",
file=sys.stderr,
)
sys.exit(1)
def find_recent_segments(recordings_dir: Path, camera: str, count: int) -> list[Path]:
"""Return the N most recent .mp4 segments for the given camera.
Expected layout: <recordings_dir>/<YYYY-MM-DD>/<HH>/<camera>/<MM>.<SS>.mp4
"""
pattern = f"*/*/{camera}/*.mp4"
segments = sorted(recordings_dir.glob(pattern))
return segments[-count:]
def find_segments_near_timestamp(
recordings_dir: Path, camera: str, target_ts: float, count: int
) -> tuple[list[Path], Path | None]:
"""Return `count` segments centered on the one containing `target_ts`.
Also returns the specific segment that should contain the timestamp, so
callers can highlight it in output.
"""
pattern = f"*/*/{camera}/*.mp4"
with_ts: list[tuple[float, Path]] = []
for seg in sorted(recordings_dir.glob(pattern)):
ts = filename_to_timestamp(seg)
if ts is not None:
with_ts.append((ts, seg))
if not with_ts:
return [], None
# Largest filename_ts that is <= target_ts — that's the segment that
# should contain the timestamp (Frigate catalogs segments by filename).
target_idx = -1
for i, (ts, _) in enumerate(with_ts):
if ts <= target_ts:
target_idx = i
else:
break
if target_idx < 0:
# target_ts is before the earliest segment we have — just return the
# first `count` segments so the user can see what's available.
window = with_ts[:count]
return [seg for _, seg in window], None
half = count // 2
start = max(0, target_idx - half)
end = min(len(with_ts), start + count)
start = max(0, end - count)
window = with_ts[start:end]
return [seg for _, seg in window], with_ts[target_idx][1]
def filename_to_timestamp(segment: Path) -> float | None:
"""Parse the wall-clock time from Frigate's segment path layout."""
try:
date = segment.parent.parent.parent.name # YYYY-MM-DD
hour = segment.parent.parent.name # HH
mm_ss = segment.stem # MM.SS
minute, second = mm_ss.split(".")
dt = datetime.datetime.strptime(
f"{date} {hour}:{minute}:{second}",
"%Y-%m-%d %H:%M:%S",
).replace(tzinfo=datetime.timezone.utc)
return dt.timestamp()
except (ValueError, IndexError):
return None
def run_ffprobe(ffprobe: str, args: list[str]) -> dict:
"""Run ffprobe and return parsed JSON, or empty dict on failure."""
result = subprocess.run(
[ffprobe, "-v", "error", *args, "-of", "json"],
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
print(f" ffprobe error: {result.stderr.strip()}", file=sys.stderr)
return {}
try:
return json.loads(result.stdout)
except json.JSONDecodeError:
return {}
def get_format_info(ffprobe: str, segment: Path) -> tuple[dict, dict]:
"""Return (format_dict, stream_dict) for the first video stream."""
data = run_ffprobe(
ffprobe,
[
"-show_entries",
"format=duration,start_time",
"-show_entries",
"stream=codec_name,profile,r_frame_rate,width,height",
"-select_streams",
"v:0",
str(segment),
],
)
fmt = data.get("format", {})
streams = data.get("streams") or [{}]
return fmt, streams[0]
def get_video_packets(ffprobe: str, segment: Path) -> list[dict]:
"""Return video packets with pts_time and flags."""
data = run_ffprobe(
ffprobe,
[
"-select_streams",
"v",
"-show_entries",
"packet=pts_time,dts_time,flags",
str(segment),
],
)
return data.get("packets", [])
def analyze(ffprobe: str, segment: Path, highlight: bool = False) -> None:
marker = " <-- contains target timestamp" if highlight else ""
print(f"\n=== {segment} ==={marker}")
fmt, stream = get_format_info(ffprobe, segment)
duration = float(fmt.get("duration", 0) or 0)
start_time = float(fmt.get("start_time", 0) or 0)
codec = stream.get("codec_name", "?")
profile = stream.get("profile", "?")
width = stream.get("width", "?")
height = stream.get("height", "?")
fps = stream.get("r_frame_rate", "?/1")
filename_ts = filename_to_timestamp(segment)
filename_iso = (
datetime.datetime.fromtimestamp(
filename_ts, tz=datetime.timezone.utc
).isoformat()
if filename_ts is not None
else "?"
)
print(f" Codec: {codec} ({profile}) {width}x{height} {fps}")
print(f" Filename time: {filename_ts} ({filename_iso})")
print(f" Format duration: {duration:.3f}s")
print(f" Format start: {start_time:.3f}s (PTS offset of first packet)")
packets = get_video_packets(ffprobe, segment)
if not packets:
print(" (no video packets)")
return
keyframe_times: list[float] = []
first_pts: float | None = None
last_pts: float | None = None
for pkt in packets:
pts_str = pkt.get("pts_time")
if pts_str is None or pts_str == "N/A":
continue
pts = float(pts_str)
if first_pts is None:
first_pts = pts
last_pts = pts
if "K" in pkt.get("flags", ""):
keyframe_times.append(pts)
total_packets = len(packets)
kf_count = len(keyframe_times)
print(f" Video packets: {total_packets}")
print(f" Keyframes: {kf_count}")
if first_pts is not None and last_pts is not None:
print(
f" Packet PTS: first={first_pts:.3f}s last={last_pts:.3f}s "
f"span={last_pts - first_pts:.3f}s"
)
if keyframe_times:
print(
f" Keyframe PTS: first={keyframe_times[0]:.3f}s "
f"last={keyframe_times[-1]:.3f}s"
)
formatted = ", ".join(f"{t:.3f}" for t in keyframe_times)
print(f" Keyframe times: [{formatted}]")
if len(keyframe_times) >= 2:
gaps = [b - a for a, b in zip(keyframe_times, keyframe_times[1:])]
avg_fps_estimate = (
total_packets / (last_pts - first_pts)
if last_pts and first_pts is not None and last_pts > first_pts
else 0
)
print(
f" GOP gaps (s): min={min(gaps):.3f} max={max(gaps):.3f} "
f"mean={mean(gaps):.3f} median={median(gaps):.3f}"
)
if len(gaps) > 1:
print(f" stdev={stdev(gaps):.3f}")
print(
f" Est. mean GOP: ~{mean(gaps) * avg_fps_estimate:.1f} frames"
if avg_fps_estimate
else ""
)
if max(gaps) > 5:
print(
" !! Max GOP > 5s — consistent with adaptive/smart codec "
"(even if 'Smart Codec' is off in the UI, some cameras still "
"produce irregular GOPs under specific encoder profiles)"
)
elif kf_count == 1:
print(" !! Only one keyframe in segment — very long GOP")
# Report how well filename time aligns with first-packet PTS.
# (Filename time is what Frigate uses as recording.start_time in the DB.)
if filename_ts is not None and first_pts is not None:
print(
f" Notes: first packet PTS is {first_pts:.3f}s into the file; "
f"Frigate treats filename time as PTS=0 for seek math."
)
def main() -> None:
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("camera", help="Camera name (matches the recordings subfolder)")
parser.add_argument(
"--count",
type=int,
default=5,
help="Number of most recent segments to analyze (default: 5)",
)
parser.add_argument(
"--recordings-dir",
default="/media/frigate/recordings",
help="Path to the recordings directory (default: /media/frigate/recordings)",
)
parser.add_argument(
"--ffprobe",
default=None,
help=(
"Full path to the ffprobe binary. Defaults to the Frigate-bundled "
"binary at /usr/lib/ffmpeg/$DEFAULT_FFMPEG_VERSION/bin/ffprobe."
),
)
parser.add_argument(
"--timestamp",
type=float,
default=None,
help=(
"Unix timestamp (UTC seconds, decimals allowed) to locate. The "
"script finds the segment that should contain this time and "
"analyzes it plus surrounding segments (count controls the "
"window). All on-disk segments are stored in UTC, so pass a UTC "
"Unix timestamp."
),
)
args = parser.parse_args()
ffprobe = resolve_ffprobe_path(args.ffprobe)
recordings_dir = Path(args.recordings_dir)
if not recordings_dir.is_dir():
print(
f"Recordings directory not found: {recordings_dir}",
file=sys.stderr,
)
sys.exit(1)
target_segment: Path | None = None
if args.timestamp is not None:
segments, target_segment = find_segments_near_timestamp(
recordings_dir, args.camera, args.timestamp, args.count
)
target_iso = datetime.datetime.fromtimestamp(
args.timestamp, tz=datetime.timezone.utc
).isoformat()
mode = f"around timestamp {args.timestamp} ({target_iso})"
else:
segments = find_recent_segments(recordings_dir, args.camera, args.count)
mode = "most recent"
if not segments:
print(
f"No segments found for camera '{args.camera}' under {recordings_dir}",
file=sys.stderr,
)
sys.exit(1)
if args.timestamp is not None and target_segment is None:
print(
f"!! Target timestamp {args.timestamp} is before the earliest "
f"segment on disk; showing the earliest available segments instead.",
file=sys.stderr,
)
print(
f"Analyzing {len(segments)} {mode} segment(s) for camera "
f"'{args.camera}' under {recordings_dir} (ffprobe: {ffprobe})"
)
for segment in segments:
analyze(ffprobe, segment, highlight=(segment == target_segment))
if __name__ == "__main__":
main()

View File

@ -0,0 +1,783 @@
"""
Face recognition investigation script.
Standalone replica of Frigate's ArcFace pipeline (see
frigate/data_processing/common/face/model.py and
frigate/embeddings/onnx/face_embedding.py) for analyzing a face collection
outside the running service. Useful for:
- Diagnosing why a person's collection produces false positives
- Finding outlier/contaminating training images
- Inspecting the effect of the shipped vector-wise outlier filter
Layout:
- Core pipeline: LandmarkAligner, ArcFaceEmbedder, arcface_preprocess,
similarity_to_confidence, blur_reduction all mirroring the production
code exactly
- Default run: summarize positive and negative sets against a baseline
trim_mean class representation
- Optional diagnostics (flags): vector-outlier filter behavior, degenerate
"tiny crop" embedding clustering, and multi-identity contamination
Usage:
python3 face_investigate.py \\
--positive <positive_folder> \\
--negative <negative_folder> \\
[--model-cache /path/to/model_cache] \\
[--vector-outlier] [--degenerate] [--contamination]
The positive folder should contain training images for a single identity
(same layout as FACE_DIR/<name>/*.webp). The negative folder should contain
runtime crops to test against a mix of true matches and misfires.
"""
from __future__ import annotations
import argparse
import os
import sys
from dataclasses import dataclass
from typing import Iterable
import cv2
import numpy as np
import onnxruntime as ort
from PIL import Image
from scipy import stats
ARCFACE_INPUT_SIZE = 112
# ---------------------------------------------------------------------------
# Replicated Frigate pipeline
# ---------------------------------------------------------------------------
def _process_image_frigate(image: np.ndarray) -> Image.Image:
"""Mirror BaseEmbedding._process_image for an ndarray input.
NOTE: Frigate passes the output of `cv2.imread` (BGR) directly in. PIL's
`Image.fromarray` does NOT reorder channels, so the embedder effectively
receives a BGR-ordered tensor. We replicate that faithfully here. (Tested
swapping to RGB produces near-identical embeddings; this model is
robust to channel order.)
"""
return Image.fromarray(image)
def arcface_preprocess(image_bgr: np.ndarray) -> np.ndarray:
"""Mirror ArcfaceEmbedding._preprocess_inputs."""
pil = _process_image_frigate(image_bgr)
width, height = pil.size
if width != ARCFACE_INPUT_SIZE or height != ARCFACE_INPUT_SIZE:
if width > height:
new_height = int(((height / width) * ARCFACE_INPUT_SIZE) // 4 * 4)
pil = pil.resize((ARCFACE_INPUT_SIZE, new_height))
else:
new_width = int(((width / height) * ARCFACE_INPUT_SIZE) // 4 * 4)
pil = pil.resize((new_width, ARCFACE_INPUT_SIZE))
og = np.array(pil).astype(np.float32)
og_h, og_w, channels = og.shape
frame = np.zeros(
(ARCFACE_INPUT_SIZE, ARCFACE_INPUT_SIZE, channels), dtype=np.float32
)
x_center = (ARCFACE_INPUT_SIZE - og_w) // 2
y_center = (ARCFACE_INPUT_SIZE - og_h) // 2
frame[y_center : y_center + og_h, x_center : x_center + og_w] = og
frame = (frame / 127.5) - 1.0
frame = np.transpose(frame, (2, 0, 1))
frame = np.expand_dims(frame, axis=0)
return frame
class LandmarkAligner:
"""Mirror FaceRecognizer.align_face."""
def __init__(self, landmark_model_path: str):
if not os.path.exists(landmark_model_path):
raise FileNotFoundError(landmark_model_path)
self.detector = cv2.face.createFacemarkLBF()
self.detector.loadModel(landmark_model_path)
def align(
self, image: np.ndarray, out_w: int, out_h: int
) -> tuple[np.ndarray, dict]:
land_image = (
cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if image.ndim == 3 else image
)
_, lands = self.detector.fit(
land_image, np.array([(0, 0, land_image.shape[1], land_image.shape[0])])
)
landmarks = lands[0][0]
leftEyePts = landmarks[42:48]
rightEyePts = landmarks[36:42]
leftEyeCenter = leftEyePts.mean(axis=0).astype("int")
rightEyeCenter = rightEyePts.mean(axis=0).astype("int")
dY = rightEyeCenter[1] - leftEyeCenter[1]
dX = rightEyeCenter[0] - leftEyeCenter[0]
angle = np.degrees(np.arctan2(dY, dX)) - 180
dist = float(np.sqrt((dX**2) + (dY**2)))
desiredRightEyeX = 1.0 - 0.35
desiredDist = (desiredRightEyeX - 0.35) * out_w
scale = desiredDist / dist if dist > 0 else 1.0
eyesCenter = (
int((leftEyeCenter[0] + rightEyeCenter[0]) // 2),
int((leftEyeCenter[1] + rightEyeCenter[1]) // 2),
)
M = cv2.getRotationMatrix2D(eyesCenter, angle, scale)
tX = out_w * 0.5
tY = out_h * 0.35
M[0, 2] += tX - eyesCenter[0]
M[1, 2] += tY - eyesCenter[1]
aligned = cv2.warpAffine(
image, M, (out_w, out_h), flags=cv2.INTER_CUBIC
)
info = dict(
angle=float(angle),
eye_dist_px=dist,
scale=float(scale),
landmarks=landmarks,
)
return aligned, info
class ArcFaceEmbedder:
def __init__(self, model_path: str):
self.session = ort.InferenceSession(
model_path, providers=["CPUExecutionProvider"]
)
self.input_name = self.session.get_inputs()[0].name
def embed(self, image_bgr: np.ndarray) -> np.ndarray:
tensor = arcface_preprocess(image_bgr)
out = self.session.run(None, {self.input_name: tensor})[0]
return out.squeeze()
def similarity_to_confidence(
cos_sim: float,
median: float = 0.3,
range_width: float = 0.6,
slope_factor: float = 12,
) -> float:
slope = slope_factor / range_width
return float(1.0 / (1.0 + np.exp(-slope * (cos_sim - median))))
def laplacian_variance(image: np.ndarray) -> float:
return float(cv2.Laplacian(image, cv2.CV_64F).var())
def blur_reduction(variance: float) -> float:
if variance < 120:
return 0.06
elif variance < 160:
return 0.04
elif variance < 200:
return 0.02
elif variance < 250:
return 0.01
return 0.0
def cosine(a: np.ndarray, b: np.ndarray) -> float:
denom = np.linalg.norm(a) * np.linalg.norm(b)
if denom == 0:
return 0.0
return float(np.dot(a, b) / denom)
def l2(v: np.ndarray) -> np.ndarray:
return v / (np.linalg.norm(v) + 1e-9)
# ---------------------------------------------------------------------------
# Sample loading
# ---------------------------------------------------------------------------
@dataclass
class FaceSample:
path: str
shape: tuple[int, int]
embedding: np.ndarray
blur_var: float
align_info: dict
def load_folder(
folder: str, aligner: LandmarkAligner, embedder: ArcFaceEmbedder
) -> list[FaceSample]:
samples: list[FaceSample] = []
names = sorted(os.listdir(folder))
for name in names:
if name.startswith("."):
continue
path = os.path.join(folder, name)
if not os.path.isfile(path):
continue
img = cv2.imread(path)
if img is None:
print(f" [skip unreadable] {name}")
continue
aligned, info = aligner.align(img, img.shape[1], img.shape[0])
emb = embedder.embed(aligned)
samples.append(
FaceSample(
path=path,
shape=(img.shape[1], img.shape[0]),
embedding=emb,
blur_var=laplacian_variance(img),
align_info=info,
)
)
return samples
def trimmed_mean(embs: Iterable[np.ndarray], trim: float = 0.15) -> np.ndarray:
arr = np.stack(list(embs), axis=0)
return stats.trim_mean(arr, trim, axis=0)
# ---------------------------------------------------------------------------
# Baseline analyses (always run)
# ---------------------------------------------------------------------------
def summarize_positive(samples: list[FaceSample], mean_emb: np.ndarray) -> None:
"""Summary of training set: per-sample cos to class mean, intra-class stats.
Outliers with cos far below the rest are likely degrading the mean
they'd be the first candidates the shipped vector-outlier filter drops.
"""
print("\n" + "=" * 78)
print(f"POSITIVE SET ANALYSIS ({len(samples)} images)")
print("=" * 78)
rows = []
for s in samples:
cs = cosine(s.embedding, mean_emb)
conf = similarity_to_confidence(cs)
red = blur_reduction(s.blur_var)
rows.append(
dict(
name=os.path.basename(s.path),
shape=f"{s.shape[0]}x{s.shape[1]}",
eye_px=s.align_info["eye_dist_px"],
angle=s.align_info["angle"] + 180,
blur=s.blur_var,
cos=cs,
conf=conf,
red=red,
adj_conf=max(0.0, conf - red),
)
)
rows.sort(key=lambda r: r["cos"])
sims = np.array([r["cos"] for r in rows])
print(
f"\nCosine-to-trimmed-mean: mean={sims.mean():.3f} std={sims.std():.3f} "
f"min={sims.min():.3f} max={sims.max():.3f}"
)
print("\n-- Worst matches (bottom 10, most likely hurting the mean) --")
print(
f"{'cos':>6} {'conf':>6} {'blur':>7} {'eyes':>6} "
f"{'angle':>6} {'shape':>9} name"
)
for r in rows[:10]:
print(
f"{r['cos']:6.3f} {r['conf']:6.3f} {r['blur']:7.1f} "
f"{r['eye_px']:6.1f} {r['angle']:6.1f} {r['shape']:>9} {r['name']}"
)
print("\n-- Best matches (top 5) --")
for r in rows[-5:][::-1]:
print(
f"{r['cos']:6.3f} {r['conf']:6.3f} {r['blur']:7.1f} "
f"{r['eye_px']:6.1f} {r['angle']:6.1f} {r['shape']:>9} {r['name']}"
)
# Pairwise analysis — flags embeddings poorly correlated with the rest
print("\n-- Pairwise intra-class similarity (mean cos vs. other positives) --")
embs = np.stack([s.embedding for s in samples], axis=0)
norms = embs / (np.linalg.norm(embs, axis=1, keepdims=True) + 1e-9)
sim_matrix = norms @ norms.T
np.fill_diagonal(sim_matrix, np.nan)
mean_pairwise = np.nanmean(sim_matrix, axis=1)
names = [os.path.basename(s.path) for s in samples]
ordered = sorted(zip(names, mean_pairwise), key=lambda t: t[1])
print(f"{'mean_cos':>9} name")
for nm, mp in ordered[:10]:
print(f"{mp:9.3f} {nm}")
print(f"\n overall mean pairwise cos: {np.nanmean(sim_matrix):.3f}")
print(f" median pairwise cos: {np.nanmedian(sim_matrix):.3f}")
def summarize_negative(
neg_samples: list[FaceSample],
mean_emb: np.ndarray,
pos_samples: list[FaceSample],
) -> None:
"""Score each negative against the class mean, then show its top-3
nearest positives. High-scoring negatives that match specific outlier
positives hint at training-set contamination.
"""
print("\n" + "=" * 78)
print(f"NEGATIVE SET ANALYSIS ({len(neg_samples)} images)")
print("=" * 78)
print(
f"\n{'cos':>6} {'conf':>6} {'red':>5} {'adj':>5} "
f"{'blur':>7} {'eyes':>6} {'shape':>9} name"
)
for s in neg_samples:
cs = cosine(s.embedding, mean_emb)
conf = similarity_to_confidence(cs)
red = blur_reduction(s.blur_var)
print(
f"{cs:6.3f} {conf:6.3f} {red:5.2f} {max(0, conf - red):5.2f} "
f"{s.blur_var:7.1f} {s.align_info['eye_dist_px']:6.1f} "
f"{s.shape[0]}x{s.shape[1]:<5} {os.path.basename(s.path)}"
)
print("\n-- For each negative, top-3 most similar positives --")
pos_embs = np.stack([p.embedding for p in pos_samples])
pos_norm = pos_embs / (np.linalg.norm(pos_embs, axis=1, keepdims=True) + 1e-9)
for s in neg_samples:
v = s.embedding / (np.linalg.norm(s.embedding) + 1e-9)
sims = pos_norm @ v
idx = np.argsort(-sims)[:3]
print(f"\n {os.path.basename(s.path)}:")
for i in idx:
print(
f" {sims[i]:6.3f} {os.path.basename(pos_samples[i].path)} "
f"blur={pos_samples[i].blur_var:.1f} "
f"eyes={pos_samples[i].align_info['eye_dist_px']:.1f}"
)
# ---------------------------------------------------------------------------
# Optional diagnostics
# ---------------------------------------------------------------------------
def vector_outlier_test(
pos: list[FaceSample], neg: list[FaceSample], base_trim: float = 0.15
) -> None:
"""Measure the shipped vector-wise outlier filter at various thresholds.
The production filter at `build_class_mean` in
frigate/data_processing/common/face/model.py uses T=0.30. This test
sweeps T so you can see which images would be dropped on a new collection
and how that affects the negative scores.
Algorithm: iteratively recompute trim_mean on the kept set, drop any
embedding with cos < T to that mean, repeat until converged. Floor at
50% of the collection to avoid collapse.
"""
print("\n" + "=" * 78)
print("VECTOR-WISE OUTLIER PRE-FILTER — layered on trim_mean(0.15)")
print("=" * 78)
all_embs = np.stack([s.embedding for s in pos])
def iterative_mean(
embs: np.ndarray,
threshold: float,
iters: int = 3,
min_keep_frac: float = 0.5,
) -> tuple[np.ndarray, np.ndarray]:
keep = np.ones(len(embs), dtype=bool)
floor = max(5, int(np.ceil(min_keep_frac * len(embs))))
for _ in range(iters):
m = stats.trim_mean(embs[keep], base_trim, axis=0)
m_norm = m / (np.linalg.norm(m) + 1e-9)
e_norms = embs / (np.linalg.norm(embs, axis=1, keepdims=True) + 1e-9)
cos_to_mean = e_norms @ m_norm
new_keep = cos_to_mean >= threshold
if new_keep.sum() < floor:
top_idx = np.argsort(-cos_to_mean)[:floor]
new_keep = np.zeros_like(new_keep)
new_keep[top_idx] = True
if np.array_equal(new_keep, keep):
break
keep = new_keep
final = stats.trim_mean(embs[keep], base_trim, axis=0)
return final, keep
provisional = stats.trim_mean(all_embs, base_trim, axis=0)
p_norm = provisional / (np.linalg.norm(provisional) + 1e-9)
e_norms_all = all_embs / (np.linalg.norm(all_embs, axis=1, keepdims=True) + 1e-9)
cos_to_prov = e_norms_all @ p_norm
print("\nDistribution of cos(positive, provisional trim_mean):")
print(
f" min={cos_to_prov.min():.3f} p10={np.percentile(cos_to_prov, 10):.3f} "
f"p25={np.percentile(cos_to_prov, 25):.3f} "
f"median={np.median(cos_to_prov):.3f} "
f"p75={np.percentile(cos_to_prov, 75):.3f} max={cos_to_prov.max():.3f}"
)
baseline_mean = stats.trim_mean(all_embs, base_trim, axis=0)
baseline_pos = np.array([cosine(p.embedding, baseline_mean) for p in pos])
baseline_neg = (
np.array([cosine(n.embedding, baseline_mean) for n in neg])
if neg
else np.array([])
)
baseline_conf_neg = np.array(
[similarity_to_confidence(c) for c in baseline_neg]
)
print(
f"\nBaseline (trim_mean only, {len(pos)} images):"
f"\n pos cos min={baseline_pos.min():.3f} "
f"mean={baseline_pos.mean():.3f} max={baseline_pos.max():.3f}"
)
if len(neg):
print(
f" neg cos min={baseline_neg.min():.3f} "
f"mean={baseline_neg.mean():.3f} max={baseline_neg.max():.3f}"
)
print(
f" neg conf min={baseline_conf_neg.min():.3f} "
f"mean={baseline_conf_neg.mean():.3f} max={baseline_conf_neg.max():.3f}"
)
print(
f" margin (pos.min - neg.max): "
f"{baseline_pos.min() - baseline_neg.max():+.3f}"
)
print("\nIterative (refine mean → drop vectors with cos<T → repeat):")
print(
f"\n{'T':>5} {'kept':>6} {'pos min':>7} {'pos mean':>8} "
f"{'neg max':>7} {'neg mean':>8} {'neg conf.max':>12} {'margin':>7}"
)
for T in [0.15, 0.20, 0.25, 0.28, 0.30, 0.33, 0.36, 0.40]:
mean, keep = iterative_mean(all_embs, T)
pos_sims = np.array([cosine(p.embedding, mean) for p in pos])
neg_sims = (
np.array([cosine(n.embedding, mean) for n in neg])
if neg
else np.array([])
)
neg_conf = np.array([similarity_to_confidence(c) for c in neg_sims])
margin = pos_sims.min() - (neg_sims.max() if len(neg_sims) else 0)
print(
f"{T:5.2f} {int(keep.sum()):>3}/{len(pos):<2} "
f"{pos_sims.min():7.3f} {pos_sims.mean():8.3f} "
f"{neg_sims.max() if len(neg_sims) else float('nan'):7.3f} "
f"{neg_sims.mean() if len(neg_sims) else float('nan'):8.3f} "
f"{neg_conf.max() if len(neg_conf) else float('nan'):12.3f} "
f"{margin:+7.3f}"
)
# Show which images get dropped at the shipped threshold + neighbors
for T_show in (0.25, 0.30, 0.33):
_, keep = iterative_mean(all_embs, T_show)
print(
f"\nAt T={T_show}, the {int((~keep).sum())} dropped positives are:"
)
final_mean = stats.trim_mean(all_embs[keep], base_trim, axis=0)
m_n = final_mean / (np.linalg.norm(final_mean) + 1e-9)
for i, (p, k) in enumerate(zip(pos, keep)):
if not k:
e_n = p.embedding / (np.linalg.norm(p.embedding) + 1e-9)
cos_final = float(e_n @ m_n)
print(
f" cos_to_clean_mean={cos_final:6.3f} "
f"shape={p.shape[0]}x{p.shape[1]} "
f"eyes={p.align_info['eye_dist_px']:6.1f} "
f"blur={p.blur_var:7.1f} "
f"{os.path.basename(p.path)}"
)
def degenerate_embedding_test(
pos: list[FaceSample], neg: list[FaceSample]
) -> None:
"""Detect whether negatives and low-quality positives share a degenerate
'tiny/noisy face' region of the embedding space.
Signal: if neg-to-neg cos is higher than pos-to-pos cos, the negatives
aren't really per-identity embeddings — they're dominated by upsample /
low-resolution artifacts that all map to a similar corner of embedding
space regardless of who the face belongs to.
Also rebuilds the mean using only high-intra-similarity positives to
show whether a cleaner training set separates the negatives.
"""
print("\n" + "=" * 78)
print("DEGENERATE-EMBEDDING TEST")
print("=" * 78)
pos_embs = np.stack([l2(s.embedding) for s in pos])
neg_embs = np.stack([l2(s.embedding) for s in neg])
nn = neg_embs @ neg_embs.T
np.fill_diagonal(nn, np.nan)
pp = pos_embs @ pos_embs.T
np.fill_diagonal(pp, np.nan)
pn = pos_embs @ neg_embs.T
print(
f"\n neg<->neg mean cos : {np.nanmean(nn):.3f} "
f"(how tightly negatives cluster together)"
)
print(
f" pos<->pos mean cos : {np.nanmean(pp):.3f} "
f"(how tightly positives cluster)"
)
print(
f" pos<->neg mean cos : {pn.mean():.3f} "
f"(cross-class — should be low for a clean class)"
)
if np.nanmean(nn) > np.nanmean(pp):
print(
"\n >> neg<->neg > pos<->pos: negatives cluster more tightly than\n"
" positives. This is the degenerate-embedding signature —\n"
" upsampled tiny crops share a common 'face-like blob' region\n"
" regardless of identity."
)
mean_intra = np.nanmean(pp, axis=1)
for thresh in (0.30, 0.33, 0.36):
keep = mean_intra >= thresh
if keep.sum() < 5:
continue
clean_embs = [pos[i].embedding for i in range(len(pos)) if keep[i]]
clean_mean = stats.trim_mean(np.stack(clean_embs), 0.15, axis=0)
neg_scores = np.array([cosine(n.embedding, clean_mean) for n in neg])
neg_confs = np.array([similarity_to_confidence(c) for c in neg_scores])
pos_scores = np.array(
[
cosine(pos[i].embedding, clean_mean)
for i in range(len(pos))
if keep[i]
]
)
print(
f"\n mean_intra >= {thresh}: keeping {int(keep.sum())}/{len(pos)} positives"
)
print(
f" pos cos vs mean : min={pos_scores.min():.3f} "
f"mean={pos_scores.mean():.3f} max={pos_scores.max():.3f}"
)
print(
f" neg cos vs mean : min={neg_scores.min():.3f} "
f"mean={neg_scores.mean():.3f} max={neg_scores.max():.3f}"
)
print(
f" neg conf : min={neg_confs.min():.3f} "
f"mean={neg_confs.mean():.3f} max={neg_confs.max():.3f}"
)
print(
f" margin (pos.min - neg.max): "
f"{pos_scores.min() - neg_scores.max():+.3f}"
)
def contamination_analysis(
pos: list[FaceSample], neg: list[FaceSample]
) -> None:
"""Check whether the positive collection contains a second identity.
Two signals:
(a) Per-positive: if an image is closer to at least one negative than
to the rest of the positive class, it's likely a mislabeled face.
(b) 2-means split of the positive embeddings: if one cluster center
lands close to the negative mean, that cluster is a contaminating
sub-identity that's pulling the class mean toward the negatives.
"""
print("\n" + "=" * 78)
print("CONTAMINATION ANALYSIS")
print("=" * 78)
pos_embs = np.stack([l2(s.embedding) for s in pos])
neg_embs = np.stack([l2(s.embedding) for s in neg])
pos_names = [os.path.basename(s.path) for s in pos]
pos_pos = pos_embs @ pos_embs.T
np.fill_diagonal(pos_pos, np.nan)
pos_neg = pos_embs @ neg_embs.T
mean_intra = np.nanmean(pos_pos, axis=1)
max_to_neg = pos_neg.max(axis=1)
mean_to_neg = pos_neg.mean(axis=1)
print(
"\nPositives closer to a negative than to their own class avg"
"\n(these are candidates for mislabeled images):"
)
print(
f"\n{'max_neg':>7} {'mean_neg':>8} {'mean_intra':>10} "
f"{'delta':>6} name"
)
rows = list(zip(pos_names, max_to_neg, mean_to_neg, mean_intra))
rows.sort(key=lambda r: -(r[1] - r[3]))
for nm, mxn, mnn, mi in rows[:15]:
delta = mxn - mi
marker = " <<" if delta > 0 else ""
print(f"{mxn:7.3f} {mnn:8.3f} {mi:10.3f} {delta:6.3f} {nm}{marker}")
# 2-means in cosine space (no sklearn dependency).
print("\n2-means split of positive embeddings (cosine space):")
rng = np.random.default_rng(0)
best = None
for _ in range(5):
idx = rng.choice(len(pos_embs), 2, replace=False)
centers = pos_embs[idx].copy()
for _ in range(50):
sims = pos_embs @ centers.T
labels = np.argmax(sims, axis=1)
new_centers = np.stack(
[
l2(pos_embs[labels == k].mean(axis=0))
if np.any(labels == k)
else centers[k]
for k in range(2)
]
)
if np.allclose(new_centers, centers):
break
centers = new_centers
tight = float(np.mean([sims[i, labels[i]] for i in range(len(labels))]))
if best is None or tight > best[0]:
best = (tight, labels.copy(), centers.copy())
_, labels, centers = best
sizes = [int((labels == k).sum()) for k in range(2)]
neg_mean = l2(neg_embs.mean(axis=0))
print(
f" cluster 0: size={sizes[0]:>2} "
f"center<->other_center_cos={float(centers[0] @ centers[1]):.3f} "
f"center<->neg_mean_cos={float(centers[0] @ neg_mean):.3f}"
)
print(
f" cluster 1: size={sizes[1]:>2} "
f"center<->neg_mean_cos={float(centers[1] @ neg_mean):.3f}"
)
neg_aligned = 0 if centers[0] @ neg_mean > centers[1] @ neg_mean else 1
print(
f"\n cluster {neg_aligned} is more similar to the negatives — "
f"its members are the contamination candidates:"
)
for i, lbl in enumerate(labels):
if lbl == neg_aligned:
print(
f" max_to_neg={max_to_neg[i]:.3f} "
f"mean_intra={mean_intra[i]:.3f} {pos_names[i]}"
)
keep_mask = labels != neg_aligned
if keep_mask.sum() >= 3:
clean_embs = [pos[i].embedding for i in range(len(pos)) if keep_mask[i]]
clean_mean = stats.trim_mean(np.stack(clean_embs), 0.15, axis=0)
print(
f"\n Rebuilding class mean from the OTHER cluster "
f"({keep_mask.sum()} images):"
)
print(f" {'cos':>6} {'conf':>6} name")
for n in neg:
cs = cosine(n.embedding, clean_mean)
cf = similarity_to_confidence(cs)
print(f" {cs:6.3f} {cf:6.3f} {os.path.basename(n.path)}")
# ---------------------------------------------------------------------------
# main
# ---------------------------------------------------------------------------
def main() -> int:
ap = argparse.ArgumentParser(
description="Analyze a face recognition collection outside Frigate.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
ap.add_argument("--positive", required=True, help="Training folder for one identity")
ap.add_argument(
"--negative",
default=None,
help="Runtime-crop folder to score against (optional)",
)
ap.add_argument(
"--model-cache",
default="/config/model_cache",
help="Directory containing facedet/arcface.onnx and facedet/landmarkdet.yaml",
)
ap.add_argument(
"--trim",
type=float,
default=0.15,
help="trim_mean proportion (Frigate uses 0.15)",
)
ap.add_argument(
"--vector-outlier",
action="store_true",
help="Sweep the vector-wise outlier filter threshold",
)
ap.add_argument(
"--degenerate",
action="store_true",
help="Test whether negatives share a degenerate embedding region",
)
ap.add_argument(
"--contamination",
action="store_true",
help="Check whether the positive folder contains a second identity",
)
args = ap.parse_args()
arcface_path = os.path.join(args.model_cache, "facedet", "arcface.onnx")
landmark_path = os.path.join(args.model_cache, "facedet", "landmarkdet.yaml")
for p in (arcface_path, landmark_path):
if not os.path.exists(p):
print(f"ERROR: model file not found: {p}")
return 1
print(f"Loading ArcFace from {arcface_path}")
embedder = ArcFaceEmbedder(arcface_path)
print(f"Loading landmark model from {landmark_path}")
aligner = LandmarkAligner(landmark_path)
print(f"\nLoading positives from {args.positive} ...")
pos = load_folder(args.positive, aligner, embedder)
print(f" {len(pos)} positives loaded")
neg: list[FaceSample] = []
if args.negative:
print(f"\nLoading negatives from {args.negative} ...")
neg = load_folder(args.negative, aligner, embedder)
print(f" {len(neg)} negatives loaded")
if not pos:
print("no positive samples — aborting")
return 1
mean_emb = trimmed_mean([s.embedding for s in pos], trim=args.trim)
summarize_positive(pos, mean_emb)
if neg:
summarize_negative(neg, mean_emb, pos)
if args.vector_outlier:
vector_outlier_test(pos, neg, args.trim)
if args.degenerate and neg:
degenerate_embedding_test(pos, neg)
if args.contamination and neg:
contamination_analysis(pos, neg)
return 0
if __name__ == "__main__":
sys.exit(main())