backend: endpoint and util funcs

This commit is contained in:
Josh Hawkins 2026-06-11 08:09:40 -05:00
parent efe585a920
commit c2b11a97d3
4 changed files with 197 additions and 22 deletions

View File

@ -34,11 +34,15 @@ from frigate.config.camera.updater import (
)
from frigate.config.env import substitute_frigate_vars
from frigate.models import User
from frigate.util.builtin import clean_camera_user_pass
from frigate.util.builtin import clean_camera_user_pass, get_record_segment_time
from frigate.util.camera_cleanup import cleanup_camera_db, cleanup_camera_files
from frigate.util.config import find_config_file
from frigate.util.image import run_ffmpeg_snapshot
from frigate.util.services import ffprobe_stream, is_restricted_go2rtc_source
from frigate.util.services import (
analyze_record_keyframes,
ffprobe_stream,
is_restricted_go2rtc_source,
)
logger = logging.getLogger(__name__)
@ -362,6 +366,48 @@ def ffprobe(request: Request, paths: str = "", detailed: bool = False):
return JSONResponse(content=output)
@router.get("/keyframe_analysis", dependencies=[Depends(require_role(["admin"]))])
async def keyframe_analysis(request: Request, camera: str = ""):
"""Probe a camera's record stream and classify its keyframe spacing.
Detects smart/+ codecs and long/variable GOPs that degrade recording.
"""
config: FrigateConfig = request.app.frigate_config
if camera not in config.cameras:
return JSONResponse(
content={"success": False, "message": f"{camera} is not a valid camera."},
status_code=404,
)
camera_config = config.cameras[camera]
if not camera_config.enabled:
return JSONResponse(
content={"success": False, "message": f"{camera} is not enabled."},
status_code=404,
)
# keyframe spacing only matters when this camera is recording
if not camera_config.record.enabled:
return JSONResponse(content={"severity": "record_disabled"})
# recording guarantees an input carries the record role; its index matches
# the "Stream N" numbering the ffprobe endpoint surfaces (same input order)
record_index, record_input = next(
(idx, i)
for idx, i in enumerate(camera_config.ffmpeg.inputs)
if "record" in i.roles
)
segment_time = get_record_segment_time(camera_config)
result = await analyze_record_keyframes(
config.ffmpeg, record_input.path, segment_time
)
result["stream_index"] = record_index
return JSONResponse(content=result)
@router.get("/ffprobe/snapshot", dependencies=[Depends(require_role(["admin"]))])
def ffprobe_snapshot(request: Request, url: str = "", timeout: int = 10):
"""Get a snapshot from a stream URL using ffmpeg."""

View File

@ -14,13 +14,16 @@ import urllib.parse
from collections.abc import Mapping
from multiprocessing.managers import ValueProxy
from pathlib import Path
from typing import Any, Dict, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union
import numpy as np
from ruamel.yaml import YAML
from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS
if TYPE_CHECKING:
from frigate.config import CameraConfig
logger = logging.getLogger(__name__)
@ -132,6 +135,24 @@ def get_ffmpeg_arg_list(arg: Any) -> list:
return arg if isinstance(arg, list) else shlex.split(arg)
# all built-in record presets use this segment_time
DEFAULT_RECORD_SEGMENT_TIME = 10
def get_record_segment_time(config: "CameraConfig") -> int:
"""Extract -segment_time from the camera's record output args."""
record_args = get_ffmpeg_arg_list(config.ffmpeg.output_args.record)
if record_args and record_args[0].startswith("preset"):
return DEFAULT_RECORD_SEGMENT_TIME
try:
idx = record_args.index("-segment_time")
return int(record_args[idx + 1])
except (ValueError, IndexError):
return DEFAULT_RECORD_SEGMENT_TIME
def load_labels(
path: Optional[str], encoding="utf-8", prefill=91, indexed: bool | None = None
):

View File

