From 5eee65984aa866a30c7f122166dced7b7069d7c0 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Sun, 3 May 2026 11:09:24 -0500 Subject: [PATCH] simplify to use Job infrastructure --- frigate/api/debug_replay.py | 53 ++--- frigate/debug_replay.py | 376 ++++++---------------------------- frigate/jobs/debug_replay.py | 384 +++++++++++++++++++++++++++++++++++ web/src/pages/Replay.tsx | 90 ++++---- 4 files changed, 527 insertions(+), 376 deletions(-) create mode 100644 frigate/jobs/debug_replay.py diff --git a/frigate/api/debug_replay.py b/frigate/api/debug_replay.py index c9dcb12e4..62307a9f1 100644 --- a/frigate/api/debug_replay.py +++ b/frigate/api/debug_replay.py @@ -10,6 +10,7 @@ from pydantic import BaseModel, Field from frigate.api.auth import require_role from frigate.api.defs.tags import Tags +from frigate.jobs.debug_replay import start_debug_replay_job logger = logging.getLogger(__name__) @@ -29,16 +30,19 @@ class DebugReplayStartResponse(BaseModel): success: bool replay_camera: str - state: str + job_id: str class DebugReplayStatusResponse(BaseModel): - """Response for debug replay status.""" + """Response for debug replay status. + + Returns only session-presence fields. Startup progress and error + details flow through the job_state WebSocket topic via the + ``debug_replay`` job (see :mod:`frigate.jobs.debug_replay`); the + Replay page subscribes there with ``useJobStatus("debug_replay")``. + """ active: bool - state: str - progress_percent: float | None = None - error_message: str | None = None replay_camera: str | None = None source_camera: str | None = None start_time: float | None = None @@ -58,30 +62,30 @@ class DebugReplayStopResponse(BaseModel): dependencies=[Depends(require_role(["admin"]))], summary="Start debug replay", description="Start a debug replay session from camera recordings. Returns " - "immediately while clip generation runs asynchronously; poll " - "/debug_replay/status to track progress.", + "immediately while clip generation runs as a background job; subscribe " + "to the 'debug_replay' job_state WS topic to track progress.", ) async def start_debug_replay(request: Request, body: DebugReplayStartBody): """Start a debug replay session asynchronously.""" replay_manager = request.app.replay_manager - if replay_manager.active: - return JSONResponse( - content={ - "success": False, - "message": "A replay session is already active", - }, - status_code=409, - ) - try: - replay_camera = await asyncio.to_thread( - replay_manager.start, + job_id = await asyncio.to_thread( + start_debug_replay_job, source_camera=body.camera, start_ts=body.start_time, end_ts=body.end_time, frigate_config=request.app.frigate_config, config_publisher=request.app.config_publisher, + replay_manager=replay_manager, + ) + except RuntimeError as exc: + return JSONResponse( + content={ + "success": False, + "message": str(exc), + }, + status_code=409, ) except ValueError as exc: logger.info("Rejected debug replay start request: %s", exc) @@ -96,8 +100,8 @@ async def start_debug_replay(request: Request, body: DebugReplayStartBody): return JSONResponse( content={ "success": True, - "replay_camera": replay_camera, - "state": replay_manager.state.value, + "replay_camera": replay_manager.replay_camera_name, + "job_id": job_id, }, status_code=202, ) @@ -119,7 +123,11 @@ def get_debug_replay_status(request: Request): if replay_manager.active and replay_camera: frame_processor = request.app.detected_frames_processor - frame = frame_processor.get_current_frame(replay_camera) + frame = ( + frame_processor.get_current_frame(replay_camera) + if frame_processor is not None + else None + ) if frame is not None: frame_time = frame_processor.get_current_frame_time(replay_camera) @@ -133,9 +141,6 @@ def get_debug_replay_status(request: Request): return DebugReplayStatusResponse( active=replay_manager.active, - state=replay_manager.state.value, - progress_percent=replay_manager.progress_percent, - error_message=replay_manager.error_message, replay_camera=replay_camera, source_camera=replay_manager.source_camera, start_time=replay_manager.start_ts, diff --git a/frigate/debug_replay.py b/frigate/debug_replay.py index 9755b0564..d2ce576bd 100644 --- a/frigate/debug_replay.py +++ b/frigate/debug_replay.py @@ -1,11 +1,14 @@ -"""Debug replay camera management for replaying recordings with detection overlays.""" +"""Debug replay camera management for replaying recordings with detection overlays. + +The startup work (ffmpeg concat + camera config publish) lives in +``frigate.jobs.debug_replay``. This module owns only session presence +(``active``), session metadata, and post-session cleanup. +""" import logging import os import shutil -import subprocess as sp import threading -from enum import Enum from ruamel.yaml import YAML @@ -22,280 +25,77 @@ from frigate.const import ( REPLAY_DIR, THUMB_DIR, ) -from frigate.models import Recordings +from frigate.jobs.debug_replay import cancel_debug_replay_job, wait_for_runner from frigate.util.camera_cleanup import cleanup_camera_db, cleanup_camera_files from frigate.util.config import find_config_file -from frigate.util.ffmpeg import run_ffmpeg_with_progress logger = logging.getLogger(__name__) -class ReplayState(str, Enum): - """State of the debug replay session lifecycle. - - idle: no session - preparing_clip: ffmpeg concat is running, no replay camera yet - starting_camera: clip ready, publishing camera config update - active: replay camera is published; first frame may not have arrived yet - error: startup failed; error_message is set - """ - - idle = "idle" - preparing_clip = "preparing_clip" - starting_camera = "starting_camera" - active = "active" - error = "error" - - class DebugReplayManager: - """Manages a single debug replay session.""" + """Owns the lifecycle pointers for a single debug replay session. + + A session exists from the moment ``mark_starting`` is called (synchronously, + inside the API handler) until ``clear_session`` runs (on success cleanup, + failure, or stop). The ``active`` property is the source of truth that the + status bar consumes — broader than the startup job, which only covers the + preparing_clip / starting_camera window. + """ def __init__(self) -> None: self._lock = threading.Lock() - self._state: ReplayState = ReplayState.idle - self.error_message: str | None = None self.replay_camera_name: str | None = None self.source_camera: str | None = None self.clip_path: str | None = None self.start_ts: float | None = None self.end_ts: float | None = None - self._active_process: sp.Popen | None = None - self._worker_thread: threading.Thread | None = None - self.progress_percent: float | None = None - - @property - def state(self) -> ReplayState: - return self._state @property def active(self) -> bool: - """Whether a replay session is in progress (preparing, starting, or active).""" - return self._state in ( - ReplayState.preparing_clip, - ReplayState.starting_camera, - ReplayState.active, - ) + """True from ``mark_starting`` until ``clear_session``.""" + return self.replay_camera_name is not None - def _set_state( - self, state: ReplayState, error_message: str | None = None - ) -> None: - """Internal state transition helper. Always pair `error` with an error_message.""" - self._state = state - self.error_message = error_message if state == ReplayState.error else None - if state in (ReplayState.idle, ReplayState.error): - self.progress_percent = None - - def start( + def mark_starting( self, source_camera: str, + replay_camera_name: str, start_ts: float, end_ts: float, - frigate_config: FrigateConfig, - config_publisher: CameraConfigUpdatePublisher, - ) -> str: - """Validate inputs and kick off async startup. Returns immediately. + ) -> None: + """Synchronously claim the session before the job runner starts. - The clip generation, config build, and camera publish run on a worker - thread. Poll `state` / `error_message` to track progress. - - Args: - source_camera: Name of the source camera to replay - start_ts: Start timestamp - end_ts: End timestamp - frigate_config: Current Frigate configuration - config_publisher: Publisher for camera config updates - - Returns: - The replay camera name (deterministic from source_camera) - - Raises: - ValueError: If a session is already active or parameters are invalid + Called inside the API handler so the status bar sees ``active=True`` + immediately, before the worker thread does any ffmpeg work. """ with self._lock: - if self.active: - raise ValueError("A replay session is already active") - - if source_camera not in frigate_config.cameras: - raise ValueError(f"Camera '{source_camera}' not found") - - if end_ts <= start_ts: - raise ValueError("End time must be after start time") - - recordings = self._query_recordings(source_camera, start_ts, end_ts) - if not recordings.count(): - raise ValueError( - f"No recordings found for camera '{source_camera}' in the specified time range" - ) - - replay_name = f"{REPLAY_CAMERA_PREFIX}{source_camera}" - self.replay_camera_name = replay_name + self.replay_camera_name = replay_camera_name self.source_camera = source_camera self.start_ts = start_ts self.end_ts = end_ts - self.progress_percent = None - self._set_state(ReplayState.preparing_clip) + self.clip_path = None - worker = threading.Thread( - target=self._run_start_worker, - name=f"debug-replay-start-{replay_name}", - args=(source_camera, start_ts, end_ts, frigate_config, config_publisher), - daemon=True, - ) - self._worker_thread = worker - worker.start() + def mark_session_ready(self, clip_path: str) -> None: + """Record the on-disk clip path after the camera has been published.""" + with self._lock: + self.clip_path = clip_path - return replay_name + def clear_session(self) -> None: + """Reset session pointers without publishing camera removal. - def _query_recordings(self, source_camera: str, start_ts: float, end_ts: float): - """Return the Recordings query for the time range. Extracted so tests can patch. - - Args: - source_camera: Name of the source camera - start_ts: Start timestamp - end_ts: End timestamp - - Returns: - Peewee query for recordings in the time range + Used by the job runner on failure paths. ``stop()`` does the camera + teardown plus this clear in one step. """ - return ( - Recordings.select( - Recordings.path, - Recordings.start_time, - Recordings.end_time, - ) - .where( - Recordings.start_time.between(start_ts, end_ts) - | Recordings.end_time.between(start_ts, end_ts) - | ((start_ts > Recordings.start_time) & (end_ts < Recordings.end_time)) - ) - .where(Recordings.camera == source_camera) - .order_by(Recordings.start_time.asc()) - ) + with self._lock: + self._clear_locked() - def _run_start_worker( - self, - source_camera: str, - start_ts: float, - end_ts: float, - frigate_config: FrigateConfig, - config_publisher: CameraConfigUpdatePublisher, - ) -> None: - """Worker thread body — runs ffmpeg and publishes the camera config. + def _clear_locked(self) -> None: + self.replay_camera_name = None + self.source_camera = None + self.clip_path = None + self.start_ts = None + self.end_ts = None - Args: - source_camera: Name of the source camera to replay - start_ts: Start timestamp - end_ts: End timestamp - frigate_config: Current Frigate configuration - config_publisher: Publisher for camera config updates - """ - replay_name = self.replay_camera_name - if replay_name is None: - return - - os.makedirs(REPLAY_DIR, exist_ok=True) - concat_file = os.path.join(REPLAY_DIR, f"{replay_name}_concat.txt") - clip_path = os.path.join(REPLAY_DIR, f"{replay_name}.mp4") - - try: - recordings = self._query_recordings(source_camera, start_ts, end_ts) - with open(concat_file, "w") as f: - for recording in recordings: - f.write(f"file '{recording.path}'\n") - - ffmpeg_cmd = [ - frigate_config.ffmpeg.ffmpeg_path, - "-hide_banner", - "-y", - "-f", - "concat", - "-safe", - "0", - "-i", - concat_file, - "-c", - "copy", - "-movflags", - "+faststart", - clip_path, - ] - - logger.info( - "Generating replay clip for %s (%.1f - %.1f)", - source_camera, - start_ts, - end_ts, - ) - - def _record_proc(p: sp.Popen) -> None: - self._active_process = p - - def _on_progress(percent: float) -> None: - self.progress_percent = percent - - try: - returncode, stderr = run_ffmpeg_with_progress( - ffmpeg_cmd, - expected_duration_seconds=max(0.0, end_ts - start_ts), - on_progress=_on_progress, - process_started=_record_proc, - use_low_priority=True, - ) - finally: - self._active_process = None - - if returncode != 0: - raise RuntimeError(f"FFmpeg failed: {stderr[-500:]}") - - if not os.path.exists(clip_path): - raise RuntimeError("Clip file was not created") - - with self._lock: - # If stop() ran while we were preparing, bail out cleanly. - if self._state != ReplayState.preparing_clip: - logger.info( - "Replay startup aborted (state=%s); discarding clip", - self._state, - ) - return - self._set_state(ReplayState.starting_camera) - - self._publish_replay_camera( - source_camera, replay_name, clip_path, frigate_config, config_publisher - ) - - with self._lock: - self.clip_path = clip_path - self._set_state(ReplayState.active) - - logger.info("Debug replay started: %s -> %s", source_camera, replay_name) - except Exception as exc: - logger.exception("Debug replay startup failed") - with self._lock: - # If stop() already ran while we were preparing, don't overwrite idle state. - if self._state == ReplayState.idle: - return - self._set_state(ReplayState.error, error_message=str(exc)) - # Drop session pointers so the next /start is allowed. - self.replay_camera_name = None - self.source_camera = None - self.clip_path = None - self.start_ts = None - self.end_ts = None - # Best-effort cleanup of any partial clip on disk. - try: - if os.path.exists(clip_path): - os.remove(clip_path) - except OSError: - pass - finally: - try: - if os.path.exists(concat_file): - os.remove(concat_file) - except OSError: - pass - - def _publish_replay_camera( + def publish_camera( self, source_camera: str, replay_name: str, @@ -303,14 +103,9 @@ class DebugReplayManager: frigate_config: FrigateConfig, config_publisher: CameraConfigUpdatePublisher, ) -> None: - """Build the in-memory camera config and publish the add event. + """Build the in-memory replay camera config and publish the add event. - Args: - source_camera: Name of the source camera - replay_name: Name for the replay camera - clip_path: Path to the replay clip file - frigate_config: Current Frigate configuration - config_publisher: Publisher for camera config updates + Called by the job runner during the ``starting_camera`` phase. """ source_config = frigate_config.cameras[source_camera] camera_dict = self._build_camera_config_dict( @@ -339,64 +134,35 @@ class DebugReplayManager: frigate_config: FrigateConfig, config_publisher: CameraConfigUpdatePublisher, ) -> None: - """Stop the active replay session and clean up all artifacts. + """Cancel any in-flight startup job and tear down the active session. - Args: - frigate_config: Current Frigate configuration - config_publisher: Publisher for camera config updates + Safe to call when no session is active (no-op with a warning). """ + cancel_debug_replay_job() + wait_for_runner(timeout=2.0) + with self._lock: - self._stop_locked(frigate_config, config_publisher) + if not self.active: + logger.warning("No active replay session to stop") + return - def _stop_locked( - self, - frigate_config: FrigateConfig, - config_publisher: CameraConfigUpdatePublisher, - ) -> None: - if not self.active: - logger.warning("No active replay session to stop") - return + replay_name = self.replay_camera_name - replay_name = self.replay_camera_name - was_preparing = self._state == ReplayState.preparing_clip + # Only publish remove if the camera was actually added to the live + # config (i.e. the runner reached the starting_camera phase). + if replay_name is not None and replay_name in frigate_config.cameras: + config_publisher.publish_update( + CameraConfigUpdateTopic(CameraConfigUpdateEnum.remove, replay_name), + frigate_config.cameras[replay_name], + ) - if was_preparing and self._active_process is not None: - logger.info("Cancelling in-flight replay clip generation") - try: - self._active_process.terminate() - except Exception as exc: - logger.warning("Failed to terminate ffmpeg subprocess: %s", exc) + if replay_name is not None: + self._cleanup_db(replay_name) + self._cleanup_files(replay_name) - # Keep a reference so we can join the worker after we've finished cleanup. - worker = self._worker_thread + self._clear_locked() - # Only publish the remove event if the camera was actually published. - if ( - not was_preparing - and replay_name is not None - and replay_name in frigate_config.cameras - ): - config_publisher.publish_update( - CameraConfigUpdateTopic(CameraConfigUpdateEnum.remove, replay_name), - frigate_config.cameras[replay_name], - ) - - if replay_name is not None: - self._cleanup_db(replay_name) - self._cleanup_files(replay_name) - - self.replay_camera_name = None - self.source_camera = None - self.clip_path = None - self.start_ts = None - self.end_ts = None - self._set_state(ReplayState.idle) - - logger.info("Debug replay stopped and cleaned up: %s", replay_name) - - # Bounded worker join so the API never hangs. - if worker is not None and worker.is_alive(): - worker.join(timeout=2.0) + logger.info("Debug replay stopped and cleaned up: %s", replay_name) def _build_camera_config_dict( self, @@ -404,16 +170,7 @@ class DebugReplayManager: replay_name: str, clip_path: str, ) -> dict: - """Build a camera config dictionary for the replay camera. - - Args: - source_config: Source camera's CameraConfig - replay_name: Name for the replay camera - clip_path: Path to the replay clip file - - Returns: - Camera config as a dictionary - """ + """Build a camera config dictionary for the replay camera.""" # Extract detect config (exclude computed fields) detect_dict = source_config.detect.model_dump( exclude={"min_initialized", "max_disappeared", "enabled_in_config"} @@ -448,7 +205,6 @@ class DebugReplayManager: zone_dump = zone_config.model_dump( exclude={"contour", "color"}, exclude_defaults=True ) - # Always include required fields zone_dump.setdefault("coordinates", zone_config.coordinates) zones_dict[zone_name] = zone_dump diff --git a/frigate/jobs/debug_replay.py b/frigate/jobs/debug_replay.py new file mode 100644 index 000000000..c3ea28e0b --- /dev/null +++ b/frigate/jobs/debug_replay.py @@ -0,0 +1,384 @@ +"""Debug replay startup job: ffmpeg concat + camera config publish. + +The runner orchestrates the async portion of starting a debug replay +session. The :class:`DebugReplayManager` (in :mod:`frigate.debug_replay`) +owns session presence so the status bar can keep reading a single +``active`` flag from ``/debug_replay/status`` for the entire session +window — which is broader than this job's lifetime. +""" + +import logging +import os +import subprocess as sp +import threading +import time +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, Optional + +from frigate.config import FrigateConfig +from frigate.config.camera.updater import CameraConfigUpdatePublisher +from frigate.const import REPLAY_CAMERA_PREFIX, REPLAY_DIR +from frigate.jobs.export import JobStatePublisher +from frigate.jobs.job import Job +from frigate.jobs.manager import job_is_running, set_current_job +from frigate.models import Recordings +from frigate.types import JobStatusTypesEnum +from frigate.util.ffmpeg import run_ffmpeg_with_progress + +if TYPE_CHECKING: + from frigate.debug_replay import DebugReplayManager + +logger = logging.getLogger(__name__) + +# Coalesce frequent ffmpeg progress callbacks so the WS isn't flooded. +PROGRESS_BROADCAST_MIN_INTERVAL = 1.0 + +JOB_TYPE = "debug_replay" + +STEP_PREPARING_CLIP = "preparing_clip" +STEP_STARTING_CAMERA = "starting_camera" + + +_active_runner: Optional["DebugReplayJobRunner"] = None +_runner_lock = threading.Lock() + + +def _set_active_runner(runner: Optional["DebugReplayJobRunner"]) -> None: + global _active_runner + with _runner_lock: + _active_runner = runner + + +def get_active_runner() -> Optional["DebugReplayJobRunner"]: + with _runner_lock: + return _active_runner + + +@dataclass +class DebugReplayJob(Job): + """Job state for a debug replay startup.""" + + job_type: str = JOB_TYPE + source_camera: str = "" + replay_camera_name: str = "" + start_ts: float = 0.0 + end_ts: float = 0.0 + current_step: Optional[str] = None + progress_percent: float = 0.0 + + def to_dict(self) -> dict[str, Any]: + """Whitelisted payload for the job_state WS topic. + + Replay-specific fields land in ``results`` so the frontend's + generic ``Job`` type can be parameterised cleanly. + """ + return { + "id": self.id, + "job_type": self.job_type, + "status": self.status, + "start_time": self.start_time, + "end_time": self.end_time, + "error_message": self.error_message, + "results": { + "current_step": self.current_step, + "progress_percent": self.progress_percent, + "source_camera": self.source_camera, + "replay_camera_name": self.replay_camera_name, + "start_ts": self.start_ts, + "end_ts": self.end_ts, + }, + } + + +def query_recordings(source_camera: str, start_ts: float, end_ts: float): + """Return the Recordings query for the time range. + + Module-level so tests can patch it without instantiating a runner. + """ + return ( + Recordings.select( + Recordings.path, + Recordings.start_time, + Recordings.end_time, + ) + .where( + Recordings.start_time.between(start_ts, end_ts) + | Recordings.end_time.between(start_ts, end_ts) + | ((start_ts > Recordings.start_time) & (end_ts < Recordings.end_time)) + ) + .where(Recordings.camera == source_camera) + .order_by(Recordings.start_time.asc()) + ) + + +class DebugReplayJobRunner(threading.Thread): + """Worker thread that drives the startup job to completion. + + Owns the live ffmpeg ``Popen`` reference for cancellation. Cancellation + is two-step (``threading.Event`` + ``proc.terminate()``) so the runner + both knows it should stop and is unblocked from its blocking subprocess + wait. + """ + + def __init__( + self, + job: DebugReplayJob, + frigate_config: FrigateConfig, + config_publisher: CameraConfigUpdatePublisher, + replay_manager: "DebugReplayManager", + publisher: Optional[JobStatePublisher] = None, + ) -> None: + super().__init__(daemon=True, name=f"debug_replay_{job.id}") + self.job = job + self.frigate_config = frigate_config + self.config_publisher = config_publisher + self.replay_manager = replay_manager + self.publisher = publisher if publisher is not None else JobStatePublisher() + self._cancel_event = threading.Event() + self._active_process: sp.Popen | None = None + self._proc_lock = threading.Lock() + self._last_broadcast_monotonic: float = 0.0 + + def cancel(self) -> None: + """Request cancellation. Idempotent.""" + self._cancel_event.set() + with self._proc_lock: + proc = self._active_process + if proc is not None: + try: + proc.terminate() + except Exception as exc: + logger.warning("Failed to terminate ffmpeg subprocess: %s", exc) + + def is_cancelled(self) -> bool: + return self._cancel_event.is_set() + + def _record_proc(self, proc: sp.Popen) -> None: + with self._proc_lock: + self._active_process = proc + # Race: cancel arrived between Popen and _record_proc. + if self._cancel_event.is_set(): + try: + proc.terminate() + except Exception: + pass + + def _broadcast(self, force: bool = False) -> None: + now = time.monotonic() + if ( + not force + and now - self._last_broadcast_monotonic < PROGRESS_BROADCAST_MIN_INTERVAL + ): + return + self._last_broadcast_monotonic = now + + try: + self.publisher.publish(self.job.to_dict()) + except Exception as err: + logger.warning("Publisher raised during job state broadcast: %s", err) + + def run(self) -> None: + replay_name = self.job.replay_camera_name + os.makedirs(REPLAY_DIR, exist_ok=True) + concat_file = os.path.join(REPLAY_DIR, f"{replay_name}_concat.txt") + clip_path = os.path.join(REPLAY_DIR, f"{replay_name}.mp4") + + self.job.status = JobStatusTypesEnum.running + self.job.start_time = time.time() + self.job.current_step = STEP_PREPARING_CLIP + self._broadcast(force=True) + + try: + recordings = query_recordings( + self.job.source_camera, self.job.start_ts, self.job.end_ts + ) + with open(concat_file, "w") as f: + for recording in recordings: + f.write(f"file '{recording.path}'\n") + + ffmpeg_cmd = [ + self.frigate_config.ffmpeg.ffmpeg_path, + "-hide_banner", + "-y", + "-f", + "concat", + "-safe", + "0", + "-i", + concat_file, + "-c", + "copy", + "-movflags", + "+faststart", + clip_path, + ] + + logger.info( + "Generating replay clip for %s (%.1f - %.1f)", + self.job.source_camera, + self.job.start_ts, + self.job.end_ts, + ) + + def _on_progress(percent: float) -> None: + self.job.progress_percent = percent + self._broadcast() + + try: + returncode, stderr = run_ffmpeg_with_progress( + ffmpeg_cmd, + expected_duration_seconds=max( + 0.0, self.job.end_ts - self.job.start_ts + ), + on_progress=_on_progress, + process_started=self._record_proc, + use_low_priority=True, + ) + finally: + with self._proc_lock: + self._active_process = None + + if self._cancel_event.is_set(): + self._finalize_cancelled(clip_path) + return + + if returncode != 0: + raise RuntimeError(f"FFmpeg failed: {stderr[-500:]}") + + if not os.path.exists(clip_path): + raise RuntimeError("Clip file was not created") + + self.job.current_step = STEP_STARTING_CAMERA + self.job.progress_percent = 100.0 + self._broadcast(force=True) + + if self._cancel_event.is_set(): + self._finalize_cancelled(clip_path) + return + + self.replay_manager.publish_camera( + source_camera=self.job.source_camera, + replay_name=replay_name, + clip_path=clip_path, + frigate_config=self.frigate_config, + config_publisher=self.config_publisher, + ) + self.replay_manager.mark_session_ready(clip_path) + + self.job.status = JobStatusTypesEnum.success + self.job.end_time = time.time() + self._broadcast(force=True) + logger.info( + "Debug replay started: %s -> %s", + self.job.source_camera, + replay_name, + ) + except Exception as exc: + logger.exception("Debug replay startup failed") + self.job.status = JobStatusTypesEnum.failed + self.job.error_message = str(exc) + self.job.end_time = time.time() + self._broadcast(force=True) + self.replay_manager.clear_session() + _remove_silent(clip_path) + finally: + _remove_silent(concat_file) + _set_active_runner(None) + + def _finalize_cancelled(self, clip_path: str) -> None: + logger.info("Debug replay startup cancelled") + self.job.status = JobStatusTypesEnum.cancelled + self.job.end_time = time.time() + self._broadcast(force=True) + # Manager session pointers are cleared by stop() on the API side + # (it already holds the cleanup contract). On any other cancellation + # path, also clear so /start can run again. + self.replay_manager.clear_session() + _remove_silent(clip_path) + + +def _remove_silent(path: str) -> None: + try: + if os.path.exists(path): + os.remove(path) + except OSError: + pass + + +def start_debug_replay_job( + *, + source_camera: str, + start_ts: float, + end_ts: float, + frigate_config: FrigateConfig, + config_publisher: CameraConfigUpdatePublisher, + replay_manager: "DebugReplayManager", +) -> str: + """Validate, create job, start runner. Returns the job id. + + Raises ``ValueError`` for bad params (camera missing, time range + invalid, no recordings) and ``RuntimeError`` if a session is already + active. + """ + if job_is_running(JOB_TYPE) or replay_manager.active: + raise RuntimeError("A replay session is already active") + + if source_camera not in frigate_config.cameras: + raise ValueError(f"Camera '{source_camera}' not found") + + if end_ts <= start_ts: + raise ValueError("End time must be after start time") + + recordings = query_recordings(source_camera, start_ts, end_ts) + if not recordings.count(): + raise ValueError( + f"No recordings found for camera '{source_camera}' in the specified time range" + ) + + replay_name = f"{REPLAY_CAMERA_PREFIX}{source_camera}" + replay_manager.mark_starting( + source_camera=source_camera, + replay_camera_name=replay_name, + start_ts=start_ts, + end_ts=end_ts, + ) + + job = DebugReplayJob( + source_camera=source_camera, + replay_camera_name=replay_name, + start_ts=start_ts, + end_ts=end_ts, + ) + set_current_job(job) + + runner = DebugReplayJobRunner( + job=job, + frigate_config=frigate_config, + config_publisher=config_publisher, + replay_manager=replay_manager, + ) + _set_active_runner(runner) + runner.start() + + return job.id + + +def cancel_debug_replay_job() -> bool: + """Signal the active runner to cancel. + + Returns True if a runner was signalled, False if no job was active. + """ + runner = get_active_runner() + if runner is None: + return False + runner.cancel() + return True + + +def wait_for_runner(timeout: float = 2.0) -> bool: + """Join the active runner. Returns True if the runner ended in time.""" + runner = get_active_runner() + if runner is None: + return True + runner.join(timeout=timeout) + return not runner.is_alive() diff --git a/web/src/pages/Replay.tsx b/web/src/pages/Replay.tsx index 493bb39a9..698fa7b19 100644 --- a/web/src/pages/Replay.tsx +++ b/web/src/pages/Replay.tsx @@ -44,6 +44,7 @@ import { getTranslatedLabel } from "@/utils/i18n"; import { Card } from "@/components/ui/card"; import { Progress } from "@/components/ui/progress"; import { ObjectType } from "@/types/ws"; +import { useJobStatus } from "@/api/ws"; import WsMessageFeed from "@/components/ws/WsMessageFeed"; import { ConfigSectionTemplate } from "@/components/config-form/sections/ConfigSectionTemplate"; @@ -57,18 +58,8 @@ import { useDocDomain } from "@/hooks/use-doc-domain"; import DebugDrawingLayer from "@/components/overlay/DebugDrawingLayer"; import { IoMdArrowRoundBack } from "react-icons/io"; -type ReplayState = - | "idle" - | "preparing_clip" - | "starting_camera" - | "active" - | "error"; - type DebugReplayStatus = { active: boolean; - state: ReplayState; - progress_percent: number | null; - error_message: string | null; replay_camera: string | null; source_camera: string | null; start_time: number | null; @@ -76,6 +67,15 @@ type DebugReplayStatus = { live_ready: boolean; }; +type DebugReplayJobResults = { + current_step: "preparing_clip" | "starting_camera" | null; + progress_percent: number | null; + source_camera: string | null; + replay_camera_name: string | null; + start_ts: number | null; + end_ts: number | null; +}; + type DebugOptions = { bbox: boolean; timestamp: boolean; @@ -130,6 +130,8 @@ export default function Replay() { } = useSWR("debug_replay/status", { refreshInterval: 1000, }); + const { payload: replayJob } = + useJobStatus("debug_replay"); const [isInitializing, setIsInitializing] = useState(true); // Refresh status immediately on mount to avoid showing "no session" briefly @@ -248,34 +250,17 @@ export default function Replay() { ); } - // No active session - if (!status?.active && status?.state !== "error") { - return ( -
- - - {t("page.noSession")} - -

- {t("page.noSessionDesc")} -

- -
- ); - } - - // Startup error - if (status?.state === "error") { + // Startup error (job failed). Only show when status.active is also true so + // we don't surface stale failed jobs after a session ended cleanly. + if (replayJob?.status === "failed" && status?.active) { return (
{t("page.startError.title")} - {status.error_message && ( + {replayJob.error_message && (

- {status.error_message} + {replayJob.error_message}

)} +
+ ); + } + + // Startup in progress (job is running). The session is active but the + // replay camera isn't ready yet; show progress / phase from the job. + const startupStep = + replayJob?.status === "running" + ? (replayJob.results?.current_step ?? null) + : null; + if (startupStep === "preparing_clip" || startupStep === "starting_camera") { const phaseTitle = - status.state === "preparing_clip" + startupStep === "preparing_clip" ? t("page.preparingClip") : t("page.startingCamera"); + const progressPercent = replayJob?.results?.progress_percent ?? null; const showProgressBar = - status.state === "preparing_clip" && status.progress_percent != null; + startupStep === "preparing_clip" && progressPercent != null; return (
{showProgressBar ? (
- +
- {Math.round(status.progress_percent ?? 0)}% + {Math.round(progressPercent ?? 0)}%
) : ( @@ -319,7 +325,7 @@ export default function Replay() { {phaseTitle} - {status.state === "preparing_clip" && ( + {startupStep === "preparing_clip" && (

{t("page.preparingClipDesc")}