diff --git a/testing-scripts/analyze_recording_keyframes.py b/testing-scripts/analyze_recording_keyframes.py new file mode 100644 index 000000000..982cac82f --- /dev/null +++ b/testing-scripts/analyze_recording_keyframes.py @@ -0,0 +1,376 @@ +#!/usr/bin/env python3 +"""Analyze keyframe and timestamp structure of Frigate recording segments. + +This is a diagnostic tool for investigating seek precision / GOP behavior on +recorded segments. It does not modify anything. + +ffprobe is only available inside the Frigate container, at + /usr/lib/ffmpeg/$DEFAULT_FFMPEG_VERSION/bin/ffprobe +This script auto-resolves that path from the DEFAULT_FFMPEG_VERSION env var +(or falls back to scanning /usr/lib/ffmpeg/*/bin/ffprobe). Pass --ffprobe to +override if needed. + +All recording segments on the filesystem are in UTC. The --timestamp flag +expects a UTC Unix timestamp. + +Typical use: + # Inside the Frigate container (or wherever recordings are mounted) + python3 analyze_recording_keyframes.py + + # Analyze 10 most recent segments + python3 analyze_recording_keyframes.py --count 10 + + # Locate the segment that contains a specific UTC Unix timestamp and + # show it plus surrounding segments + python3 analyze_recording_keyframes.py --timestamp 1713471234.567 + + # Custom recordings directory + python3 analyze_recording_keyframes.py --recordings-dir /media/frigate/recordings + + # Override the ffprobe path explicitly + python3 analyze_recording_keyframes.py --ffprobe /usr/lib/ffmpeg/7.0/bin/ffprobe +""" + +import argparse +import datetime +import json +import os +import subprocess +import sys +from pathlib import Path +from statistics import mean, median, stdev + + +def resolve_ffprobe_path(override: str | None) -> str: + """Resolve the ffprobe binary path. + + Inside the Frigate container, ffprobe lives at + /usr/lib/ffmpeg/{DEFAULT_FFMPEG_VERSION}/bin/ffprobe — the exact version + depends on the image build and is exposed as an env var. + """ + if override: + return override + version = os.environ.get("DEFAULT_FFMPEG_VERSION", "") + if version: + path = f"/usr/lib/ffmpeg/{version}/bin/ffprobe" + if Path(path).is_file(): + return path + # Fall back to scanning the Frigate ffmpeg install root. + for candidate in sorted(Path("/usr/lib/ffmpeg").glob("*/bin/ffprobe")): + if candidate.is_file(): + return str(candidate) + print( + "Could not locate ffprobe. Pass --ffprobe or set " + "DEFAULT_FFMPEG_VERSION.", + file=sys.stderr, + ) + sys.exit(1) + + +def find_recent_segments(recordings_dir: Path, camera: str, count: int) -> list[Path]: + """Return the N most recent .mp4 segments for the given camera. + + Expected layout: ////..mp4 + """ + pattern = f"*/*/{camera}/*.mp4" + segments = sorted(recordings_dir.glob(pattern)) + return segments[-count:] + + +def find_segments_near_timestamp( + recordings_dir: Path, camera: str, target_ts: float, count: int +) -> tuple[list[Path], Path | None]: + """Return `count` segments centered on the one containing `target_ts`. + + Also returns the specific segment that should contain the timestamp, so + callers can highlight it in output. + """ + pattern = f"*/*/{camera}/*.mp4" + with_ts: list[tuple[float, Path]] = [] + for seg in sorted(recordings_dir.glob(pattern)): + ts = filename_to_timestamp(seg) + if ts is not None: + with_ts.append((ts, seg)) + + if not with_ts: + return [], None + + # Largest filename_ts that is <= target_ts — that's the segment that + # should contain the timestamp (Frigate catalogs segments by filename). + target_idx = -1 + for i, (ts, _) in enumerate(with_ts): + if ts <= target_ts: + target_idx = i + else: + break + + if target_idx < 0: + # target_ts is before the earliest segment we have — just return the + # first `count` segments so the user can see what's available. + window = with_ts[:count] + return [seg for _, seg in window], None + + half = count // 2 + start = max(0, target_idx - half) + end = min(len(with_ts), start + count) + start = max(0, end - count) + + window = with_ts[start:end] + return [seg for _, seg in window], with_ts[target_idx][1] + + +def filename_to_timestamp(segment: Path) -> float | None: + """Parse the wall-clock time from Frigate's segment path layout.""" + try: + date = segment.parent.parent.parent.name # YYYY-MM-DD + hour = segment.parent.parent.name # HH + mm_ss = segment.stem # MM.SS + minute, second = mm_ss.split(".") + dt = datetime.datetime.strptime( + f"{date} {hour}:{minute}:{second}", + "%Y-%m-%d %H:%M:%S", + ).replace(tzinfo=datetime.timezone.utc) + return dt.timestamp() + except (ValueError, IndexError): + return None + + +def run_ffprobe(ffprobe: str, args: list[str]) -> dict: + """Run ffprobe and return parsed JSON, or empty dict on failure.""" + result = subprocess.run( + [ffprobe, "-v", "error", *args, "-of", "json"], + capture_output=True, + text=True, + check=False, + ) + if result.returncode != 0: + print(f" ffprobe error: {result.stderr.strip()}", file=sys.stderr) + return {} + try: + return json.loads(result.stdout) + except json.JSONDecodeError: + return {} + + +def get_format_info(ffprobe: str, segment: Path) -> tuple[dict, dict]: + """Return (format_dict, stream_dict) for the first video stream.""" + data = run_ffprobe( + ffprobe, + [ + "-show_entries", + "format=duration,start_time", + "-show_entries", + "stream=codec_name,profile,r_frame_rate,width,height", + "-select_streams", + "v:0", + str(segment), + ], + ) + fmt = data.get("format", {}) + streams = data.get("streams") or [{}] + return fmt, streams[0] + + +def get_video_packets(ffprobe: str, segment: Path) -> list[dict]: + """Return video packets with pts_time and flags.""" + data = run_ffprobe( + ffprobe, + [ + "-select_streams", + "v", + "-show_entries", + "packet=pts_time,dts_time,flags", + str(segment), + ], + ) + return data.get("packets", []) + + +def analyze(ffprobe: str, segment: Path, highlight: bool = False) -> None: + marker = " <-- contains target timestamp" if highlight else "" + print(f"\n=== {segment} ==={marker}") + + fmt, stream = get_format_info(ffprobe, segment) + duration = float(fmt.get("duration", 0) or 0) + start_time = float(fmt.get("start_time", 0) or 0) + codec = stream.get("codec_name", "?") + profile = stream.get("profile", "?") + width = stream.get("width", "?") + height = stream.get("height", "?") + fps = stream.get("r_frame_rate", "?/1") + + filename_ts = filename_to_timestamp(segment) + filename_iso = ( + datetime.datetime.fromtimestamp( + filename_ts, tz=datetime.timezone.utc + ).isoformat() + if filename_ts is not None + else "?" + ) + + print(f" Codec: {codec} ({profile}) {width}x{height} {fps}") + print(f" Filename time: {filename_ts} ({filename_iso})") + print(f" Format duration: {duration:.3f}s") + print(f" Format start: {start_time:.3f}s (PTS offset of first packet)") + + packets = get_video_packets(ffprobe, segment) + if not packets: + print(" (no video packets)") + return + + keyframe_times: list[float] = [] + first_pts: float | None = None + last_pts: float | None = None + + for pkt in packets: + pts_str = pkt.get("pts_time") + if pts_str is None or pts_str == "N/A": + continue + pts = float(pts_str) + if first_pts is None: + first_pts = pts + last_pts = pts + if "K" in pkt.get("flags", ""): + keyframe_times.append(pts) + + total_packets = len(packets) + kf_count = len(keyframe_times) + + print(f" Video packets: {total_packets}") + print(f" Keyframes: {kf_count}") + if first_pts is not None and last_pts is not None: + print( + f" Packet PTS: first={first_pts:.3f}s last={last_pts:.3f}s " + f"span={last_pts - first_pts:.3f}s" + ) + + if keyframe_times: + print( + f" Keyframe PTS: first={keyframe_times[0]:.3f}s " + f"last={keyframe_times[-1]:.3f}s" + ) + formatted = ", ".join(f"{t:.3f}" for t in keyframe_times) + print(f" Keyframe times: [{formatted}]") + + if len(keyframe_times) >= 2: + gaps = [b - a for a, b in zip(keyframe_times, keyframe_times[1:])] + avg_fps_estimate = ( + total_packets / (last_pts - first_pts) + if last_pts and first_pts is not None and last_pts > first_pts + else 0 + ) + print( + f" GOP gaps (s): min={min(gaps):.3f} max={max(gaps):.3f} " + f"mean={mean(gaps):.3f} median={median(gaps):.3f}" + ) + if len(gaps) > 1: + print(f" stdev={stdev(gaps):.3f}") + print( + f" Est. mean GOP: ~{mean(gaps) * avg_fps_estimate:.1f} frames" + if avg_fps_estimate + else "" + ) + if max(gaps) > 5: + print( + " !! Max GOP > 5s — consistent with adaptive/smart codec " + "(even if 'Smart Codec' is off in the UI, some cameras still " + "produce irregular GOPs under specific encoder profiles)" + ) + elif kf_count == 1: + print(" !! Only one keyframe in segment — very long GOP") + + # Report how well filename time aligns with first-packet PTS. + # (Filename time is what Frigate uses as recording.start_time in the DB.) + if filename_ts is not None and first_pts is not None: + print( + f" Notes: first packet PTS is {first_pts:.3f}s into the file; " + f"Frigate treats filename time as PTS=0 for seek math." + ) + + +def main() -> None: + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("camera", help="Camera name (matches the recordings subfolder)") + parser.add_argument( + "--count", + type=int, + default=5, + help="Number of most recent segments to analyze (default: 5)", + ) + parser.add_argument( + "--recordings-dir", + default="/media/frigate/recordings", + help="Path to the recordings directory (default: /media/frigate/recordings)", + ) + parser.add_argument( + "--ffprobe", + default=None, + help=( + "Full path to the ffprobe binary. Defaults to the Frigate-bundled " + "binary at /usr/lib/ffmpeg/$DEFAULT_FFMPEG_VERSION/bin/ffprobe." + ), + ) + parser.add_argument( + "--timestamp", + type=float, + default=None, + help=( + "Unix timestamp (UTC seconds, decimals allowed) to locate. The " + "script finds the segment that should contain this time and " + "analyzes it plus surrounding segments (count controls the " + "window). All on-disk segments are stored in UTC, so pass a UTC " + "Unix timestamp." + ), + ) + args = parser.parse_args() + + ffprobe = resolve_ffprobe_path(args.ffprobe) + + recordings_dir = Path(args.recordings_dir) + if not recordings_dir.is_dir(): + print( + f"Recordings directory not found: {recordings_dir}", + file=sys.stderr, + ) + sys.exit(1) + + target_segment: Path | None = None + if args.timestamp is not None: + segments, target_segment = find_segments_near_timestamp( + recordings_dir, args.camera, args.timestamp, args.count + ) + target_iso = datetime.datetime.fromtimestamp( + args.timestamp, tz=datetime.timezone.utc + ).isoformat() + mode = f"around timestamp {args.timestamp} ({target_iso})" + else: + segments = find_recent_segments(recordings_dir, args.camera, args.count) + mode = "most recent" + + if not segments: + print( + f"No segments found for camera '{args.camera}' under {recordings_dir}", + file=sys.stderr, + ) + sys.exit(1) + + if args.timestamp is not None and target_segment is None: + print( + f"!! Target timestamp {args.timestamp} is before the earliest " + f"segment on disk; showing the earliest available segments instead.", + file=sys.stderr, + ) + + print( + f"Analyzing {len(segments)} {mode} segment(s) for camera " + f"'{args.camera}' under {recordings_dir} (ffprobe: {ffprobe})" + ) + for segment in segments: + analyze(ffprobe, segment, highlight=(segment == target_segment)) + + +if __name__ == "__main__": + main()