mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-05-09 15:05:26 +03:00
Compare commits
3 Commits
7d7df9eb57
...
6dc81a6a0c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6dc81a6a0c | ||
|
|
434ef358a2 | ||
|
|
fe269b77b8 |
@ -133,6 +133,61 @@ class FaceRecognizer(ABC):
|
|||||||
return 0.0
|
return 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def build_class_mean(
|
||||||
|
embs: list[np.ndarray],
|
||||||
|
trim: float = 0.15,
|
||||||
|
outlier_threshold: float = 0.30,
|
||||||
|
min_keep_frac: float = 0.7,
|
||||||
|
max_iters: int = 3,
|
||||||
|
) -> np.ndarray:
|
||||||
|
"""Build a class-mean embedding with two-layer outlier protection.
|
||||||
|
|
||||||
|
Layer 1 (iterative, vector-wise): drop whole embeddings whose cosine
|
||||||
|
similarity to the current class mean is below ``outlier_threshold``.
|
||||||
|
Catches mislabeled or corrupted training samples (wrong face in the
|
||||||
|
folder, full-frame screenshots, extreme crops) that per-dimension
|
||||||
|
trimming cannot detect.
|
||||||
|
|
||||||
|
Layer 2 (per-dimension): ``scipy.stats.trim_mean`` on the retained set
|
||||||
|
to smooth per-component noise (lighting, expression, alignment jitter).
|
||||||
|
|
||||||
|
Collections with fewer than 5 images bypass outlier rejection — too few
|
||||||
|
samples to establish a reliable class center.
|
||||||
|
"""
|
||||||
|
arr = np.stack(embs, axis=0)
|
||||||
|
|
||||||
|
if len(arr) < 5:
|
||||||
|
return np.asarray(stats.trim_mean(arr, trim, axis=0))
|
||||||
|
|
||||||
|
keep = np.ones(len(arr), dtype=bool)
|
||||||
|
floor = max(5, int(np.ceil(min_keep_frac * len(arr))))
|
||||||
|
|
||||||
|
for _ in range(max_iters):
|
||||||
|
mean = stats.trim_mean(arr[keep], trim, axis=0)
|
||||||
|
m_norm = mean / (np.linalg.norm(mean) + 1e-9)
|
||||||
|
e_norms = arr / (np.linalg.norm(arr, axis=1, keepdims=True) + 1e-9)
|
||||||
|
cos = e_norms @ m_norm
|
||||||
|
new_keep = cos >= outlier_threshold
|
||||||
|
|
||||||
|
if new_keep.sum() < floor:
|
||||||
|
top = np.argsort(-cos)[:floor]
|
||||||
|
new_keep = np.zeros(len(arr), dtype=bool)
|
||||||
|
new_keep[top] = True
|
||||||
|
|
||||||
|
if np.array_equal(new_keep, keep):
|
||||||
|
break
|
||||||
|
keep = new_keep
|
||||||
|
|
||||||
|
dropped = int((~keep).sum())
|
||||||
|
|
||||||
|
if dropped:
|
||||||
|
logger.debug(
|
||||||
|
f"Vector-wise outlier filter dropped {dropped}/{len(arr)} embeddings"
|
||||||
|
)
|
||||||
|
|
||||||
|
return np.asarray(stats.trim_mean(arr[keep], trim, axis=0))
|
||||||
|
|
||||||
|
|
||||||
def similarity_to_confidence(
|
def similarity_to_confidence(
|
||||||
cosine_similarity: float,
|
cosine_similarity: float,
|
||||||
median: float = 0.3,
|
median: float = 0.3,
|
||||||
@ -229,7 +284,7 @@ class FaceNetRecognizer(FaceRecognizer):
|
|||||||
|
|
||||||
for name, embs in face_embeddings_map.items():
|
for name, embs in face_embeddings_map.items():
|
||||||
if embs:
|
if embs:
|
||||||
self.mean_embs[name] = stats.trim_mean(embs, 0.15)
|
self.mean_embs[name] = build_class_mean(embs)
|
||||||
|
|
||||||
logger.debug("Finished building ArcFace model")
|
logger.debug("Finished building ArcFace model")
|
||||||
|
|
||||||
@ -340,7 +395,7 @@ class ArcFaceRecognizer(FaceRecognizer):
|
|||||||
|
|
||||||
for name, embs in face_embeddings_map.items():
|
for name, embs in face_embeddings_map.items():
|
||||||
if embs:
|
if embs:
|
||||||
self.mean_embs[name] = stats.trim_mean(embs, 0.15)
|
self.mean_embs[name] = build_class_mean(embs)
|
||||||
|
|
||||||
logger.debug("Finished building ArcFace model")
|
logger.debug("Finished building ArcFace model")
|
||||||
|
|
||||||
|
|||||||
376
testing-scripts/analyze_recording_keyframes.py
Normal file
376
testing-scripts/analyze_recording_keyframes.py
Normal file
@ -0,0 +1,376 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Analyze keyframe and timestamp structure of Frigate recording segments.
|
||||||
|
|
||||||
|
This is a diagnostic tool for investigating seek precision / GOP behavior on
|
||||||
|
recorded segments. It does not modify anything.
|
||||||
|
|
||||||
|
ffprobe is only available inside the Frigate container, at
|
||||||
|
/usr/lib/ffmpeg/$DEFAULT_FFMPEG_VERSION/bin/ffprobe
|
||||||
|
This script auto-resolves that path from the DEFAULT_FFMPEG_VERSION env var
|
||||||
|
(or falls back to scanning /usr/lib/ffmpeg/*/bin/ffprobe). Pass --ffprobe to
|
||||||
|
override if needed.
|
||||||
|
|
||||||
|
All recording segments on the filesystem are in UTC. The --timestamp flag
|
||||||
|
expects a UTC Unix timestamp.
|
||||||
|
|
||||||
|
Typical use:
|
||||||
|
# Inside the Frigate container (or wherever recordings are mounted)
|
||||||
|
python3 analyze_recording_keyframes.py <camera_name>
|
||||||
|
|
||||||
|
# Analyze 10 most recent segments
|
||||||
|
python3 analyze_recording_keyframes.py <camera_name> --count 10
|
||||||
|
|
||||||
|
# Locate the segment that contains a specific UTC Unix timestamp and
|
||||||
|
# show it plus surrounding segments
|
||||||
|
python3 analyze_recording_keyframes.py <camera> --timestamp 1713471234.567
|
||||||
|
|
||||||
|
# Custom recordings directory
|
||||||
|
python3 analyze_recording_keyframes.py <camera> --recordings-dir /media/frigate/recordings
|
||||||
|
|
||||||
|
# Override the ffprobe path explicitly
|
||||||
|
python3 analyze_recording_keyframes.py <camera> --ffprobe /usr/lib/ffmpeg/7.0/bin/ffprobe
|
||||||
|
"""
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import datetime
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
from statistics import mean, median, stdev
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_ffprobe_path(override: str | None) -> str:
|
||||||
|
"""Resolve the ffprobe binary path.
|
||||||
|
|
||||||
|
Inside the Frigate container, ffprobe lives at
|
||||||
|
/usr/lib/ffmpeg/{DEFAULT_FFMPEG_VERSION}/bin/ffprobe — the exact version
|
||||||
|
depends on the image build and is exposed as an env var.
|
||||||
|
"""
|
||||||
|
if override:
|
||||||
|
return override
|
||||||
|
version = os.environ.get("DEFAULT_FFMPEG_VERSION", "")
|
||||||
|
if version:
|
||||||
|
path = f"/usr/lib/ffmpeg/{version}/bin/ffprobe"
|
||||||
|
if Path(path).is_file():
|
||||||
|
return path
|
||||||
|
# Fall back to scanning the Frigate ffmpeg install root.
|
||||||
|
for candidate in sorted(Path("/usr/lib/ffmpeg").glob("*/bin/ffprobe")):
|
||||||
|
if candidate.is_file():
|
||||||
|
return str(candidate)
|
||||||
|
print(
|
||||||
|
"Could not locate ffprobe. Pass --ffprobe <path> or set "
|
||||||
|
"DEFAULT_FFMPEG_VERSION.",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
def find_recent_segments(recordings_dir: Path, camera: str, count: int) -> list[Path]:
|
||||||
|
"""Return the N most recent .mp4 segments for the given camera.
|
||||||
|
|
||||||
|
Expected layout: <recordings_dir>/<YYYY-MM-DD>/<HH>/<camera>/<MM>.<SS>.mp4
|
||||||
|
"""
|
||||||
|
pattern = f"*/*/{camera}/*.mp4"
|
||||||
|
segments = sorted(recordings_dir.glob(pattern))
|
||||||
|
return segments[-count:]
|
||||||
|
|
||||||
|
|
||||||
|
def find_segments_near_timestamp(
|
||||||
|
recordings_dir: Path, camera: str, target_ts: float, count: int
|
||||||
|
) -> tuple[list[Path], Path | None]:
|
||||||
|
"""Return `count` segments centered on the one containing `target_ts`.
|
||||||
|
|
||||||
|
Also returns the specific segment that should contain the timestamp, so
|
||||||
|
callers can highlight it in output.
|
||||||
|
"""
|
||||||
|
pattern = f"*/*/{camera}/*.mp4"
|
||||||
|
with_ts: list[tuple[float, Path]] = []
|
||||||
|
for seg in sorted(recordings_dir.glob(pattern)):
|
||||||
|
ts = filename_to_timestamp(seg)
|
||||||
|
if ts is not None:
|
||||||
|
with_ts.append((ts, seg))
|
||||||
|
|
||||||
|
if not with_ts:
|
||||||
|
return [], None
|
||||||
|
|
||||||
|
# Largest filename_ts that is <= target_ts — that's the segment that
|
||||||
|
# should contain the timestamp (Frigate catalogs segments by filename).
|
||||||
|
target_idx = -1
|
||||||
|
for i, (ts, _) in enumerate(with_ts):
|
||||||
|
if ts <= target_ts:
|
||||||
|
target_idx = i
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
|
if target_idx < 0:
|
||||||
|
# target_ts is before the earliest segment we have — just return the
|
||||||
|
# first `count` segments so the user can see what's available.
|
||||||
|
window = with_ts[:count]
|
||||||
|
return [seg for _, seg in window], None
|
||||||
|
|
||||||
|
half = count // 2
|
||||||
|
start = max(0, target_idx - half)
|
||||||
|
end = min(len(with_ts), start + count)
|
||||||
|
start = max(0, end - count)
|
||||||
|
|
||||||
|
window = with_ts[start:end]
|
||||||
|
return [seg for _, seg in window], with_ts[target_idx][1]
|
||||||
|
|
||||||
|
|
||||||
|
def filename_to_timestamp(segment: Path) -> float | None:
|
||||||
|
"""Parse the wall-clock time from Frigate's segment path layout."""
|
||||||
|
try:
|
||||||
|
date = segment.parent.parent.parent.name # YYYY-MM-DD
|
||||||
|
hour = segment.parent.parent.name # HH
|
||||||
|
mm_ss = segment.stem # MM.SS
|
||||||
|
minute, second = mm_ss.split(".")
|
||||||
|
dt = datetime.datetime.strptime(
|
||||||
|
f"{date} {hour}:{minute}:{second}",
|
||||||
|
"%Y-%m-%d %H:%M:%S",
|
||||||
|
).replace(tzinfo=datetime.timezone.utc)
|
||||||
|
return dt.timestamp()
|
||||||
|
except (ValueError, IndexError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def run_ffprobe(ffprobe: str, args: list[str]) -> dict:
|
||||||
|
"""Run ffprobe and return parsed JSON, or empty dict on failure."""
|
||||||
|
result = subprocess.run(
|
||||||
|
[ffprobe, "-v", "error", *args, "-of", "json"],
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
check=False,
|
||||||
|
)
|
||||||
|
if result.returncode != 0:
|
||||||
|
print(f" ffprobe error: {result.stderr.strip()}", file=sys.stderr)
|
||||||
|
return {}
|
||||||
|
try:
|
||||||
|
return json.loads(result.stdout)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
|
def get_format_info(ffprobe: str, segment: Path) -> tuple[dict, dict]:
|
||||||
|
"""Return (format_dict, stream_dict) for the first video stream."""
|
||||||
|
data = run_ffprobe(
|
||||||
|
ffprobe,
|
||||||
|
[
|
||||||
|
"-show_entries",
|
||||||
|
"format=duration,start_time",
|
||||||
|
"-show_entries",
|
||||||
|
"stream=codec_name,profile,r_frame_rate,width,height",
|
||||||
|
"-select_streams",
|
||||||
|
"v:0",
|
||||||
|
str(segment),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
fmt = data.get("format", {})
|
||||||
|
streams = data.get("streams") or [{}]
|
||||||
|
return fmt, streams[0]
|
||||||
|
|
||||||
|
|
||||||
|
def get_video_packets(ffprobe: str, segment: Path) -> list[dict]:
|
||||||
|
"""Return video packets with pts_time and flags."""
|
||||||
|
data = run_ffprobe(
|
||||||
|
ffprobe,
|
||||||
|
[
|
||||||
|
"-select_streams",
|
||||||
|
"v",
|
||||||
|
"-show_entries",
|
||||||
|
"packet=pts_time,dts_time,flags",
|
||||||
|
str(segment),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
return data.get("packets", [])
|
||||||
|
|
||||||
|
|
||||||
|
def analyze(ffprobe: str, segment: Path, highlight: bool = False) -> None:
|
||||||
|
marker = " <-- contains target timestamp" if highlight else ""
|
||||||
|
print(f"\n=== {segment} ==={marker}")
|
||||||
|
|
||||||
|
fmt, stream = get_format_info(ffprobe, segment)
|
||||||
|
duration = float(fmt.get("duration", 0) or 0)
|
||||||
|
start_time = float(fmt.get("start_time", 0) or 0)
|
||||||
|
codec = stream.get("codec_name", "?")
|
||||||
|
profile = stream.get("profile", "?")
|
||||||
|
width = stream.get("width", "?")
|
||||||
|
height = stream.get("height", "?")
|
||||||
|
fps = stream.get("r_frame_rate", "?/1")
|
||||||
|
|
||||||
|
filename_ts = filename_to_timestamp(segment)
|
||||||
|
filename_iso = (
|
||||||
|
datetime.datetime.fromtimestamp(
|
||||||
|
filename_ts, tz=datetime.timezone.utc
|
||||||
|
).isoformat()
|
||||||
|
if filename_ts is not None
|
||||||
|
else "?"
|
||||||
|
)
|
||||||
|
|
||||||
|
print(f" Codec: {codec} ({profile}) {width}x{height} {fps}")
|
||||||
|
print(f" Filename time: {filename_ts} ({filename_iso})")
|
||||||
|
print(f" Format duration: {duration:.3f}s")
|
||||||
|
print(f" Format start: {start_time:.3f}s (PTS offset of first packet)")
|
||||||
|
|
||||||
|
packets = get_video_packets(ffprobe, segment)
|
||||||
|
if not packets:
|
||||||
|
print(" (no video packets)")
|
||||||
|
return
|
||||||
|
|
||||||
|
keyframe_times: list[float] = []
|
||||||
|
first_pts: float | None = None
|
||||||
|
last_pts: float | None = None
|
||||||
|
|
||||||
|
for pkt in packets:
|
||||||
|
pts_str = pkt.get("pts_time")
|
||||||
|
if pts_str is None or pts_str == "N/A":
|
||||||
|
continue
|
||||||
|
pts = float(pts_str)
|
||||||
|
if first_pts is None:
|
||||||
|
first_pts = pts
|
||||||
|
last_pts = pts
|
||||||
|
if "K" in pkt.get("flags", ""):
|
||||||
|
keyframe_times.append(pts)
|
||||||
|
|
||||||
|
total_packets = len(packets)
|
||||||
|
kf_count = len(keyframe_times)
|
||||||
|
|
||||||
|
print(f" Video packets: {total_packets}")
|
||||||
|
print(f" Keyframes: {kf_count}")
|
||||||
|
if first_pts is not None and last_pts is not None:
|
||||||
|
print(
|
||||||
|
f" Packet PTS: first={first_pts:.3f}s last={last_pts:.3f}s "
|
||||||
|
f"span={last_pts - first_pts:.3f}s"
|
||||||
|
)
|
||||||
|
|
||||||
|
if keyframe_times:
|
||||||
|
print(
|
||||||
|
f" Keyframe PTS: first={keyframe_times[0]:.3f}s "
|
||||||
|
f"last={keyframe_times[-1]:.3f}s"
|
||||||
|
)
|
||||||
|
formatted = ", ".join(f"{t:.3f}" for t in keyframe_times)
|
||||||
|
print(f" Keyframe times: [{formatted}]")
|
||||||
|
|
||||||
|
if len(keyframe_times) >= 2:
|
||||||
|
gaps = [b - a for a, b in zip(keyframe_times, keyframe_times[1:])]
|
||||||
|
avg_fps_estimate = (
|
||||||
|
total_packets / (last_pts - first_pts)
|
||||||
|
if last_pts and first_pts is not None and last_pts > first_pts
|
||||||
|
else 0
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
f" GOP gaps (s): min={min(gaps):.3f} max={max(gaps):.3f} "
|
||||||
|
f"mean={mean(gaps):.3f} median={median(gaps):.3f}"
|
||||||
|
)
|
||||||
|
if len(gaps) > 1:
|
||||||
|
print(f" stdev={stdev(gaps):.3f}")
|
||||||
|
print(
|
||||||
|
f" Est. mean GOP: ~{mean(gaps) * avg_fps_estimate:.1f} frames"
|
||||||
|
if avg_fps_estimate
|
||||||
|
else ""
|
||||||
|
)
|
||||||
|
if max(gaps) > 5:
|
||||||
|
print(
|
||||||
|
" !! Max GOP > 5s — consistent with adaptive/smart codec "
|
||||||
|
"(even if 'Smart Codec' is off in the UI, some cameras still "
|
||||||
|
"produce irregular GOPs under specific encoder profiles)"
|
||||||
|
)
|
||||||
|
elif kf_count == 1:
|
||||||
|
print(" !! Only one keyframe in segment — very long GOP")
|
||||||
|
|
||||||
|
# Report how well filename time aligns with first-packet PTS.
|
||||||
|
# (Filename time is what Frigate uses as recording.start_time in the DB.)
|
||||||
|
if filename_ts is not None and first_pts is not None:
|
||||||
|
print(
|
||||||
|
f" Notes: first packet PTS is {first_pts:.3f}s into the file; "
|
||||||
|
f"Frigate treats filename time as PTS=0 for seek math."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description=__doc__,
|
||||||
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||||
|
)
|
||||||
|
parser.add_argument("camera", help="Camera name (matches the recordings subfolder)")
|
||||||
|
parser.add_argument(
|
||||||
|
"--count",
|
||||||
|
type=int,
|
||||||
|
default=5,
|
||||||
|
help="Number of most recent segments to analyze (default: 5)",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--recordings-dir",
|
||||||
|
default="/media/frigate/recordings",
|
||||||
|
help="Path to the recordings directory (default: /media/frigate/recordings)",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--ffprobe",
|
||||||
|
default=None,
|
||||||
|
help=(
|
||||||
|
"Full path to the ffprobe binary. Defaults to the Frigate-bundled "
|
||||||
|
"binary at /usr/lib/ffmpeg/$DEFAULT_FFMPEG_VERSION/bin/ffprobe."
|
||||||
|
),
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--timestamp",
|
||||||
|
type=float,
|
||||||
|
default=None,
|
||||||
|
help=(
|
||||||
|
"Unix timestamp (UTC seconds, decimals allowed) to locate. The "
|
||||||
|
"script finds the segment that should contain this time and "
|
||||||
|
"analyzes it plus surrounding segments (count controls the "
|
||||||
|
"window). All on-disk segments are stored in UTC, so pass a UTC "
|
||||||
|
"Unix timestamp."
|
||||||
|
),
|
||||||
|
)
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
ffprobe = resolve_ffprobe_path(args.ffprobe)
|
||||||
|
|
||||||
|
recordings_dir = Path(args.recordings_dir)
|
||||||
|
if not recordings_dir.is_dir():
|
||||||
|
print(
|
||||||
|
f"Recordings directory not found: {recordings_dir}",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
target_segment: Path | None = None
|
||||||
|
if args.timestamp is not None:
|
||||||
|
segments, target_segment = find_segments_near_timestamp(
|
||||||
|
recordings_dir, args.camera, args.timestamp, args.count
|
||||||
|
)
|
||||||
|
target_iso = datetime.datetime.fromtimestamp(
|
||||||
|
args.timestamp, tz=datetime.timezone.utc
|
||||||
|
).isoformat()
|
||||||
|
mode = f"around timestamp {args.timestamp} ({target_iso})"
|
||||||
|
else:
|
||||||
|
segments = find_recent_segments(recordings_dir, args.camera, args.count)
|
||||||
|
mode = "most recent"
|
||||||
|
|
||||||
|
if not segments:
|
||||||
|
print(
|
||||||
|
f"No segments found for camera '{args.camera}' under {recordings_dir}",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
if args.timestamp is not None and target_segment is None:
|
||||||
|
print(
|
||||||
|
f"!! Target timestamp {args.timestamp} is before the earliest "
|
||||||
|
f"segment on disk; showing the earliest available segments instead.",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
|
||||||
|
print(
|
||||||
|
f"Analyzing {len(segments)} {mode} segment(s) for camera "
|
||||||
|
f"'{args.camera}' under {recordings_dir} (ffprobe: {ffprobe})"
|
||||||
|
)
|
||||||
|
for segment in segments:
|
||||||
|
analyze(ffprobe, segment, highlight=(segment == target_segment))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
783
testing-scripts/face_dataset.py
Normal file
783
testing-scripts/face_dataset.py
Normal file
@ -0,0 +1,783 @@
|
|||||||
|
"""
|
||||||
|
Face recognition investigation script.
|
||||||
|
|
||||||
|
Standalone replica of Frigate's ArcFace pipeline (see
|
||||||
|
frigate/data_processing/common/face/model.py and
|
||||||
|
frigate/embeddings/onnx/face_embedding.py) for analyzing a face collection
|
||||||
|
outside the running service. Useful for:
|
||||||
|
|
||||||
|
- Diagnosing why a person's collection produces false positives
|
||||||
|
- Finding outlier/contaminating training images
|
||||||
|
- Inspecting the effect of the shipped vector-wise outlier filter
|
||||||
|
|
||||||
|
Layout:
|
||||||
|
- Core pipeline: LandmarkAligner, ArcFaceEmbedder, arcface_preprocess,
|
||||||
|
similarity_to_confidence, blur_reduction — all mirroring the production
|
||||||
|
code exactly
|
||||||
|
- Default run: summarize positive and negative sets against a baseline
|
||||||
|
trim_mean class representation
|
||||||
|
- Optional diagnostics (flags): vector-outlier filter behavior, degenerate
|
||||||
|
"tiny crop" embedding clustering, and multi-identity contamination
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python3 face_investigate.py \\
|
||||||
|
--positive <positive_folder> \\
|
||||||
|
--negative <negative_folder> \\
|
||||||
|
[--model-cache /path/to/model_cache] \\
|
||||||
|
[--vector-outlier] [--degenerate] [--contamination]
|
||||||
|
|
||||||
|
The positive folder should contain training images for a single identity
|
||||||
|
(same layout as FACE_DIR/<name>/*.webp). The negative folder should contain
|
||||||
|
runtime crops to test against — a mix of true matches and misfires.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Iterable
|
||||||
|
|
||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
import onnxruntime as ort
|
||||||
|
from PIL import Image
|
||||||
|
from scipy import stats
|
||||||
|
|
||||||
|
ARCFACE_INPUT_SIZE = 112
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Replicated Frigate pipeline
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _process_image_frigate(image: np.ndarray) -> Image.Image:
|
||||||
|
"""Mirror BaseEmbedding._process_image for an ndarray input.
|
||||||
|
|
||||||
|
NOTE: Frigate passes the output of `cv2.imread` (BGR) directly in. PIL's
|
||||||
|
`Image.fromarray` does NOT reorder channels, so the embedder effectively
|
||||||
|
receives a BGR-ordered tensor. We replicate that faithfully here. (Tested
|
||||||
|
— swapping to RGB produces near-identical embeddings; this model is
|
||||||
|
robust to channel order.)
|
||||||
|
"""
|
||||||
|
return Image.fromarray(image)
|
||||||
|
|
||||||
|
|
||||||
|
def arcface_preprocess(image_bgr: np.ndarray) -> np.ndarray:
|
||||||
|
"""Mirror ArcfaceEmbedding._preprocess_inputs."""
|
||||||
|
pil = _process_image_frigate(image_bgr)
|
||||||
|
|
||||||
|
width, height = pil.size
|
||||||
|
if width != ARCFACE_INPUT_SIZE or height != ARCFACE_INPUT_SIZE:
|
||||||
|
if width > height:
|
||||||
|
new_height = int(((height / width) * ARCFACE_INPUT_SIZE) // 4 * 4)
|
||||||
|
pil = pil.resize((ARCFACE_INPUT_SIZE, new_height))
|
||||||
|
else:
|
||||||
|
new_width = int(((width / height) * ARCFACE_INPUT_SIZE) // 4 * 4)
|
||||||
|
pil = pil.resize((new_width, ARCFACE_INPUT_SIZE))
|
||||||
|
|
||||||
|
og = np.array(pil).astype(np.float32)
|
||||||
|
og_h, og_w, channels = og.shape
|
||||||
|
|
||||||
|
frame = np.zeros(
|
||||||
|
(ARCFACE_INPUT_SIZE, ARCFACE_INPUT_SIZE, channels), dtype=np.float32
|
||||||
|
)
|
||||||
|
x_center = (ARCFACE_INPUT_SIZE - og_w) // 2
|
||||||
|
y_center = (ARCFACE_INPUT_SIZE - og_h) // 2
|
||||||
|
frame[y_center : y_center + og_h, x_center : x_center + og_w] = og
|
||||||
|
|
||||||
|
frame = (frame / 127.5) - 1.0
|
||||||
|
frame = np.transpose(frame, (2, 0, 1))
|
||||||
|
frame = np.expand_dims(frame, axis=0)
|
||||||
|
return frame
|
||||||
|
|
||||||
|
|
||||||
|
class LandmarkAligner:
|
||||||
|
"""Mirror FaceRecognizer.align_face."""
|
||||||
|
|
||||||
|
def __init__(self, landmark_model_path: str):
|
||||||
|
if not os.path.exists(landmark_model_path):
|
||||||
|
raise FileNotFoundError(landmark_model_path)
|
||||||
|
self.detector = cv2.face.createFacemarkLBF()
|
||||||
|
self.detector.loadModel(landmark_model_path)
|
||||||
|
|
||||||
|
def align(
|
||||||
|
self, image: np.ndarray, out_w: int, out_h: int
|
||||||
|
) -> tuple[np.ndarray, dict]:
|
||||||
|
land_image = (
|
||||||
|
cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if image.ndim == 3 else image
|
||||||
|
)
|
||||||
|
_, lands = self.detector.fit(
|
||||||
|
land_image, np.array([(0, 0, land_image.shape[1], land_image.shape[0])])
|
||||||
|
)
|
||||||
|
landmarks = lands[0][0]
|
||||||
|
|
||||||
|
leftEyePts = landmarks[42:48]
|
||||||
|
rightEyePts = landmarks[36:42]
|
||||||
|
leftEyeCenter = leftEyePts.mean(axis=0).astype("int")
|
||||||
|
rightEyeCenter = rightEyePts.mean(axis=0).astype("int")
|
||||||
|
|
||||||
|
dY = rightEyeCenter[1] - leftEyeCenter[1]
|
||||||
|
dX = rightEyeCenter[0] - leftEyeCenter[0]
|
||||||
|
angle = np.degrees(np.arctan2(dY, dX)) - 180
|
||||||
|
dist = float(np.sqrt((dX**2) + (dY**2)))
|
||||||
|
|
||||||
|
desiredRightEyeX = 1.0 - 0.35
|
||||||
|
desiredDist = (desiredRightEyeX - 0.35) * out_w
|
||||||
|
scale = desiredDist / dist if dist > 0 else 1.0
|
||||||
|
|
||||||
|
eyesCenter = (
|
||||||
|
int((leftEyeCenter[0] + rightEyeCenter[0]) // 2),
|
||||||
|
int((leftEyeCenter[1] + rightEyeCenter[1]) // 2),
|
||||||
|
)
|
||||||
|
M = cv2.getRotationMatrix2D(eyesCenter, angle, scale)
|
||||||
|
tX = out_w * 0.5
|
||||||
|
tY = out_h * 0.35
|
||||||
|
M[0, 2] += tX - eyesCenter[0]
|
||||||
|
M[1, 2] += tY - eyesCenter[1]
|
||||||
|
|
||||||
|
aligned = cv2.warpAffine(
|
||||||
|
image, M, (out_w, out_h), flags=cv2.INTER_CUBIC
|
||||||
|
)
|
||||||
|
info = dict(
|
||||||
|
angle=float(angle),
|
||||||
|
eye_dist_px=dist,
|
||||||
|
scale=float(scale),
|
||||||
|
landmarks=landmarks,
|
||||||
|
)
|
||||||
|
return aligned, info
|
||||||
|
|
||||||
|
|
||||||
|
class ArcFaceEmbedder:
|
||||||
|
def __init__(self, model_path: str):
|
||||||
|
self.session = ort.InferenceSession(
|
||||||
|
model_path, providers=["CPUExecutionProvider"]
|
||||||
|
)
|
||||||
|
self.input_name = self.session.get_inputs()[0].name
|
||||||
|
|
||||||
|
def embed(self, image_bgr: np.ndarray) -> np.ndarray:
|
||||||
|
tensor = arcface_preprocess(image_bgr)
|
||||||
|
out = self.session.run(None, {self.input_name: tensor})[0]
|
||||||
|
return out.squeeze()
|
||||||
|
|
||||||
|
|
||||||
|
def similarity_to_confidence(
|
||||||
|
cos_sim: float,
|
||||||
|
median: float = 0.3,
|
||||||
|
range_width: float = 0.6,
|
||||||
|
slope_factor: float = 12,
|
||||||
|
) -> float:
|
||||||
|
slope = slope_factor / range_width
|
||||||
|
return float(1.0 / (1.0 + np.exp(-slope * (cos_sim - median))))
|
||||||
|
|
||||||
|
|
||||||
|
def laplacian_variance(image: np.ndarray) -> float:
|
||||||
|
return float(cv2.Laplacian(image, cv2.CV_64F).var())
|
||||||
|
|
||||||
|
|
||||||
|
def blur_reduction(variance: float) -> float:
|
||||||
|
if variance < 120:
|
||||||
|
return 0.06
|
||||||
|
elif variance < 160:
|
||||||
|
return 0.04
|
||||||
|
elif variance < 200:
|
||||||
|
return 0.02
|
||||||
|
elif variance < 250:
|
||||||
|
return 0.01
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def cosine(a: np.ndarray, b: np.ndarray) -> float:
|
||||||
|
denom = np.linalg.norm(a) * np.linalg.norm(b)
|
||||||
|
if denom == 0:
|
||||||
|
return 0.0
|
||||||
|
return float(np.dot(a, b) / denom)
|
||||||
|
|
||||||
|
|
||||||
|
def l2(v: np.ndarray) -> np.ndarray:
|
||||||
|
return v / (np.linalg.norm(v) + 1e-9)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Sample loading
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class FaceSample:
|
||||||
|
path: str
|
||||||
|
shape: tuple[int, int]
|
||||||
|
embedding: np.ndarray
|
||||||
|
blur_var: float
|
||||||
|
align_info: dict
|
||||||
|
|
||||||
|
|
||||||
|
def load_folder(
|
||||||
|
folder: str, aligner: LandmarkAligner, embedder: ArcFaceEmbedder
|
||||||
|
) -> list[FaceSample]:
|
||||||
|
samples: list[FaceSample] = []
|
||||||
|
names = sorted(os.listdir(folder))
|
||||||
|
for name in names:
|
||||||
|
if name.startswith("."):
|
||||||
|
continue
|
||||||
|
path = os.path.join(folder, name)
|
||||||
|
if not os.path.isfile(path):
|
||||||
|
continue
|
||||||
|
img = cv2.imread(path)
|
||||||
|
if img is None:
|
||||||
|
print(f" [skip unreadable] {name}")
|
||||||
|
continue
|
||||||
|
aligned, info = aligner.align(img, img.shape[1], img.shape[0])
|
||||||
|
emb = embedder.embed(aligned)
|
||||||
|
samples.append(
|
||||||
|
FaceSample(
|
||||||
|
path=path,
|
||||||
|
shape=(img.shape[1], img.shape[0]),
|
||||||
|
embedding=emb,
|
||||||
|
blur_var=laplacian_variance(img),
|
||||||
|
align_info=info,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return samples
|
||||||
|
|
||||||
|
|
||||||
|
def trimmed_mean(embs: Iterable[np.ndarray], trim: float = 0.15) -> np.ndarray:
|
||||||
|
arr = np.stack(list(embs), axis=0)
|
||||||
|
return stats.trim_mean(arr, trim, axis=0)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Baseline analyses (always run)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def summarize_positive(samples: list[FaceSample], mean_emb: np.ndarray) -> None:
|
||||||
|
"""Summary of training set: per-sample cos to class mean, intra-class stats.
|
||||||
|
|
||||||
|
Outliers with cos far below the rest are likely degrading the mean —
|
||||||
|
they'd be the first candidates the shipped vector-outlier filter drops.
|
||||||
|
"""
|
||||||
|
print("\n" + "=" * 78)
|
||||||
|
print(f"POSITIVE SET ANALYSIS ({len(samples)} images)")
|
||||||
|
print("=" * 78)
|
||||||
|
|
||||||
|
rows = []
|
||||||
|
for s in samples:
|
||||||
|
cs = cosine(s.embedding, mean_emb)
|
||||||
|
conf = similarity_to_confidence(cs)
|
||||||
|
red = blur_reduction(s.blur_var)
|
||||||
|
rows.append(
|
||||||
|
dict(
|
||||||
|
name=os.path.basename(s.path),
|
||||||
|
shape=f"{s.shape[0]}x{s.shape[1]}",
|
||||||
|
eye_px=s.align_info["eye_dist_px"],
|
||||||
|
angle=s.align_info["angle"] + 180,
|
||||||
|
blur=s.blur_var,
|
||||||
|
cos=cs,
|
||||||
|
conf=conf,
|
||||||
|
red=red,
|
||||||
|
adj_conf=max(0.0, conf - red),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
rows.sort(key=lambda r: r["cos"])
|
||||||
|
sims = np.array([r["cos"] for r in rows])
|
||||||
|
print(
|
||||||
|
f"\nCosine-to-trimmed-mean: mean={sims.mean():.3f} std={sims.std():.3f} "
|
||||||
|
f"min={sims.min():.3f} max={sims.max():.3f}"
|
||||||
|
)
|
||||||
|
|
||||||
|
print("\n-- Worst matches (bottom 10, most likely hurting the mean) --")
|
||||||
|
print(
|
||||||
|
f"{'cos':>6} {'conf':>6} {'blur':>7} {'eyes':>6} "
|
||||||
|
f"{'angle':>6} {'shape':>9} name"
|
||||||
|
)
|
||||||
|
for r in rows[:10]:
|
||||||
|
print(
|
||||||
|
f"{r['cos']:6.3f} {r['conf']:6.3f} {r['blur']:7.1f} "
|
||||||
|
f"{r['eye_px']:6.1f} {r['angle']:6.1f} {r['shape']:>9} {r['name']}"
|
||||||
|
)
|
||||||
|
|
||||||
|
print("\n-- Best matches (top 5) --")
|
||||||
|
for r in rows[-5:][::-1]:
|
||||||
|
print(
|
||||||
|
f"{r['cos']:6.3f} {r['conf']:6.3f} {r['blur']:7.1f} "
|
||||||
|
f"{r['eye_px']:6.1f} {r['angle']:6.1f} {r['shape']:>9} {r['name']}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Pairwise analysis — flags embeddings poorly correlated with the rest
|
||||||
|
print("\n-- Pairwise intra-class similarity (mean cos vs. other positives) --")
|
||||||
|
embs = np.stack([s.embedding for s in samples], axis=0)
|
||||||
|
norms = embs / (np.linalg.norm(embs, axis=1, keepdims=True) + 1e-9)
|
||||||
|
sim_matrix = norms @ norms.T
|
||||||
|
np.fill_diagonal(sim_matrix, np.nan)
|
||||||
|
mean_pairwise = np.nanmean(sim_matrix, axis=1)
|
||||||
|
names = [os.path.basename(s.path) for s in samples]
|
||||||
|
ordered = sorted(zip(names, mean_pairwise), key=lambda t: t[1])
|
||||||
|
print(f"{'mean_cos':>9} name")
|
||||||
|
for nm, mp in ordered[:10]:
|
||||||
|
print(f"{mp:9.3f} {nm}")
|
||||||
|
print(f"\n overall mean pairwise cos: {np.nanmean(sim_matrix):.3f}")
|
||||||
|
print(f" median pairwise cos: {np.nanmedian(sim_matrix):.3f}")
|
||||||
|
|
||||||
|
|
||||||
|
def summarize_negative(
|
||||||
|
neg_samples: list[FaceSample],
|
||||||
|
mean_emb: np.ndarray,
|
||||||
|
pos_samples: list[FaceSample],
|
||||||
|
) -> None:
|
||||||
|
"""Score each negative against the class mean, then show its top-3
|
||||||
|
nearest positives. High-scoring negatives that match specific outlier
|
||||||
|
positives hint at training-set contamination.
|
||||||
|
"""
|
||||||
|
print("\n" + "=" * 78)
|
||||||
|
print(f"NEGATIVE SET ANALYSIS ({len(neg_samples)} images)")
|
||||||
|
print("=" * 78)
|
||||||
|
print(
|
||||||
|
f"\n{'cos':>6} {'conf':>6} {'red':>5} {'adj':>5} "
|
||||||
|
f"{'blur':>7} {'eyes':>6} {'shape':>9} name"
|
||||||
|
)
|
||||||
|
for s in neg_samples:
|
||||||
|
cs = cosine(s.embedding, mean_emb)
|
||||||
|
conf = similarity_to_confidence(cs)
|
||||||
|
red = blur_reduction(s.blur_var)
|
||||||
|
print(
|
||||||
|
f"{cs:6.3f} {conf:6.3f} {red:5.2f} {max(0, conf - red):5.2f} "
|
||||||
|
f"{s.blur_var:7.1f} {s.align_info['eye_dist_px']:6.1f} "
|
||||||
|
f"{s.shape[0]}x{s.shape[1]:<5} {os.path.basename(s.path)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
print("\n-- For each negative, top-3 most similar positives --")
|
||||||
|
pos_embs = np.stack([p.embedding for p in pos_samples])
|
||||||
|
pos_norm = pos_embs / (np.linalg.norm(pos_embs, axis=1, keepdims=True) + 1e-9)
|
||||||
|
for s in neg_samples:
|
||||||
|
v = s.embedding / (np.linalg.norm(s.embedding) + 1e-9)
|
||||||
|
sims = pos_norm @ v
|
||||||
|
idx = np.argsort(-sims)[:3]
|
||||||
|
print(f"\n {os.path.basename(s.path)}:")
|
||||||
|
for i in idx:
|
||||||
|
print(
|
||||||
|
f" {sims[i]:6.3f} {os.path.basename(pos_samples[i].path)} "
|
||||||
|
f"blur={pos_samples[i].blur_var:.1f} "
|
||||||
|
f"eyes={pos_samples[i].align_info['eye_dist_px']:.1f}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Optional diagnostics
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def vector_outlier_test(
|
||||||
|
pos: list[FaceSample], neg: list[FaceSample], base_trim: float = 0.15
|
||||||
|
) -> None:
|
||||||
|
"""Measure the shipped vector-wise outlier filter at various thresholds.
|
||||||
|
|
||||||
|
The production filter at `build_class_mean` in
|
||||||
|
frigate/data_processing/common/face/model.py uses T=0.30. This test
|
||||||
|
sweeps T so you can see which images would be dropped on a new collection
|
||||||
|
and how that affects the negative scores.
|
||||||
|
|
||||||
|
Algorithm: iteratively recompute trim_mean on the kept set, drop any
|
||||||
|
embedding with cos < T to that mean, repeat until converged. Floor at
|
||||||
|
50% of the collection to avoid collapse.
|
||||||
|
"""
|
||||||
|
print("\n" + "=" * 78)
|
||||||
|
print("VECTOR-WISE OUTLIER PRE-FILTER — layered on trim_mean(0.15)")
|
||||||
|
print("=" * 78)
|
||||||
|
|
||||||
|
all_embs = np.stack([s.embedding for s in pos])
|
||||||
|
|
||||||
|
def iterative_mean(
|
||||||
|
embs: np.ndarray,
|
||||||
|
threshold: float,
|
||||||
|
iters: int = 3,
|
||||||
|
min_keep_frac: float = 0.5,
|
||||||
|
) -> tuple[np.ndarray, np.ndarray]:
|
||||||
|
keep = np.ones(len(embs), dtype=bool)
|
||||||
|
floor = max(5, int(np.ceil(min_keep_frac * len(embs))))
|
||||||
|
for _ in range(iters):
|
||||||
|
m = stats.trim_mean(embs[keep], base_trim, axis=0)
|
||||||
|
m_norm = m / (np.linalg.norm(m) + 1e-9)
|
||||||
|
e_norms = embs / (np.linalg.norm(embs, axis=1, keepdims=True) + 1e-9)
|
||||||
|
cos_to_mean = e_norms @ m_norm
|
||||||
|
new_keep = cos_to_mean >= threshold
|
||||||
|
if new_keep.sum() < floor:
|
||||||
|
top_idx = np.argsort(-cos_to_mean)[:floor]
|
||||||
|
new_keep = np.zeros_like(new_keep)
|
||||||
|
new_keep[top_idx] = True
|
||||||
|
if np.array_equal(new_keep, keep):
|
||||||
|
break
|
||||||
|
keep = new_keep
|
||||||
|
final = stats.trim_mean(embs[keep], base_trim, axis=0)
|
||||||
|
return final, keep
|
||||||
|
|
||||||
|
provisional = stats.trim_mean(all_embs, base_trim, axis=0)
|
||||||
|
p_norm = provisional / (np.linalg.norm(provisional) + 1e-9)
|
||||||
|
e_norms_all = all_embs / (np.linalg.norm(all_embs, axis=1, keepdims=True) + 1e-9)
|
||||||
|
cos_to_prov = e_norms_all @ p_norm
|
||||||
|
print("\nDistribution of cos(positive, provisional trim_mean):")
|
||||||
|
print(
|
||||||
|
f" min={cos_to_prov.min():.3f} p10={np.percentile(cos_to_prov, 10):.3f} "
|
||||||
|
f"p25={np.percentile(cos_to_prov, 25):.3f} "
|
||||||
|
f"median={np.median(cos_to_prov):.3f} "
|
||||||
|
f"p75={np.percentile(cos_to_prov, 75):.3f} max={cos_to_prov.max():.3f}"
|
||||||
|
)
|
||||||
|
|
||||||
|
baseline_mean = stats.trim_mean(all_embs, base_trim, axis=0)
|
||||||
|
baseline_pos = np.array([cosine(p.embedding, baseline_mean) for p in pos])
|
||||||
|
baseline_neg = (
|
||||||
|
np.array([cosine(n.embedding, baseline_mean) for n in neg])
|
||||||
|
if neg
|
||||||
|
else np.array([])
|
||||||
|
)
|
||||||
|
baseline_conf_neg = np.array(
|
||||||
|
[similarity_to_confidence(c) for c in baseline_neg]
|
||||||
|
)
|
||||||
|
|
||||||
|
print(
|
||||||
|
f"\nBaseline (trim_mean only, {len(pos)} images):"
|
||||||
|
f"\n pos cos min={baseline_pos.min():.3f} "
|
||||||
|
f"mean={baseline_pos.mean():.3f} max={baseline_pos.max():.3f}"
|
||||||
|
)
|
||||||
|
if len(neg):
|
||||||
|
print(
|
||||||
|
f" neg cos min={baseline_neg.min():.3f} "
|
||||||
|
f"mean={baseline_neg.mean():.3f} max={baseline_neg.max():.3f}"
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
f" neg conf min={baseline_conf_neg.min():.3f} "
|
||||||
|
f"mean={baseline_conf_neg.mean():.3f} max={baseline_conf_neg.max():.3f}"
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
f" margin (pos.min - neg.max): "
|
||||||
|
f"{baseline_pos.min() - baseline_neg.max():+.3f}"
|
||||||
|
)
|
||||||
|
|
||||||
|
print("\nIterative (refine mean → drop vectors with cos<T → repeat):")
|
||||||
|
print(
|
||||||
|
f"\n{'T':>5} {'kept':>6} {'pos min':>7} {'pos mean':>8} "
|
||||||
|
f"{'neg max':>7} {'neg mean':>8} {'neg conf.max':>12} {'margin':>7}"
|
||||||
|
)
|
||||||
|
for T in [0.15, 0.20, 0.25, 0.28, 0.30, 0.33, 0.36, 0.40]:
|
||||||
|
mean, keep = iterative_mean(all_embs, T)
|
||||||
|
pos_sims = np.array([cosine(p.embedding, mean) for p in pos])
|
||||||
|
neg_sims = (
|
||||||
|
np.array([cosine(n.embedding, mean) for n in neg])
|
||||||
|
if neg
|
||||||
|
else np.array([])
|
||||||
|
)
|
||||||
|
neg_conf = np.array([similarity_to_confidence(c) for c in neg_sims])
|
||||||
|
margin = pos_sims.min() - (neg_sims.max() if len(neg_sims) else 0)
|
||||||
|
print(
|
||||||
|
f"{T:5.2f} {int(keep.sum()):>3}/{len(pos):<2} "
|
||||||
|
f"{pos_sims.min():7.3f} {pos_sims.mean():8.3f} "
|
||||||
|
f"{neg_sims.max() if len(neg_sims) else float('nan'):7.3f} "
|
||||||
|
f"{neg_sims.mean() if len(neg_sims) else float('nan'):8.3f} "
|
||||||
|
f"{neg_conf.max() if len(neg_conf) else float('nan'):12.3f} "
|
||||||
|
f"{margin:+7.3f}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Show which images get dropped at the shipped threshold + neighbors
|
||||||
|
for T_show in (0.25, 0.30, 0.33):
|
||||||
|
_, keep = iterative_mean(all_embs, T_show)
|
||||||
|
print(
|
||||||
|
f"\nAt T={T_show}, the {int((~keep).sum())} dropped positives are:"
|
||||||
|
)
|
||||||
|
final_mean = stats.trim_mean(all_embs[keep], base_trim, axis=0)
|
||||||
|
m_n = final_mean / (np.linalg.norm(final_mean) + 1e-9)
|
||||||
|
for i, (p, k) in enumerate(zip(pos, keep)):
|
||||||
|
if not k:
|
||||||
|
e_n = p.embedding / (np.linalg.norm(p.embedding) + 1e-9)
|
||||||
|
cos_final = float(e_n @ m_n)
|
||||||
|
print(
|
||||||
|
f" cos_to_clean_mean={cos_final:6.3f} "
|
||||||
|
f"shape={p.shape[0]}x{p.shape[1]} "
|
||||||
|
f"eyes={p.align_info['eye_dist_px']:6.1f} "
|
||||||
|
f"blur={p.blur_var:7.1f} "
|
||||||
|
f"{os.path.basename(p.path)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def degenerate_embedding_test(
|
||||||
|
pos: list[FaceSample], neg: list[FaceSample]
|
||||||
|
) -> None:
|
||||||
|
"""Detect whether negatives and low-quality positives share a degenerate
|
||||||
|
'tiny/noisy face' region of the embedding space.
|
||||||
|
|
||||||
|
Signal: if neg-to-neg cos is higher than pos-to-pos cos, the negatives
|
||||||
|
aren't really per-identity embeddings — they're dominated by upsample /
|
||||||
|
low-resolution artifacts that all map to a similar corner of embedding
|
||||||
|
space regardless of who the face belongs to.
|
||||||
|
|
||||||
|
Also rebuilds the mean using only high-intra-similarity positives to
|
||||||
|
show whether a cleaner training set separates the negatives.
|
||||||
|
"""
|
||||||
|
print("\n" + "=" * 78)
|
||||||
|
print("DEGENERATE-EMBEDDING TEST")
|
||||||
|
print("=" * 78)
|
||||||
|
|
||||||
|
pos_embs = np.stack([l2(s.embedding) for s in pos])
|
||||||
|
neg_embs = np.stack([l2(s.embedding) for s in neg])
|
||||||
|
|
||||||
|
nn = neg_embs @ neg_embs.T
|
||||||
|
np.fill_diagonal(nn, np.nan)
|
||||||
|
pp = pos_embs @ pos_embs.T
|
||||||
|
np.fill_diagonal(pp, np.nan)
|
||||||
|
pn = pos_embs @ neg_embs.T
|
||||||
|
|
||||||
|
print(
|
||||||
|
f"\n neg<->neg mean cos : {np.nanmean(nn):.3f} "
|
||||||
|
f"(how tightly negatives cluster together)"
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
f" pos<->pos mean cos : {np.nanmean(pp):.3f} "
|
||||||
|
f"(how tightly positives cluster)"
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
f" pos<->neg mean cos : {pn.mean():.3f} "
|
||||||
|
f"(cross-class — should be low for a clean class)"
|
||||||
|
)
|
||||||
|
if np.nanmean(nn) > np.nanmean(pp):
|
||||||
|
print(
|
||||||
|
"\n >> neg<->neg > pos<->pos: negatives cluster more tightly than\n"
|
||||||
|
" positives. This is the degenerate-embedding signature —\n"
|
||||||
|
" upsampled tiny crops share a common 'face-like blob' region\n"
|
||||||
|
" regardless of identity."
|
||||||
|
)
|
||||||
|
|
||||||
|
mean_intra = np.nanmean(pp, axis=1)
|
||||||
|
for thresh in (0.30, 0.33, 0.36):
|
||||||
|
keep = mean_intra >= thresh
|
||||||
|
if keep.sum() < 5:
|
||||||
|
continue
|
||||||
|
clean_embs = [pos[i].embedding for i in range(len(pos)) if keep[i]]
|
||||||
|
clean_mean = stats.trim_mean(np.stack(clean_embs), 0.15, axis=0)
|
||||||
|
neg_scores = np.array([cosine(n.embedding, clean_mean) for n in neg])
|
||||||
|
neg_confs = np.array([similarity_to_confidence(c) for c in neg_scores])
|
||||||
|
pos_scores = np.array(
|
||||||
|
[
|
||||||
|
cosine(pos[i].embedding, clean_mean)
|
||||||
|
for i in range(len(pos))
|
||||||
|
if keep[i]
|
||||||
|
]
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
f"\n mean_intra >= {thresh}: keeping {int(keep.sum())}/{len(pos)} positives"
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
f" pos cos vs mean : min={pos_scores.min():.3f} "
|
||||||
|
f"mean={pos_scores.mean():.3f} max={pos_scores.max():.3f}"
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
f" neg cos vs mean : min={neg_scores.min():.3f} "
|
||||||
|
f"mean={neg_scores.mean():.3f} max={neg_scores.max():.3f}"
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
f" neg conf : min={neg_confs.min():.3f} "
|
||||||
|
f"mean={neg_confs.mean():.3f} max={neg_confs.max():.3f}"
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
f" margin (pos.min - neg.max): "
|
||||||
|
f"{pos_scores.min() - neg_scores.max():+.3f}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def contamination_analysis(
|
||||||
|
pos: list[FaceSample], neg: list[FaceSample]
|
||||||
|
) -> None:
|
||||||
|
"""Check whether the positive collection contains a second identity.
|
||||||
|
|
||||||
|
Two signals:
|
||||||
|
(a) Per-positive: if an image is closer to at least one negative than
|
||||||
|
to the rest of the positive class, it's likely a mislabeled face.
|
||||||
|
(b) 2-means split of the positive embeddings: if one cluster center
|
||||||
|
lands close to the negative mean, that cluster is a contaminating
|
||||||
|
sub-identity that's pulling the class mean toward the negatives.
|
||||||
|
"""
|
||||||
|
print("\n" + "=" * 78)
|
||||||
|
print("CONTAMINATION ANALYSIS")
|
||||||
|
print("=" * 78)
|
||||||
|
|
||||||
|
pos_embs = np.stack([l2(s.embedding) for s in pos])
|
||||||
|
neg_embs = np.stack([l2(s.embedding) for s in neg])
|
||||||
|
pos_names = [os.path.basename(s.path) for s in pos]
|
||||||
|
|
||||||
|
pos_pos = pos_embs @ pos_embs.T
|
||||||
|
np.fill_diagonal(pos_pos, np.nan)
|
||||||
|
pos_neg = pos_embs @ neg_embs.T
|
||||||
|
|
||||||
|
mean_intra = np.nanmean(pos_pos, axis=1)
|
||||||
|
max_to_neg = pos_neg.max(axis=1)
|
||||||
|
mean_to_neg = pos_neg.mean(axis=1)
|
||||||
|
|
||||||
|
print(
|
||||||
|
"\nPositives closer to a negative than to their own class avg"
|
||||||
|
"\n(these are candidates for mislabeled images):"
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
f"\n{'max_neg':>7} {'mean_neg':>8} {'mean_intra':>10} "
|
||||||
|
f"{'delta':>6} name"
|
||||||
|
)
|
||||||
|
rows = list(zip(pos_names, max_to_neg, mean_to_neg, mean_intra))
|
||||||
|
rows.sort(key=lambda r: -(r[1] - r[3]))
|
||||||
|
for nm, mxn, mnn, mi in rows[:15]:
|
||||||
|
delta = mxn - mi
|
||||||
|
marker = " <<" if delta > 0 else ""
|
||||||
|
print(f"{mxn:7.3f} {mnn:8.3f} {mi:10.3f} {delta:6.3f} {nm}{marker}")
|
||||||
|
|
||||||
|
# 2-means in cosine space (no sklearn dependency).
|
||||||
|
print("\n2-means split of positive embeddings (cosine space):")
|
||||||
|
rng = np.random.default_rng(0)
|
||||||
|
best = None
|
||||||
|
for _ in range(5):
|
||||||
|
idx = rng.choice(len(pos_embs), 2, replace=False)
|
||||||
|
centers = pos_embs[idx].copy()
|
||||||
|
for _ in range(50):
|
||||||
|
sims = pos_embs @ centers.T
|
||||||
|
labels = np.argmax(sims, axis=1)
|
||||||
|
new_centers = np.stack(
|
||||||
|
[
|
||||||
|
l2(pos_embs[labels == k].mean(axis=0))
|
||||||
|
if np.any(labels == k)
|
||||||
|
else centers[k]
|
||||||
|
for k in range(2)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
if np.allclose(new_centers, centers):
|
||||||
|
break
|
||||||
|
centers = new_centers
|
||||||
|
tight = float(np.mean([sims[i, labels[i]] for i in range(len(labels))]))
|
||||||
|
if best is None or tight > best[0]:
|
||||||
|
best = (tight, labels.copy(), centers.copy())
|
||||||
|
|
||||||
|
_, labels, centers = best
|
||||||
|
sizes = [int((labels == k).sum()) for k in range(2)]
|
||||||
|
neg_mean = l2(neg_embs.mean(axis=0))
|
||||||
|
print(
|
||||||
|
f" cluster 0: size={sizes[0]:>2} "
|
||||||
|
f"center<->other_center_cos={float(centers[0] @ centers[1]):.3f} "
|
||||||
|
f"center<->neg_mean_cos={float(centers[0] @ neg_mean):.3f}"
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
f" cluster 1: size={sizes[1]:>2} "
|
||||||
|
f"center<->neg_mean_cos={float(centers[1] @ neg_mean):.3f}"
|
||||||
|
)
|
||||||
|
|
||||||
|
neg_aligned = 0 if centers[0] @ neg_mean > centers[1] @ neg_mean else 1
|
||||||
|
print(
|
||||||
|
f"\n cluster {neg_aligned} is more similar to the negatives — "
|
||||||
|
f"its members are the contamination candidates:"
|
||||||
|
)
|
||||||
|
for i, lbl in enumerate(labels):
|
||||||
|
if lbl == neg_aligned:
|
||||||
|
print(
|
||||||
|
f" max_to_neg={max_to_neg[i]:.3f} "
|
||||||
|
f"mean_intra={mean_intra[i]:.3f} {pos_names[i]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
keep_mask = labels != neg_aligned
|
||||||
|
if keep_mask.sum() >= 3:
|
||||||
|
clean_embs = [pos[i].embedding for i in range(len(pos)) if keep_mask[i]]
|
||||||
|
clean_mean = stats.trim_mean(np.stack(clean_embs), 0.15, axis=0)
|
||||||
|
print(
|
||||||
|
f"\n Rebuilding class mean from the OTHER cluster "
|
||||||
|
f"({keep_mask.sum()} images):"
|
||||||
|
)
|
||||||
|
print(f" {'cos':>6} {'conf':>6} name")
|
||||||
|
for n in neg:
|
||||||
|
cs = cosine(n.embedding, clean_mean)
|
||||||
|
cf = similarity_to_confidence(cs)
|
||||||
|
print(f" {cs:6.3f} {cf:6.3f} {os.path.basename(n.path)}")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# main
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> int:
|
||||||
|
ap = argparse.ArgumentParser(
|
||||||
|
description="Analyze a face recognition collection outside Frigate.",
|
||||||
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||||
|
epilog=__doc__,
|
||||||
|
)
|
||||||
|
ap.add_argument("--positive", required=True, help="Training folder for one identity")
|
||||||
|
ap.add_argument(
|
||||||
|
"--negative",
|
||||||
|
default=None,
|
||||||
|
help="Runtime-crop folder to score against (optional)",
|
||||||
|
)
|
||||||
|
ap.add_argument(
|
||||||
|
"--model-cache",
|
||||||
|
default="/config/model_cache",
|
||||||
|
help="Directory containing facedet/arcface.onnx and facedet/landmarkdet.yaml",
|
||||||
|
)
|
||||||
|
ap.add_argument(
|
||||||
|
"--trim",
|
||||||
|
type=float,
|
||||||
|
default=0.15,
|
||||||
|
help="trim_mean proportion (Frigate uses 0.15)",
|
||||||
|
)
|
||||||
|
ap.add_argument(
|
||||||
|
"--vector-outlier",
|
||||||
|
action="store_true",
|
||||||
|
help="Sweep the vector-wise outlier filter threshold",
|
||||||
|
)
|
||||||
|
ap.add_argument(
|
||||||
|
"--degenerate",
|
||||||
|
action="store_true",
|
||||||
|
help="Test whether negatives share a degenerate embedding region",
|
||||||
|
)
|
||||||
|
ap.add_argument(
|
||||||
|
"--contamination",
|
||||||
|
action="store_true",
|
||||||
|
help="Check whether the positive folder contains a second identity",
|
||||||
|
)
|
||||||
|
args = ap.parse_args()
|
||||||
|
|
||||||
|
arcface_path = os.path.join(args.model_cache, "facedet", "arcface.onnx")
|
||||||
|
landmark_path = os.path.join(args.model_cache, "facedet", "landmarkdet.yaml")
|
||||||
|
for p in (arcface_path, landmark_path):
|
||||||
|
if not os.path.exists(p):
|
||||||
|
print(f"ERROR: model file not found: {p}")
|
||||||
|
return 1
|
||||||
|
|
||||||
|
print(f"Loading ArcFace from {arcface_path}")
|
||||||
|
embedder = ArcFaceEmbedder(arcface_path)
|
||||||
|
print(f"Loading landmark model from {landmark_path}")
|
||||||
|
aligner = LandmarkAligner(landmark_path)
|
||||||
|
|
||||||
|
print(f"\nLoading positives from {args.positive} ...")
|
||||||
|
pos = load_folder(args.positive, aligner, embedder)
|
||||||
|
print(f" {len(pos)} positives loaded")
|
||||||
|
|
||||||
|
neg: list[FaceSample] = []
|
||||||
|
if args.negative:
|
||||||
|
print(f"\nLoading negatives from {args.negative} ...")
|
||||||
|
neg = load_folder(args.negative, aligner, embedder)
|
||||||
|
print(f" {len(neg)} negatives loaded")
|
||||||
|
|
||||||
|
if not pos:
|
||||||
|
print("no positive samples — aborting")
|
||||||
|
return 1
|
||||||
|
|
||||||
|
mean_emb = trimmed_mean([s.embedding for s in pos], trim=args.trim)
|
||||||
|
summarize_positive(pos, mean_emb)
|
||||||
|
if neg:
|
||||||
|
summarize_negative(neg, mean_emb, pos)
|
||||||
|
|
||||||
|
if args.vector_outlier:
|
||||||
|
vector_outlier_test(pos, neg, args.trim)
|
||||||
|
if args.degenerate and neg:
|
||||||
|
degenerate_embedding_test(pos, neg)
|
||||||
|
if args.contamination and neg:
|
||||||
|
contamination_analysis(pos, neg)
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main())
|
||||||
Loading…
Reference in New Issue
Block a user