frigate/frigate/debug_replay.py
Josh Hawkins 2858662be9
Miscellaneous fixes (#23317)
* resolve global record.export.hwaccel_args to fix phantom camera override

* auto-stop debug replay sessions after 12 hours

* docs tweaks

* add more tips to object classification docs

* tweak language

* Store hwaccel errors with timeout so it can retry

* Add error logs for Intel GPU stats

* add area

---------

Co-authored-by: Nicolas Mowen <nickmowen213@gmail.com>
2026-05-27 09:19:11 -06:00

399 lines
14 KiB
Python

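"""Debug replay camera management for replaying recordings with detection overlays.

The startup orchestration (ffmpeg concat, and driving the camera config
publish) lives in frigate.jobs.debug_replay. This module owns session presence
(active), session metadata, building and publishing the replay camera config
on the job's behalf (DebugReplayManager.publish_camera), post-session cleanup,
and the watchdog that auto-stops sessions exceeding the maximum duration.
"""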
import asyncio
import logging
import os
import shutil
import threading
import time
from ruamel.yaml import YAML
from frigate.config import FrigateConfig
from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdatePublisher,
CameraConfigUpdateTopic,
)
from frigate.const import (
CLIPS_DIR,
RECORD_DIR,
REPLAY_CAMERA_PREFIX,
REPLAY_DIR,
THUMB_DIR,
)
from frigate.jobs.debug_replay import (
JOB_TYPE as DEBUG_REPLAY_JOB_TYPE,
)
from frigate.jobs.debug_replay import (
cancel_debug_replay_job,
wait_for_runner,
)
from frigate.jobs.export import JobStatePublisher
from frigate.types import JobStatusTypesEnum
from frigate.util.camera_cleanup import cleanup_camera_db, cleanup_camera_files
from frigate.util.config import find_config_file
logger = logging.getLogger(__name__)

# Upper bound on how long a single debug replay session may run before
# debug_replay_auto_stop_watchdog tears it down: 12 hours, in seconds.
MAX_SESSION_DURATION_SECONDS = 12 * 3600

# How often (seconds) the watchdog re-checks the active session's age.
AUTO_STOP_CHECK_INTERVAL_SECONDS = 60
class DebugReplayManager:
    """Owns the lifecycle pointers for a single debug replay session.

    A session exists from the moment mark_starting is called (synchronously,
    inside the API handler) until clear_session runs (on success cleanup,
    failure, or stop). The active property is the source of truth that the
    status bar consumes — broader than the startup job, which only covers the
    preparing_clip / starting_camera window.
    """

    def __init__(self) -> None:
        # Guards the session pointers below. Held by mark_starting,
        # mark_session_ready, clear_session and the teardown part of stop().
        self._lock = threading.Lock()
        self.replay_camera_name: str | None = None
        self.source_camera: str | None = None
        self.clip_path: str | None = None
        self.start_ts: float | None = None
        self.end_ts: float | None = None
        # Wall-clock time (time.time()) at which mark_starting claimed the
        # session; read by the auto-stop watchdog.
        self.session_started_at: float | None = None
        self._job_state_publisher = JobStatePublisher()

    @property
    def active(self) -> bool:
        """True from mark_starting until clear_session.

        Reads replay_camera_name without taking the lock.
        """
        return self.replay_camera_name is not None

    def mark_starting(
        self,
        source_camera: str,
        replay_camera_name: str,
        start_ts: float,
        end_ts: float,
    ) -> None:
        """Synchronously claim the session before the job runner starts.

        Called inside the API handler so the status bar sees active=True
        immediately, before the worker thread does any ffmpeg work.

        Args:
            source_camera: Name of the camera whose recordings are replayed.
            replay_camera_name: Name of the in-memory replay camera.
            start_ts: Start of the replayed time range.
            end_ts: End of the replayed time range.
        """
        with self._lock:
            self.replay_camera_name = replay_camera_name
            self.source_camera = source_camera
            self.start_ts = start_ts
            self.end_ts = end_ts
            # The clip only exists once the runner has built it.
            self.clip_path = None
            self.session_started_at = time.time()

    def mark_session_ready(self, clip_path: str) -> None:
        """Record the on-disk clip path after the camera has been published."""
        with self._lock:
            self.clip_path = clip_path

    def clear_session(self) -> None:
        """Reset session pointers without publishing camera removal.

        Used by the job runner on failure paths. stop() does the camera
        teardown plus this clear in one step.
        """
        with self._lock:
            self._clear_locked()

    def _clear_locked(self) -> None:
        """Reset every session pointer. The caller must hold self._lock."""
        self.replay_camera_name = None
        self.source_camera = None
        self.clip_path = None
        self.start_ts = None
        self.end_ts = None
        self.session_started_at = None

    def publish_camera(
        self,
        source_camera: str,
        replay_name: str,
        clip_path: str,
        frigate_config: FrigateConfig,
        config_publisher: CameraConfigUpdatePublisher,
    ) -> None:
        """Build the in-memory replay camera config and publish the add event.

        Called by the job runner during the starting_camera phase. The replay
        camera is merged into a freshly loaded copy of the on-disk config and
        validated with FrigateConfig.parse_object. The config file itself is
        only read, never written. On success the parsed camera is registered
        in frigate_config.cameras and an add event is published.

        Raises:
            RuntimeError: If the merged config fails validation.
        """
        source_config = frigate_config.cameras[source_camera]
        camera_dict = self._build_camera_config_dict(
            source_config, replay_name, clip_path
        )

        config_file = find_config_file()
        yaml_parser = YAML()
        with open(config_file, "r") as f:
            config_data = yaml_parser.load(f)

        # A config with no cameras section (or an explicit null) still
        # needs a mapping to insert the replay camera into.
        if "cameras" not in config_data or config_data["cameras"] is None:
            config_data["cameras"] = {}
        config_data["cameras"][replay_name] = camera_dict

        try:
            new_config = FrigateConfig.parse_object(config_data)
        except Exception as e:
            raise RuntimeError(f"Failed to validate replay camera config: {e}") from e

        frigate_config.cameras[replay_name] = new_config.cameras[replay_name]
        config_publisher.publish_update(
            CameraConfigUpdateTopic(CameraConfigUpdateEnum.add, replay_name),
            new_config.cameras[replay_name],
        )

    def stop(
        self,
        frigate_config: FrigateConfig,
        config_publisher: CameraConfigUpdatePublisher,
    ) -> None:
        """Cancel any in-flight startup job and tear down the active session.

        Safe to call when no session is active (no-op with a warning).

        The teardown runs in this order:

        1. Cancel the startup job and wait up to 2 seconds for its runner.
        2. Publish removal of the replay camera, if it was added.
        3. Delete its DB rows and files.
        4. Publish a cancelled job state.

        If publishing the camera removal raises, the session is left intact
        so stop() can be retried. Once the camera has been removed, the
        session pointers are always cleared, even if a later step raises.
        Any exception still propagates to the caller.
        """
        cancel_debug_replay_job()
        wait_for_runner(timeout=2.0)

        with self._lock:
            if not self.active:
                logger.warning("No active replay session to stop")
                return

            replay_name = self.replay_camera_name
            source_camera = self.source_camera

            # Only publish remove if the camera was actually added to the live
            # config (i.e. the runner reached the starting_camera phase).
            if replay_name is not None and replay_name in frigate_config.cameras:
                config_publisher.publish_update(
                    CameraConfigUpdateTopic(CameraConfigUpdateEnum.remove, replay_name),
                    frigate_config.cameras[replay_name],
                )
                frigate_config.cameras.pop(replay_name, None)

            try:
                if replay_name is not None:
                    self._cleanup_db(replay_name)
                    self._cleanup_files(replay_name)

                self._job_state_publisher.publish(
                    {
                        "id": "stopped",
                        "job_type": DEBUG_REPLAY_JOB_TYPE,
                        "status": JobStatusTypesEnum.cancelled,
                        "start_time": None,
                        "end_time": time.time(),
                        "error_message": None,
                        "results": {
                            "source_camera": source_camera,
                            "replay_camera_name": replay_name,
                        },
                    }
                )
            finally:
                # The camera is already gone from the live config at this
                # point. If a cleanup step failed and this were skipped, the
                # session would keep reporting active with no camera behind it.
                self._clear_locked()

        logger.info("Debug replay stopped and cleaned up: %s", replay_name)

    @staticmethod
    def _dump_masks(masks) -> dict:
        """Serialize a mask-id -> mask-config mapping back to plain dicts.

        raw_coordinates and enabled_in_config are dropped from each entry;
        None entries stay None.
        """
        return {
            mask_id: (
                mask_cfg.model_dump(exclude={"raw_coordinates", "enabled_in_config"})
                if mask_cfg is not None
                else None
            )
            for mask_id, mask_cfg in masks.items()
        }

    def _build_camera_config_dict(
        self,
        source_config,
        replay_name: str,
        clip_path: str,
    ) -> dict:
        """Build a camera config dictionary for the replay camera.

        The source camera's detect, objects, zones, motion, LPR and face
        recognition settings are copied. Its single detect input is replaced
        by the looping replay clip. Recording, snapshots, review, birdseye and
        audio are disabled.
        """
        # Extract detect config (exclude computed fields)
        detect_dict = source_config.detect.model_dump(
            exclude={"min_initialized", "max_disappeared", "enabled_in_config"}
        )

        # Extract objects config, using .dict() on filters to convert
        # RuntimeFilterConfig ndarray masks back to string coordinates
        objects_dict = {
            "track": source_config.objects.track,
            "mask": (
                self._dump_masks(source_config.objects.mask)
                if source_config.objects.mask
                else {}
            ),
            "filters": {
                name: filt.dict() if hasattr(filt, "dict") else filt.model_dump()
                for name, filt in source_config.objects.filters.items()
            },
        }

        # Extract zones (exclude_defaults avoids serializing empty defaults
        # like distances=[] that fail validation on re-parse)
        zones_dict = {}
        for zone_name, zone_config in source_config.zones.items():
            zone_dump = zone_config.model_dump(
                exclude={"contour", "color"}, exclude_defaults=True
            )
            # Coordinates are required, so re-add them if exclude_defaults
            # dropped them.
            zone_dump.setdefault("coordinates", zone_config.coordinates)
            zones_dict[zone_name] = zone_dump

        # Extract LPR and face recognition configs
        lpr_dict = source_config.lpr.model_dump()
        face_recognition_dict = source_config.face_recognition.model_dump()

        # Extract motion config (exclude runtime fields)
        motion_dict = {}
        if source_config.motion is not None:
            motion_dict = source_config.motion.model_dump(
                exclude={
                    "frame_shape",
                    "raw_mask",
                    "mask",
                    "enabled_in_config",
                    "rasterized_mask",
                }
            )
            if source_config.motion.mask:
                motion_dict["mask"] = self._dump_masks(source_config.motion.mask)

        return {
            "enabled": True,
            "ffmpeg": {
                "inputs": [
                    {
                        "path": clip_path,
                        "roles": ["detect"],
                        # -re: read at native frame rate; -stream_loop -1: loop
                        # the clip indefinitely; +genpts: regenerate PTS.
                        "input_args": "-re -stream_loop -1 -fflags +genpts",
                    }
                ],
                # Explicitly empty; presumably overrides any global
                # ffmpeg.hwaccel_args for the local clip — confirm.
                "hwaccel_args": [],
            },
            "detect": detect_dict,
            "objects": objects_dict,
            "zones": zones_dict,
            "motion": motion_dict,
            "record": {"enabled": False},
            "snapshots": {"enabled": False},
            "review": {
                "alerts": {"enabled": False},
                "detections": {"enabled": False},
            },
            "birdseye": {"enabled": False},
            "audio": {"enabled": False},
            "lpr": lpr_dict,
            "face_recognition": face_recognition_dict,
        }

    def _cleanup_db(self, camera_name: str) -> None:
        """Defensively remove any database rows for the replay camera."""
        cleanup_camera_db(camera_name)

    def _cleanup_files(self, camera_name: str) -> None:
        """Remove filesystem artifacts for the replay camera.

        Note that the entire REPLAY_DIR is removed, regardless of
        camera_name. Failure to remove it is logged, not raised.
        """
        cleanup_camera_files(camera_name)

        # Remove replay-specific cache directory
        if os.path.exists(REPLAY_DIR):
            try:
                shutil.rmtree(REPLAY_DIR)
                logger.debug("Removed replay cache directory")
            except Exception as e:
                logger.error("Failed to remove replay cache: %s", e)
def cleanup_replay_cameras() -> None:
    """Remove any stale replay camera artifacts on startup.

    Since replay cameras are memory-only and never written to YAML, they
    won't appear in the config after a restart. This function cleans up
    filesystem and database artifacts from any replay that was running when
    the process stopped.

    Stale camera names come from two sources: entries starting with
    REPLAY_CAMERA_PREFIX in RECORD_DIR, CLIPS_DIR and THUMB_DIR, and
    ``<name>.mp4`` clips in REPLAY_DIR. After their DB rows and files are
    removed, REPLAY_DIR itself is deleted once. If no stale camera is found,
    nothing is touched.

    Must be called AFTER the database is bound.
    """
    stale_cameras: set[str] = set()

    # Scan filesystem for leftover replay artifacts to derive camera names
    for dir_path in (RECORD_DIR, CLIPS_DIR, THUMB_DIR):
        if os.path.isdir(dir_path):
            stale_cameras.update(
                entry
                for entry in os.listdir(dir_path)
                if entry.startswith(REPLAY_CAMERA_PREFIX)
            )

    if os.path.isdir(REPLAY_DIR):
        stale_cameras.update(
            entry.removesuffix(".mp4")
            for entry in os.listdir(REPLAY_DIR)
            if entry.startswith(REPLAY_CAMERA_PREFIX) and entry.endswith(".mp4")
        )

    if not stale_cameras:
        return

    # Sorted so processing order and log output are deterministic.
    ordered_cameras = sorted(stale_cameras)
    logger.info("Cleaning up stale replay camera artifacts: %s", ordered_cameras)

    # Call the shared cleanup helpers directly instead of instantiating a
    # DebugReplayManager. Its __init__ builds a JobStatePublisher that this
    # sweep never uses. REPLAY_DIR is removed once below rather than once
    # per camera.
    for camera_name in ordered_cameras:
        cleanup_camera_db(camera_name)
        cleanup_camera_files(camera_name)

    if os.path.exists(REPLAY_DIR):
        try:
            shutil.rmtree(REPLAY_DIR)
        except Exception as e:
            logger.error("Failed to remove replay cache directory: %s", e)
async def debug_replay_auto_stop_watchdog(
    manager: DebugReplayManager,
    frigate_config: FrigateConfig,
    config_publisher: CameraConfigUpdatePublisher,
) -> None:
    """Stop a debug replay session once it outlives MAX_SESSION_DURATION_SECONDS.

    This is a safety net for sessions that are forgotten and left running for
    days. The cap is deliberately long, so normal tuning and overnight soak
    runs are not interrupted.

    Every AUTO_STOP_CHECK_INTERVAL_SECONDS, the manager's session state is
    inspected. If a session is active and began at least the cap ago,
    ``manager.stop`` runs in a worker thread, because stop waits on the job
    runner and removes files. Cancellation of this coroutine propagates; any
    other error is logged and the loop keeps running.
    """
    while True:
        try:
            await asyncio.sleep(AUTO_STOP_CHECK_INTERVAL_SECONDS)

            session_start = manager.session_started_at
            expired = (
                manager.active
                and session_start is not None
                and time.time() - session_start >= MAX_SESSION_DURATION_SECONDS
            )
            if not expired:
                continue

            # Captured before stopping, since stop() clears the pointer.
            stopped_camera = manager.replay_camera_name
            await asyncio.to_thread(
                manager.stop,
                frigate_config=frigate_config,
                config_publisher=config_publisher,
            )
            logger.info(
                "Debug replay auto-stopped after exceeding max session duration of %d hours: %s",
                MAX_SESSION_DURATION_SECONDS // 3600,
                stopped_camera,
            )
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.exception("Error in debug replay auto-stop watchdog")