@ -879,6 +879,131 @@ def ffprobe_stream(ffmpeg, path: str, detailed: bool = False) -> sp.CompletedPro
return result
KEYFRAME_PROBE_WINDOW_SECONDS = 20
KEYFRAME_GAP_WARNING_SECONDS = 4.0
def parse_keyframe_packets(output: str) -> Tuple[List[float], Optional[float]]:
"""Parse ffprobe CSV `pts_time,flags` output.
Returns the presentation timestamps of keyframes (flags containing "K")
and the maximum timestamp observed across all packets.
"""
keyframe_pts: List[float] = []
max_pts: Optional[float] = None
for line in output.splitlines():
parts = line.split(",")
if len(parts) < 2:
continue
try:
pts = float(parts[0])
except ValueError:
continue
if max_pts is None or pts > max_pts:
max_pts = pts
if "K" in parts[1]:
keyframe_pts.append(pts)
return keyframe_pts, max_pts
def classify_keyframe_gaps(
keyframe_pts: List[float], segment_time: int
) -> dict[str, Any]:
"""Classify keyframe spacing for recording suitability.
A camera using a smart/+ codec or a long/variable GOP produces large or
irregular gaps between keyframes, which breaks time-based recording
segmentation. Severity:
- "unknown" when fewer than two keyframes were observed
- "error" when the longest gap exceeds the record segment length
- "warning" when the longest gap exceeds the warning threshold
- "ok" otherwise
"""
thresholds = {
"warning": KEYFRAME_GAP_WARNING_SECONDS,
"error": segment_time,
}
if len(keyframe_pts) < 2:
return {
"keyframe_count": len(keyframe_pts),
"max_gap": None,
"mean_gap": None,
"min_gap": None,
"segment_time": segment_time,
"severity": "unknown",
"thresholds": thresholds,
}
gaps = [b - a for a, b in zip(keyframe_pts, keyframe_pts[1:])]
max_gap = max(gaps)
if max_gap > segment_time:
severity = "error"
elif max_gap > KEYFRAME_GAP_WARNING_SECONDS:
severity = "warning"
else:
severity = "ok"
return {
"keyframe_count": len(keyframe_pts),
"max_gap": round(max_gap, 2),
"mean_gap": round(sum(gaps) / len(gaps), 2),
"min_gap": round(min(gaps), 2),
"segment_time": segment_time,
"severity": severity,
"thresholds": thresholds,
}
async def analyze_record_keyframes(
ffmpeg, url: str, segment_time: int, window: int = KEYFRAME_PROBE_WINDOW_SECONDS
) -> dict[str, Any]:
"""Probe a stream for ~`window` seconds and classify its keyframe spacing.
Reads video packet flags via ffprobe to find keyframes, then measures the
gaps between them. On timeout or failure returns an "unknown" result rather
than a false all-clear.
"""
clean_url = escape_special_characters(url)
cmd = [
ffmpeg.ffprobe_path,
"-v",
"error",
"-select_streams",
"v:0",
"-read_intervals",
f"%+{window}",
"-show_entries",
"packet=pts_time,flags",
"-of",
"csv=p=0",
clean_url,
]
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=window + 15)
except asyncio.TimeoutError:
logger.warning("Keyframe probe timed out for record stream")
proc.kill()
return classify_keyframe_gaps([], segment_time)
except OSError as err:
logger.error("Keyframe probe failed: %s", err)
return classify_keyframe_gaps([], segment_time)
keyframe_pts, max_pts = parse_keyframe_packets(stdout.decode("utf-8", "replace"))
result = classify_keyframe_gaps(keyframe_pts, segment_time)
result["duration_observed"] = round(max_pts, 2) if max_pts is not None else None
return result
def vainfo_hwaccel(device_name: Optional[str] = None) -> sp.CompletedProcess:
"""Run vainfo."""
if not device_name:

View File

@ -24,7 +24,7 @@ from frigate.config.camera.updater import (
)
from frigate.const import PROCESS_PRIORITY_HIGH
from frigate.log import LogPipe
from frigate.util.builtin import EventsPerSecond, get_ffmpeg_arg_list
from frigate.util.builtin import EventsPerSecond, get_record_segment_time
from frigate.util.ffmpeg import start_or_restart_ffmpeg, stop_ffmpeg
from frigate.util.image import (
FrameManager,
@ -34,23 +34,6 @@ from frigate.util.process import FrigateProcess
logger = logging.getLogger(__name__)
# all built-in record presets use this segment_time
DEFAULT_RECORD_SEGMENT_TIME = 10
def _get_record_segment_time(config: CameraConfig) -> int:
"""Extract -segment_time from the camera's record output args."""
record_args = get_ffmpeg_arg_list(config.ffmpeg.output_args.record)
if record_args and record_args[0].startswith("preset"):
return DEFAULT_RECORD_SEGMENT_TIME
try:
idx = record_args.index("-segment_time")
return int(record_args[idx + 1])
except (ValueError, IndexError):
return DEFAULT_RECORD_SEGMENT_TIME
def capture_frames(
ffmpeg_process: sp.Popen[Any],
@ -185,7 +168,7 @@ class CameraWatchdog(threading.Thread):
# `valid` segments are published with the segment's start time, so the
# gap between consecutive publishes can reach 2 * segment_time. Pad the
# staleness threshold so it's never tighter than that worst case.
segment_time = _get_record_segment_time(self.config)
segment_time = get_record_segment_time(self.config)
self.record_stale_threshold = max(120, 2 * segment_time + 30)
# Stall tracking (based on last processed frame)