From 814c497bef828f45df5fff95732584c3cc254df7 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Sun, 3 May 2026 15:54:20 -0500 Subject: [PATCH] Use Job infrastructure for Debug Replay (#23099) * use ReplayState enum * extract shared ffmpeg progress helper * make start call non-blocking with worker thread * expose replay state on status endpoint and return 202 from start * cancel in-flight ffmpeg when stop is called during preparation * add replay i18n strings for preparing and error states * show status in replay UI * navigate immediately on 202 from debug replay menus and dialog * remove unused * simplify to use Job infrastructure * tests * cleanup and tweaks * fetch schema * update api spec * formatting * fix e2e test * mypy * clean up * formatting * fix * fix test * don't try to show camera image until status reports ready * simplify loading logic * fix race in latest_frame on debug replay shutdown * remove toast when successfully stopping it gets hidden almost immediately --- docs/static/frigate-api.yaml | 15 +- frigate/api/debug_replay.py | 74 +-- frigate/api/media.py | 8 +- frigate/debug_replay.py | 260 +++------- frigate/jobs/debug_replay.py | 386 +++++++++++++++ frigate/record/export.py | 96 +--- .../test/http_api/test_debug_replay_api.py | 123 +++++ frigate/test/test_debug_replay.py | 242 +++++++++ frigate/test/test_debug_replay_job.py | 460 ++++++++++++++++++ frigate/test/test_export_progress.py | 9 +- frigate/test/test_ffmpeg_progress.py | 111 +++++ frigate/util/ffmpeg.py | 124 ++++- web/e2e/specs/replay.spec.ts | 6 +- web/public/locales/en/views/replay.json | 21 +- .../components/menu/SearchResultActions.tsx | 12 +- .../components/overlay/DebugReplayDialog.tsx | 5 +- .../overlay/MobileReviewSettingsDrawer.tsx | 5 +- web/src/components/timeline/EventMenu.tsx | 12 +- web/src/pages/Replay.tsx | 238 +++++---- 19 files changed, 1767 insertions(+), 440 deletions(-) create mode 100644 frigate/jobs/debug_replay.py create mode 100644 frigate/test/http_api/test_debug_replay_api.py create mode 100644 frigate/test/test_debug_replay.py create mode 100644 frigate/test/test_debug_replay_job.py create mode 100644 frigate/test/test_ffmpeg_progress.py diff --git a/docs/static/frigate-api.yaml b/docs/static/frigate-api.yaml index 60621ff4e..9c4e44051 100644 --- a/docs/static/frigate-api.yaml +++ b/docs/static/frigate-api.yaml @@ -5997,7 +5997,10 @@ paths: tags: - App summary: Start debug replay - description: Start a debug replay session from camera recordings. + description: + Start a debug replay session from camera recordings. Returns + immediately while clip generation runs as a background job; subscribe + to the 'debug_replay' job_state WS topic to track progress. operationId: start_debug_replay_debug_replay_start_post requestBody: required: true @@ -6006,12 +6009,16 @@ paths: schema: $ref: "#/components/schemas/DebugReplayStartBody" responses: - "200": + "202": description: Successful Response content: application/json: schema: $ref: "#/components/schemas/DebugReplayStartResponse" + "400": + description: Invalid camera, time range, or no recordings + "409": + description: A replay session is already active "422": description: Validation Error content: @@ -6272,10 +6279,14 @@ components: replay_camera: type: string title: Replay Camera + job_id: + type: string + title: Job Id type: object required: - success - replay_camera + - job_id title: DebugReplayStartResponse description: Response for starting a debug replay session. DebugReplayStatusResponse: diff --git a/frigate/api/debug_replay.py b/frigate/api/debug_replay.py index 027d4e50c..171bf1b98 100644 --- a/frigate/api/debug_replay.py +++ b/frigate/api/debug_replay.py @@ -10,6 +10,7 @@ from pydantic import BaseModel, Field from frigate.api.auth import require_role from frigate.api.defs.tags import Tags +from frigate.jobs.debug_replay import start_debug_replay_job logger = logging.getLogger(__name__) @@ -29,10 +30,17 @@ class DebugReplayStartResponse(BaseModel): success: bool replay_camera: str + job_id: str class DebugReplayStatusResponse(BaseModel): - """Response for debug replay status.""" + """Response for debug replay status. + + Returns only session-presence fields. Startup progress and error + details flow through the job_state WebSocket topic via the + debug_replay job (see frigate.jobs.debug_replay); the + Replay page subscribes there with useJobStatus("debug_replay"). + """ active: bool replay_camera: str | None = None @@ -51,15 +59,32 @@ class DebugReplayStopResponse(BaseModel): @router.post( "/debug_replay/start", response_model=DebugReplayStartResponse, + status_code=202, + responses={ + 400: {"description": "Invalid camera, time range, or no recordings"}, + 409: {"description": "A replay session is already active"}, + }, dependencies=[Depends(require_role(["admin"]))], summary="Start debug replay", - description="Start a debug replay session from camera recordings.", + description="Start a debug replay session from camera recordings. Returns " + "immediately while clip generation runs as a background job; subscribe " + "to the 'debug_replay' job_state WS topic to track progress.", ) async def start_debug_replay(request: Request, body: DebugReplayStartBody): - """Start a debug replay session.""" + """Start a debug replay session asynchronously.""" replay_manager = request.app.replay_manager - if replay_manager.active: + try: + job_id = await asyncio.to_thread( + start_debug_replay_job, + source_camera=body.camera, + start_ts=body.start_time, + end_ts=body.end_time, + frigate_config=request.app.frigate_config, + config_publisher=request.app.config_publisher, + replay_manager=replay_manager, + ) + except RuntimeError: return JSONResponse( content={ "success": False, @@ -67,38 +92,23 @@ async def start_debug_replay(request: Request, body: DebugReplayStartBody): }, status_code=409, ) - - try: - replay_camera = await asyncio.to_thread( - replay_manager.start, - source_camera=body.camera, - start_ts=body.start_time, - end_ts=body.end_time, - frigate_config=request.app.frigate_config, - config_publisher=request.app.config_publisher, - ) except ValueError: - logger.exception("Invalid parameters for debug replay start request") + logger.exception("Rejected debug replay start request") return JSONResponse( content={ "success": False, - "message": "Invalid debug replay request parameters", + "message": "Invalid debug replay parameters", }, status_code=400, ) - except RuntimeError: - logger.exception("Error while starting debug replay session") - return JSONResponse( - content={ - "success": False, - "message": "An internal error occurred while starting debug replay", - }, - status_code=500, - ) - return DebugReplayStartResponse( - success=True, - replay_camera=replay_camera, + return JSONResponse( + content={ + "success": True, + "replay_camera": replay_manager.replay_camera_name, + "job_id": job_id, + }, + status_code=202, ) @@ -118,12 +128,16 @@ def get_debug_replay_status(request: Request): if replay_manager.active and replay_camera: frame_processor = request.app.detected_frames_processor - frame = frame_processor.get_current_frame(replay_camera) + frame = ( + frame_processor.get_current_frame(replay_camera) + if frame_processor is not None + else None + ) if frame is not None: frame_time = frame_processor.get_current_frame_time(replay_camera) camera_config = request.app.frigate_config.cameras.get(replay_camera) - retry_interval = 10 + retry_interval = 10.0 if camera_config is not None: retry_interval = float(camera_config.ffmpeg.retry_interval or 10) diff --git a/frigate/api/media.py b/frigate/api/media.py index 69f0b8372..c8285eda1 100644 --- a/frigate/api/media.py +++ b/frigate/api/media.py @@ -174,12 +174,10 @@ async def latest_frame( } quality_params = get_image_quality_params(extension.value, params.quality) - if camera_name in request.app.frigate_config.cameras: + camera_config = request.app.frigate_config.cameras.get(camera_name) + if camera_config is not None: frame = frame_processor.get_current_frame(camera_name, draw_options) - retry_interval = float( - request.app.frigate_config.cameras.get(camera_name).ffmpeg.retry_interval - or 10 - ) + retry_interval = float(camera_config.ffmpeg.retry_interval or 10) is_offline = False if frame is None or datetime.now().timestamp() > ( diff --git a/frigate/debug_replay.py b/frigate/debug_replay.py index 15ca3777a..ac04090e4 100644 --- a/frigate/debug_replay.py +++ b/frigate/debug_replay.py @@ -1,9 +1,13 @@ -"""Debug replay camera management for replaying recordings with detection overlays.""" +"""Debug replay camera management for replaying recordings with detection overlays. + +The startup work (ffmpeg concat + camera config publish) lives in +frigate.jobs.debug_replay. This module owns only session presence +(active), session metadata, and post-session cleanup. +""" import logging import os import shutil -import subprocess as sp import threading from ruamel.yaml import YAML @@ -21,7 +25,7 @@ from frigate.const import ( REPLAY_DIR, THUMB_DIR, ) -from frigate.models import Recordings +from frigate.jobs.debug_replay import cancel_debug_replay_job, wait_for_runner from frigate.util.camera_cleanup import cleanup_camera_db, cleanup_camera_files from frigate.util.config import find_config_file @@ -29,7 +33,14 @@ logger = logging.getLogger(__name__) class DebugReplayManager: - """Manages a single debug replay session.""" + """Owns the lifecycle pointers for a single debug replay session. + + A session exists from the moment mark_starting is called (synchronously, + inside the API handler) until clear_session runs (on success cleanup, + failure, or stop). The active property is the source of truth that the + status bar consumes — broader than the startup job, which only covers the + preparing_clip / starting_camera window. + """ def __init__(self) -> None: self._lock = threading.Lock() @@ -41,144 +52,66 @@ class DebugReplayManager: @property def active(self) -> bool: - """Whether a replay session is currently active.""" + """True from mark_starting until clear_session.""" return self.replay_camera_name is not None - def start( + def mark_starting( self, source_camera: str, + replay_camera_name: str, start_ts: float, end_ts: float, - frigate_config: FrigateConfig, - config_publisher: CameraConfigUpdatePublisher, - ) -> str: - """Start a debug replay session. + ) -> None: + """Synchronously claim the session before the job runner starts. - Args: - source_camera: Name of the source camera to replay - start_ts: Start timestamp - end_ts: End timestamp - frigate_config: Current Frigate configuration - config_publisher: Publisher for camera config updates - - Returns: - The replay camera name - - Raises: - ValueError: If a session is already active or parameters are invalid - RuntimeError: If clip generation fails + Called inside the API handler so the status bar sees active=True + immediately, before the worker thread does any ffmpeg work. """ with self._lock: - return self._start_locked( - source_camera, start_ts, end_ts, frigate_config, config_publisher - ) + self.replay_camera_name = replay_camera_name + self.source_camera = source_camera + self.start_ts = start_ts + self.end_ts = end_ts + self.clip_path = None - def _start_locked( + def mark_session_ready(self, clip_path: str) -> None: + """Record the on-disk clip path after the camera has been published.""" + with self._lock: + self.clip_path = clip_path + + def clear_session(self) -> None: + """Reset session pointers without publishing camera removal. + + Used by the job runner on failure paths. stop() does the camera + teardown plus this clear in one step. + """ + with self._lock: + self._clear_locked() + + def _clear_locked(self) -> None: + self.replay_camera_name = None + self.source_camera = None + self.clip_path = None + self.start_ts = None + self.end_ts = None + + def publish_camera( self, source_camera: str, - start_ts: float, - end_ts: float, + replay_name: str, + clip_path: str, frigate_config: FrigateConfig, config_publisher: CameraConfigUpdatePublisher, - ) -> str: - if self.active: - raise ValueError("A replay session is already active") + ) -> None: + """Build the in-memory replay camera config and publish the add event. - if source_camera not in frigate_config.cameras: - raise ValueError(f"Camera '{source_camera}' not found") - - if end_ts <= start_ts: - raise ValueError("End time must be after start time") - - # Query recordings for the source camera in the time range - recordings = ( - Recordings.select( - Recordings.path, - Recordings.start_time, - Recordings.end_time, - ) - .where( - Recordings.start_time.between(start_ts, end_ts) - | Recordings.end_time.between(start_ts, end_ts) - | ((start_ts > Recordings.start_time) & (end_ts < Recordings.end_time)) - ) - .where(Recordings.camera == source_camera) - .order_by(Recordings.start_time.asc()) - ) - - if not recordings.count(): - raise ValueError( - f"No recordings found for camera '{source_camera}' in the specified time range" - ) - - # Create replay directory - os.makedirs(REPLAY_DIR, exist_ok=True) - - # Generate replay camera name - replay_name = f"{REPLAY_CAMERA_PREFIX}{source_camera}" - - # Build concat file for ffmpeg - concat_file = os.path.join(REPLAY_DIR, f"{replay_name}_concat.txt") - clip_path = os.path.join(REPLAY_DIR, f"{replay_name}.mp4") - - with open(concat_file, "w") as f: - for recording in recordings: - f.write(f"file '{recording.path}'\n") - - # Concatenate recordings into a single clip with -c copy (fast) - ffmpeg_cmd = [ - frigate_config.ffmpeg.ffmpeg_path, - "-hide_banner", - "-y", - "-f", - "concat", - "-safe", - "0", - "-i", - concat_file, - "-c", - "copy", - "-movflags", - "+faststart", - clip_path, - ] - - logger.info( - "Generating replay clip for %s (%.1f - %.1f)", - source_camera, - start_ts, - end_ts, - ) - - try: - result = sp.run( - ffmpeg_cmd, - capture_output=True, - text=True, - timeout=120, - ) - if result.returncode != 0: - logger.error("FFmpeg error: %s", result.stderr) - raise RuntimeError( - f"Failed to generate replay clip: {result.stderr[-500:]}" - ) - except sp.TimeoutExpired: - raise RuntimeError("Clip generation timed out") - finally: - # Clean up concat file - if os.path.exists(concat_file): - os.remove(concat_file) - - if not os.path.exists(clip_path): - raise RuntimeError("Clip file was not created") - - # Build camera config dict for the replay camera + Called by the job runner during the starting_camera phase. + """ source_config = frigate_config.cameras[source_camera] camera_dict = self._build_camera_config_dict( source_config, replay_name, clip_path ) - # Build an in-memory config with the replay camera added config_file = find_config_file() yaml_parser = YAML() with open(config_file, "r") as f: @@ -191,75 +124,48 @@ class DebugReplayManager: try: new_config = FrigateConfig.parse_object(config_data) except Exception as e: - raise RuntimeError(f"Failed to validate replay camera config: {e}") - - # Update the running config + raise RuntimeError(f"Failed to validate replay camera config: {e}") from e frigate_config.cameras[replay_name] = new_config.cameras[replay_name] - # Publish the add event config_publisher.publish_update( CameraConfigUpdateTopic(CameraConfigUpdateEnum.add, replay_name), new_config.cameras[replay_name], ) - # Store session state - self.replay_camera_name = replay_name - self.source_camera = source_camera - self.clip_path = clip_path - self.start_ts = start_ts - self.end_ts = end_ts - - logger.info("Debug replay started: %s -> %s", source_camera, replay_name) - return replay_name - def stop( self, frigate_config: FrigateConfig, config_publisher: CameraConfigUpdatePublisher, ) -> None: - """Stop the active replay session and clean up all artifacts. + """Cancel any in-flight startup job and tear down the active session. - Args: - frigate_config: Current Frigate configuration - config_publisher: Publisher for camera config updates + Safe to call when no session is active (no-op with a warning). """ + cancel_debug_replay_job() + wait_for_runner(timeout=2.0) + with self._lock: - self._stop_locked(frigate_config, config_publisher) + if not self.active: + logger.warning("No active replay session to stop") + return - def _stop_locked( - self, - frigate_config: FrigateConfig, - config_publisher: CameraConfigUpdatePublisher, - ) -> None: - if not self.active: - logger.warning("No active replay session to stop") - return + replay_name = self.replay_camera_name - replay_name = self.replay_camera_name + # Only publish remove if the camera was actually added to the live + # config (i.e. the runner reached the starting_camera phase). + if replay_name is not None and replay_name in frigate_config.cameras: + config_publisher.publish_update( + CameraConfigUpdateTopic(CameraConfigUpdateEnum.remove, replay_name), + frigate_config.cameras[replay_name], + ) - # Publish remove event so subscribers stop and remove from their config - if replay_name in frigate_config.cameras: - config_publisher.publish_update( - CameraConfigUpdateTopic(CameraConfigUpdateEnum.remove, replay_name), - frigate_config.cameras[replay_name], - ) - # Do NOT pop here — let subscribers handle removal from the shared - # config dict when they process the ZMQ message to avoid race conditions + if replay_name is not None: + self._cleanup_db(replay_name) + self._cleanup_files(replay_name) - # Defensive DB cleanup - self._cleanup_db(replay_name) + self._clear_locked() - # Remove filesystem artifacts - self._cleanup_files(replay_name) - - # Reset state - self.replay_camera_name = None - self.source_camera = None - self.clip_path = None - self.start_ts = None - self.end_ts = None - - logger.info("Debug replay stopped and cleaned up: %s", replay_name) + logger.info("Debug replay stopped and cleaned up: %s", replay_name) def _build_camera_config_dict( self, @@ -267,16 +173,7 @@ class DebugReplayManager: replay_name: str, clip_path: str, ) -> dict: - """Build a camera config dictionary for the replay camera. - - Args: - source_config: Source camera's CameraConfig - replay_name: Name for the replay camera - clip_path: Path to the replay clip file - - Returns: - Camera config as a dictionary - """ + """Build a camera config dictionary for the replay camera.""" # Extract detect config (exclude computed fields) detect_dict = source_config.detect.model_dump( exclude={"min_initialized", "max_disappeared", "enabled_in_config"} @@ -311,7 +208,6 @@ class DebugReplayManager: zone_dump = zone_config.model_dump( exclude={"contour", "color"}, exclude_defaults=True ) - # Always include required fields zone_dump.setdefault("coordinates", zone_config.coordinates) zones_dict[zone_name] = zone_dump diff --git a/frigate/jobs/debug_replay.py b/frigate/jobs/debug_replay.py new file mode 100644 index 000000000..0616c4629 --- /dev/null +++ b/frigate/jobs/debug_replay.py @@ -0,0 +1,386 @@ +"""Debug replay startup job: ffmpeg concat + camera config publish. + +The runner orchestrates the async portion of starting a debug replay +session. The DebugReplayManager (in frigate.debug_replay) owns session +presence so the status bar can keep reading a single `active` flag from +/debug_replay/status for the entire session window — which is broader +than this job's lifetime. +""" + +import logging +import os +import subprocess as sp +import threading +import time +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, Optional, cast + +from peewee import ModelSelect + +from frigate.config import FrigateConfig +from frigate.config.camera.updater import CameraConfigUpdatePublisher +from frigate.const import REPLAY_CAMERA_PREFIX, REPLAY_DIR +from frigate.jobs.export import JobStatePublisher +from frigate.jobs.job import Job +from frigate.jobs.manager import job_is_running, set_current_job +from frigate.models import Recordings +from frigate.types import JobStatusTypesEnum +from frigate.util.ffmpeg import run_ffmpeg_with_progress + +if TYPE_CHECKING: + from frigate.debug_replay import DebugReplayManager + +logger = logging.getLogger(__name__) + +# Coalesce frequent ffmpeg progress callbacks so the WS isn't flooded. +PROGRESS_BROADCAST_MIN_INTERVAL = 1.0 + +JOB_TYPE = "debug_replay" + +STEP_PREPARING_CLIP = "preparing_clip" +STEP_STARTING_CAMERA = "starting_camera" + + +_active_runner: Optional["DebugReplayJobRunner"] = None +_runner_lock = threading.Lock() + + +def _set_active_runner(runner: Optional["DebugReplayJobRunner"]) -> None: + global _active_runner + with _runner_lock: + _active_runner = runner + + +def get_active_runner() -> Optional["DebugReplayJobRunner"]: + with _runner_lock: + return _active_runner + + +@dataclass +class DebugReplayJob(Job): + """Job state for a debug replay startup.""" + + job_type: str = JOB_TYPE + source_camera: str = "" + replay_camera_name: str = "" + start_ts: float = 0.0 + end_ts: float = 0.0 + current_step: Optional[str] = None + progress_percent: float = 0.0 + + def to_dict(self) -> dict[str, Any]: + """Whitelisted payload for the job_state WS topic. + + Replay-specific fields land in results so the frontend's + generic Job type can be parameterised cleanly. + """ + return { + "id": self.id, + "job_type": self.job_type, + "status": self.status, + "start_time": self.start_time, + "end_time": self.end_time, + "error_message": self.error_message, + "results": { + "current_step": self.current_step, + "progress_percent": self.progress_percent, + "source_camera": self.source_camera, + "replay_camera_name": self.replay_camera_name, + "start_ts": self.start_ts, + "end_ts": self.end_ts, + }, + } + + +def query_recordings(source_camera: str, start_ts: float, end_ts: float) -> ModelSelect: + """Return the Recordings query for the time range. + + Module-level so tests can patch it without instantiating a runner. + """ + query = ( + Recordings.select( + Recordings.path, + Recordings.start_time, + Recordings.end_time, + ) + .where( + Recordings.start_time.between(start_ts, end_ts) + | Recordings.end_time.between(start_ts, end_ts) + | ((start_ts > Recordings.start_time) & (end_ts < Recordings.end_time)) + ) + .where(Recordings.camera == source_camera) + .order_by(Recordings.start_time.asc()) + ) + return cast(ModelSelect, query) + + +class DebugReplayJobRunner(threading.Thread): + """Worker thread that drives the startup job to completion. + + Owns the live ffmpeg Popen reference for cancellation. Cancellation + is two-step (threading.Event + proc.terminate()) so the runner + both knows it should stop and is unblocked from its blocking subprocess + wait. + """ + + def __init__( + self, + job: DebugReplayJob, + frigate_config: FrigateConfig, + config_publisher: CameraConfigUpdatePublisher, + replay_manager: "DebugReplayManager", + publisher: Optional[JobStatePublisher] = None, + ) -> None: + super().__init__(daemon=True, name=f"debug_replay_{job.id}") + self.job = job + self.frigate_config = frigate_config + self.config_publisher = config_publisher + self.replay_manager = replay_manager + self.publisher = publisher if publisher is not None else JobStatePublisher() + self._cancel_event = threading.Event() + self._active_process: sp.Popen | None = None + self._proc_lock = threading.Lock() + self._last_broadcast_monotonic: float = 0.0 + + def cancel(self) -> None: + """Request cancellation. Idempotent.""" + self._cancel_event.set() + with self._proc_lock: + proc = self._active_process + if proc is not None: + try: + proc.terminate() + except Exception as exc: + logger.warning("Failed to terminate ffmpeg subprocess: %s", exc) + + def is_cancelled(self) -> bool: + return self._cancel_event.is_set() + + def _record_proc(self, proc: sp.Popen) -> None: + with self._proc_lock: + self._active_process = proc + # Race: cancel arrived between Popen and _record_proc. + if self._cancel_event.is_set(): + try: + proc.terminate() + except Exception: + pass + + def _broadcast(self, force: bool = False) -> None: + now = time.monotonic() + if ( + not force + and now - self._last_broadcast_monotonic < PROGRESS_BROADCAST_MIN_INTERVAL + ): + return + self._last_broadcast_monotonic = now + + try: + self.publisher.publish(self.job.to_dict()) + except Exception as err: + logger.warning("Publisher raised during job state broadcast: %s", err) + + def run(self) -> None: + replay_name = self.job.replay_camera_name + os.makedirs(REPLAY_DIR, exist_ok=True) + concat_file = os.path.join(REPLAY_DIR, f"{replay_name}_concat.txt") + clip_path = os.path.join(REPLAY_DIR, f"{replay_name}.mp4") + + self.job.status = JobStatusTypesEnum.running + self.job.start_time = time.time() + self.job.current_step = STEP_PREPARING_CLIP + self._broadcast(force=True) + + try: + recordings = query_recordings( + self.job.source_camera, self.job.start_ts, self.job.end_ts + ) + with open(concat_file, "w") as f: + for recording in recordings: + f.write(f"file '{recording.path}'\n") + + ffmpeg_cmd = [ + self.frigate_config.ffmpeg.ffmpeg_path, + "-hide_banner", + "-y", + "-f", + "concat", + "-safe", + "0", + "-i", + concat_file, + "-c", + "copy", + "-movflags", + "+faststart", + clip_path, + ] + + logger.info( + "Generating replay clip for %s (%.1f - %.1f)", + self.job.source_camera, + self.job.start_ts, + self.job.end_ts, + ) + + def _on_progress(percent: float) -> None: + self.job.progress_percent = percent + self._broadcast() + + try: + returncode, stderr = run_ffmpeg_with_progress( + ffmpeg_cmd, + expected_duration_seconds=max( + 0.0, self.job.end_ts - self.job.start_ts + ), + on_progress=_on_progress, + process_started=self._record_proc, + use_low_priority=True, + ) + finally: + with self._proc_lock: + self._active_process = None + + if self._cancel_event.is_set(): + self._finalize_cancelled(clip_path) + return + + if returncode != 0: + raise RuntimeError(f"FFmpeg failed: {stderr[-500:]}") + + if not os.path.exists(clip_path): + raise RuntimeError("Clip file was not created") + + self.job.current_step = STEP_STARTING_CAMERA + self.job.progress_percent = 100.0 + self._broadcast(force=True) + + if self._cancel_event.is_set(): + self._finalize_cancelled(clip_path) + return + + self.replay_manager.publish_camera( + source_camera=self.job.source_camera, + replay_name=replay_name, + clip_path=clip_path, + frigate_config=self.frigate_config, + config_publisher=self.config_publisher, + ) + self.replay_manager.mark_session_ready(clip_path) + + self.job.status = JobStatusTypesEnum.success + self.job.end_time = time.time() + self._broadcast(force=True) + logger.info( + "Debug replay started: %s -> %s", + self.job.source_camera, + replay_name, + ) + except Exception as exc: + logger.exception("Debug replay startup failed") + self.job.status = JobStatusTypesEnum.failed + self.job.error_message = str(exc) + self.job.end_time = time.time() + self._broadcast(force=True) + self.replay_manager.clear_session() + _remove_silent(clip_path) + finally: + _remove_silent(concat_file) + _set_active_runner(None) + + def _finalize_cancelled(self, clip_path: str) -> None: + logger.info("Debug replay startup cancelled") + self.job.status = JobStatusTypesEnum.cancelled + self.job.end_time = time.time() + self._broadcast(force=True) + # The caller of cancel_debug_replay_job (DebugReplayManager.stop) owns + # session cleanup — db rows, filesystem artifacts, clear_session. We + # only clean up the partial concat output we created. + _remove_silent(clip_path) + + +def _remove_silent(path: str) -> None: + try: + if os.path.exists(path): + os.remove(path) + except OSError: + pass + + +def start_debug_replay_job( + *, + source_camera: str, + start_ts: float, + end_ts: float, + frigate_config: FrigateConfig, + config_publisher: CameraConfigUpdatePublisher, + replay_manager: "DebugReplayManager", +) -> str: + """Validate, create job, start runner. Returns the job id. + + Raises ValueError for bad params (camera missing, time range + invalid, no recordings) and RuntimeError if a session is already + active. + """ + if job_is_running(JOB_TYPE) or replay_manager.active: + raise RuntimeError("A replay session is already active") + + if source_camera not in frigate_config.cameras: + raise ValueError(f"Camera '{source_camera}' not found") + + if end_ts <= start_ts: + raise ValueError("End time must be after start time") + + recordings = query_recordings(source_camera, start_ts, end_ts) + if not recordings.count(): + raise ValueError( + f"No recordings found for camera '{source_camera}' in the specified time range" + ) + + replay_name = f"{REPLAY_CAMERA_PREFIX}{source_camera}" + replay_manager.mark_starting( + source_camera=source_camera, + replay_camera_name=replay_name, + start_ts=start_ts, + end_ts=end_ts, + ) + + job = DebugReplayJob( + source_camera=source_camera, + replay_camera_name=replay_name, + start_ts=start_ts, + end_ts=end_ts, + ) + set_current_job(job) + + runner = DebugReplayJobRunner( + job=job, + frigate_config=frigate_config, + config_publisher=config_publisher, + replay_manager=replay_manager, + ) + _set_active_runner(runner) + runner.start() + + return job.id + + +def cancel_debug_replay_job() -> bool: + """Signal the active runner to cancel. + + Returns True if a runner was signalled, False if no job was active. + """ + runner = get_active_runner() + if runner is None: + return False + runner.cancel() + return True + + +def wait_for_runner(timeout: float = 2.0) -> bool: + """Join the active runner. Returns True if the runner ended in time.""" + runner = get_active_runner() + if runner is None: + return True + runner.join(timeout=timeout) + return not runner.is_alive() diff --git a/frigate/record/export.py b/frigate/record/export.py index cf1506c37..ef2fdc810 100644 --- a/frigate/record/export.py +++ b/frigate/record/export.py @@ -23,13 +23,13 @@ from frigate.const import ( EXPORT_DIR, MAX_PLAYLIST_SECONDS, PREVIEW_FRAME_TYPE, - PROCESS_PRIORITY_LOW, ) from frigate.ffmpeg_presets import ( EncodeTypeEnum, parse_preset_hardware_acceleration_encode, ) from frigate.models import Export, Previews, Recordings, ReviewSegment +from frigate.util.ffmpeg import run_ffmpeg_with_progress from frigate.util.time import is_current_hour logger = logging.getLogger(__name__) @@ -243,107 +243,29 @@ class RecordingExporter(threading.Thread): return total - def _inject_progress_flags(self, ffmpeg_cmd: list[str]) -> list[str]: - """Insert FFmpeg progress reporting flags before the output path. - - ``-progress pipe:2`` writes structured key=value lines to stderr, - ``-nostats`` suppresses the noisy default stats output. - """ - if not ffmpeg_cmd: - return ffmpeg_cmd - return ffmpeg_cmd[:-1] + ["-progress", "pipe:2", "-nostats", ffmpeg_cmd[-1]] - def _run_ffmpeg_with_progress( self, ffmpeg_cmd: list[str], playlist_lines: str | list[str], step: str = "encoding", ) -> tuple[int, str]: - """Run an FFmpeg export command, parsing progress events from stderr. + """Delegate to the shared helper, mapping percent → (step, percent). - Returns ``(returncode, captured_stderr)``. Stdout is left attached to - the parent process so we don't have to drain it (and risk a deadlock - if the buffer fills). Progress percent is computed against the - expected output duration; values are clamped to [0, 100] inside - :py:meth:`_emit_progress`. + Returns ``(returncode, captured_stderr)``. """ - cmd = ["nice", "-n", str(PROCESS_PRIORITY_LOW)] + self._inject_progress_flags( - ffmpeg_cmd - ) - if isinstance(playlist_lines, list): stdin_payload = "\n".join(playlist_lines) else: stdin_payload = playlist_lines - expected_duration = self._expected_output_duration_seconds() - - self._emit_progress(step, 0.0) - - proc = sp.Popen( - cmd, - stdin=sp.PIPE, - stderr=sp.PIPE, - text=True, - encoding="ascii", - errors="replace", + return run_ffmpeg_with_progress( + ffmpeg_cmd, + expected_duration_seconds=self._expected_output_duration_seconds(), + on_progress=lambda percent: self._emit_progress(step, percent), + stdin_payload=stdin_payload, + use_low_priority=True, ) - assert proc.stdin is not None - assert proc.stderr is not None - - try: - proc.stdin.write(stdin_payload) - except (BrokenPipeError, OSError): - # FFmpeg may have rejected the input early; still wait for it - # to terminate so the returncode is meaningful. - pass - finally: - try: - proc.stdin.close() - except (BrokenPipeError, OSError): - pass - - captured: list[str] = [] - - try: - for raw_line in proc.stderr: - captured.append(raw_line) - line = raw_line.strip() - - if not line: - continue - - if line.startswith("out_time_us="): - if expected_duration <= 0: - continue - try: - out_time_us = int(line.split("=", 1)[1]) - except (ValueError, IndexError): - continue - if out_time_us < 0: - continue - out_seconds = out_time_us / 1_000_000.0 - percent = (out_seconds / expected_duration) * 100.0 - self._emit_progress(step, percent) - elif line == "progress=end": - self._emit_progress(step, 100.0) - break - except Exception: - logger.exception("Failed reading FFmpeg progress for %s", self.export_id) - - proc.wait() - - # Drain any remaining stderr so callers can log it on failure. - try: - remaining = proc.stderr.read() - if remaining: - captured.append(remaining) - except Exception: - pass - - return proc.returncode, "".join(captured) - def get_datetime_from_timestamp(self, timestamp: int) -> str: # return in iso format using the configured ui.timezone when set, # so the auto-generated export name reflects local time rather diff --git a/frigate/test/http_api/test_debug_replay_api.py b/frigate/test/http_api/test_debug_replay_api.py new file mode 100644 index 000000000..45c2c5478 --- /dev/null +++ b/frigate/test/http_api/test_debug_replay_api.py @@ -0,0 +1,123 @@ +"""Tests for /debug_replay API endpoints.""" + +from unittest.mock import patch + +from frigate.models import Event, Recordings, ReviewSegment +from frigate.test.http_api.base_http_test import AuthTestClient, BaseTestHttp + + +class TestDebugReplayAPI(BaseTestHttp): + def setUp(self): + super().setUp([Event, Recordings, ReviewSegment]) + self.app = self.create_app() + + def test_start_returns_202_with_job_id(self): + # Stub the factory to skip validation/threading and just record the + # name on the manager the way the real factory's mark_starting would. + def fake_start(**kwargs): + kwargs["replay_manager"].mark_starting( + source_camera=kwargs["source_camera"], + replay_camera_name="_replay_front", + start_ts=kwargs["start_ts"], + end_ts=kwargs["end_ts"], + ) + return "job-1234" + + with patch( + "frigate.api.debug_replay.start_debug_replay_job", + side_effect=fake_start, + ): + with AuthTestClient(self.app) as client: + resp = client.post( + "/debug_replay/start", + json={ + "camera": "front", + "start_time": 100, + "end_time": 200, + }, + ) + + self.assertEqual(resp.status_code, 202) + body = resp.json() + self.assertTrue(body["success"]) + self.assertEqual(body["job_id"], "job-1234") + self.assertEqual(body["replay_camera"], "_replay_front") + + def test_start_returns_400_on_validation_error(self): + with patch( + "frigate.api.debug_replay.start_debug_replay_job", + side_effect=ValueError("Camera 'missing' not found"), + ): + with AuthTestClient(self.app) as client: + resp = client.post( + "/debug_replay/start", + json={ + "camera": "missing", + "start_time": 100, + "end_time": 200, + }, + ) + + self.assertEqual(resp.status_code, 400) + body = resp.json() + self.assertFalse(body["success"]) + # Message is hard-coded so we don't echo exception text back to clients + # (CodeQL: information exposure through an exception). + self.assertEqual(body["message"], "Invalid debug replay parameters") + + def test_start_returns_409_when_session_already_active(self): + with patch( + "frigate.api.debug_replay.start_debug_replay_job", + side_effect=RuntimeError("A replay session is already active"), + ): + with AuthTestClient(self.app) as client: + resp = client.post( + "/debug_replay/start", + json={ + "camera": "front", + "start_time": 100, + "end_time": 200, + }, + ) + + self.assertEqual(resp.status_code, 409) + body = resp.json() + self.assertFalse(body["success"]) + + def test_status_inactive_when_no_session(self): + with AuthTestClient(self.app) as client: + resp = client.get("/debug_replay/status") + + self.assertEqual(resp.status_code, 200) + body = resp.json() + self.assertFalse(body["active"]) + self.assertIsNone(body["replay_camera"]) + self.assertIsNone(body["source_camera"]) + self.assertIsNone(body["start_time"]) + self.assertIsNone(body["end_time"]) + self.assertFalse(body["live_ready"]) + # Make sure deprecated fields are gone + self.assertNotIn("state", body) + self.assertNotIn("progress_percent", body) + self.assertNotIn("error_message", body) + + def test_status_active_after_mark_starting(self): + manager = self.app.replay_manager + manager.mark_starting( + source_camera="front", + replay_camera_name="_replay_front", + start_ts=100.0, + end_ts=200.0, + ) + + with AuthTestClient(self.app) as client: + resp = client.get("/debug_replay/status") + + self.assertEqual(resp.status_code, 200) + body = resp.json() + self.assertTrue(body["active"]) + self.assertEqual(body["replay_camera"], "_replay_front") + self.assertEqual(body["source_camera"], "front") + self.assertEqual(body["start_time"], 100.0) + self.assertEqual(body["end_time"], 200.0) + self.assertFalse(body["live_ready"]) diff --git a/frigate/test/test_debug_replay.py b/frigate/test/test_debug_replay.py new file mode 100644 index 000000000..e7f9df42d --- /dev/null +++ b/frigate/test/test_debug_replay.py @@ -0,0 +1,242 @@ +"""Tests for the simplified DebugReplayManager. + +Startup orchestration lives in ``frigate.jobs.debug_replay`` (covered by +``test_debug_replay_job``). The manager owns only session presence and +cleanup. +""" + +import unittest +import unittest.mock +from unittest.mock import MagicMock, patch + + +class TestDebugReplayManagerSession(unittest.TestCase): + def test_inactive_by_default(self) -> None: + from frigate.debug_replay import DebugReplayManager + + manager = DebugReplayManager() + + self.assertFalse(manager.active) + self.assertIsNone(manager.replay_camera_name) + self.assertIsNone(manager.source_camera) + self.assertIsNone(manager.clip_path) + self.assertIsNone(manager.start_ts) + self.assertIsNone(manager.end_ts) + + def test_mark_starting_sets_session_pointers_and_active(self) -> None: + from frigate.debug_replay import DebugReplayManager + + manager = DebugReplayManager() + + manager.mark_starting( + source_camera="front", + replay_camera_name="_replay_front", + start_ts=100.0, + end_ts=200.0, + ) + + self.assertTrue(manager.active) + self.assertEqual(manager.replay_camera_name, "_replay_front") + self.assertEqual(manager.source_camera, "front") + self.assertEqual(manager.start_ts, 100.0) + self.assertEqual(manager.end_ts, 200.0) + self.assertIsNone(manager.clip_path) + + def test_mark_session_ready_sets_clip_path(self) -> None: + from frigate.debug_replay import DebugReplayManager + + manager = DebugReplayManager() + manager.mark_starting("front", "_replay_front", 100.0, 200.0) + + manager.mark_session_ready(clip_path="/tmp/replay/_replay_front.mp4") + + self.assertEqual(manager.clip_path, "/tmp/replay/_replay_front.mp4") + self.assertTrue(manager.active) + + def test_clear_session_resets_all_pointers(self) -> None: + from frigate.debug_replay import DebugReplayManager + + manager = DebugReplayManager() + manager.mark_starting("front", "_replay_front", 100.0, 200.0) + manager.mark_session_ready("/tmp/replay/clip.mp4") + + manager.clear_session() + + self.assertFalse(manager.active) + self.assertIsNone(manager.replay_camera_name) + self.assertIsNone(manager.source_camera) + self.assertIsNone(manager.clip_path) + self.assertIsNone(manager.start_ts) + self.assertIsNone(manager.end_ts) + + +class TestDebugReplayManagerStop(unittest.TestCase): + def test_stop_when_inactive_is_a_noop(self) -> None: + from frigate.debug_replay import DebugReplayManager + + manager = DebugReplayManager() + frigate_config = MagicMock() + frigate_config.cameras = {} + publisher = MagicMock() + + # Should not raise; should not publish any events. + manager.stop(frigate_config=frigate_config, config_publisher=publisher) + + publisher.publish_update.assert_not_called() + + def test_stop_publishes_remove_when_camera_was_published(self) -> None: + from frigate.config.camera.updater import CameraConfigUpdateEnum + from frigate.debug_replay import DebugReplayManager + + manager = DebugReplayManager() + manager.mark_starting("front", "_replay_front", 100.0, 200.0) + manager.mark_session_ready("/tmp/replay/_replay_front.mp4") + + camera_config = MagicMock() + frigate_config = MagicMock() + frigate_config.cameras = {"_replay_front": camera_config} + publisher = MagicMock() + + with ( + patch.object(manager, "_cleanup_db"), + patch.object(manager, "_cleanup_files"), + patch("frigate.debug_replay.cancel_debug_replay_job", return_value=False), + ): + manager.stop(frigate_config=frigate_config, config_publisher=publisher) + + # One publish_update call with a remove topic. + self.assertEqual(publisher.publish_update.call_count, 1) + topic_arg = publisher.publish_update.call_args.args[0] + self.assertEqual(topic_arg.update_type, CameraConfigUpdateEnum.remove) + self.assertFalse(manager.active) + + def test_stop_skips_remove_publish_when_camera_not_in_config(self) -> None: + """Cancellation during preparing_clip: no camera was published yet.""" + from frigate.debug_replay import DebugReplayManager + + manager = DebugReplayManager() + manager.mark_starting("front", "_replay_front", 100.0, 200.0) + # clip_path stays None because we cancelled before camera publish. + + frigate_config = MagicMock() + frigate_config.cameras = {} # _replay_front not present + publisher = MagicMock() + + with ( + patch.object(manager, "_cleanup_db"), + patch.object(manager, "_cleanup_files"), + patch("frigate.debug_replay.cancel_debug_replay_job", return_value=True), + ): + manager.stop(frigate_config=frigate_config, config_publisher=publisher) + + publisher.publish_update.assert_not_called() + self.assertFalse(manager.active) + + def test_stop_calls_cancel_debug_replay_job(self) -> None: + from frigate.debug_replay import DebugReplayManager + + manager = DebugReplayManager() + manager.mark_starting("front", "_replay_front", 100.0, 200.0) + + frigate_config = MagicMock() + frigate_config.cameras = {} + publisher = MagicMock() + + with ( + patch.object(manager, "_cleanup_db"), + patch.object(manager, "_cleanup_files"), + patch( + "frigate.debug_replay.cancel_debug_replay_job", + return_value=True, + ) as mock_cancel, + ): + manager.stop(frigate_config=frigate_config, config_publisher=publisher) + + mock_cancel.assert_called_once() + + +class TestDebugReplayManagerPublishCamera(unittest.TestCase): + def test_publish_camera_invokes_publisher_with_add_topic(self) -> None: + from frigate.config.camera.updater import CameraConfigUpdateEnum + from frigate.debug_replay import DebugReplayManager + + manager = DebugReplayManager() + + source_config = MagicMock() + new_camera_config = MagicMock() + frigate_config = MagicMock() + frigate_config.cameras = {"front": source_config} + publisher = MagicMock() + + with ( + patch.object( + manager, + "_build_camera_config_dict", + return_value={"enabled": True}, + ), + patch("frigate.debug_replay.find_config_file", return_value="/cfg.yml"), + patch("frigate.debug_replay.YAML") as yaml_cls, + patch("frigate.debug_replay.FrigateConfig.parse_object") as parse_object, + patch("builtins.open", unittest.mock.mock_open(read_data="cameras:\n")), + ): + yaml_instance = yaml_cls.return_value + yaml_instance.load.return_value = {"cameras": {}} + parsed = MagicMock() + parsed.cameras = {"_replay_front": new_camera_config} + parse_object.return_value = parsed + + manager.publish_camera( + source_camera="front", + replay_name="_replay_front", + clip_path="/tmp/clip.mp4", + frigate_config=frigate_config, + config_publisher=publisher, + ) + + # Camera registered into the live config dict + self.assertIn("_replay_front", frigate_config.cameras) + # Publisher invoked with an add topic + self.assertEqual(publisher.publish_update.call_count, 1) + topic_arg = publisher.publish_update.call_args.args[0] + self.assertEqual(topic_arg.update_type, CameraConfigUpdateEnum.add) + + def test_publish_camera_wraps_parse_failure_in_runtime_error(self) -> None: + from frigate.debug_replay import DebugReplayManager + + manager = DebugReplayManager() + frigate_config = MagicMock() + frigate_config.cameras = {"front": MagicMock()} + publisher = MagicMock() + + with ( + patch.object( + manager, + "_build_camera_config_dict", + return_value={"enabled": True}, + ), + patch("frigate.debug_replay.find_config_file", return_value="/cfg.yml"), + patch("frigate.debug_replay.YAML") as yaml_cls, + patch( + "frigate.debug_replay.FrigateConfig.parse_object", + side_effect=ValueError("zone foo has invalid coordinates"), + ), + patch("builtins.open", unittest.mock.mock_open(read_data="cameras:\n")), + ): + yaml_cls.return_value.load.return_value = {"cameras": {}} + + with self.assertRaises(RuntimeError) as ctx: + manager.publish_camera( + source_camera="front", + replay_name="_replay_front", + clip_path="/tmp/clip.mp4", + frigate_config=frigate_config, + config_publisher=publisher, + ) + + self.assertIn("replay camera config", str(ctx.exception)) + self.assertIn("invalid coordinates", str(ctx.exception)) + publisher.publish_update.assert_not_called() + + +if __name__ == "__main__": + unittest.main() diff --git a/frigate/test/test_debug_replay_job.py b/frigate/test/test_debug_replay_job.py new file mode 100644 index 000000000..60997564f --- /dev/null +++ b/frigate/test/test_debug_replay_job.py @@ -0,0 +1,460 @@ +"""Tests for the debug replay job runner and factory.""" + +import threading +import time +import unittest +import unittest.mock +from unittest.mock import MagicMock, patch + +from frigate.debug_replay import DebugReplayManager +from frigate.jobs.debug_replay import ( + DebugReplayJob, + cancel_debug_replay_job, + get_active_runner, + start_debug_replay_job, +) +from frigate.jobs.export import JobStatePublisher +from frigate.jobs.manager import _completed_jobs, _current_jobs +from frigate.types import JobStatusTypesEnum + + +def _reset_job_manager() -> None: + """Clear the global job manager state between tests.""" + _current_jobs.clear() + _completed_jobs.clear() + + +def _patch_publisher(test_case: unittest.TestCase) -> None: + """Replace JobStatePublisher.publish with a no-op to avoid hanging on IPC.""" + publisher_patch = patch.object( + JobStatePublisher, "publish", lambda self, payload: None + ) + publisher_patch.start() + test_case.addCleanup(publisher_patch.stop) + + +class TestDebugReplayJob(unittest.TestCase): + def test_default_fields(self) -> None: + job = DebugReplayJob() + + self.assertEqual(job.job_type, "debug_replay") + self.assertEqual(job.status, JobStatusTypesEnum.queued) + self.assertIsNone(job.current_step) + self.assertEqual(job.progress_percent, 0.0) + + def test_to_dict_whitelist(self) -> None: + job = DebugReplayJob( + source_camera="front", + replay_camera_name="_replay_front", + start_ts=100.0, + end_ts=200.0, + ) + job.current_step = "preparing_clip" + job.progress_percent = 42.5 + + payload = job.to_dict() + + # Top-level matches the standard Job shape. + for key in ( + "id", + "job_type", + "status", + "start_time", + "end_time", + "error_message", + "results", + ): + self.assertIn(key, payload, f"missing top-level field: {key}") + + results = payload["results"] + self.assertEqual(results["source_camera"], "front") + self.assertEqual(results["replay_camera_name"], "_replay_front") + self.assertEqual(results["current_step"], "preparing_clip") + self.assertEqual(results["progress_percent"], 42.5) + self.assertEqual(results["start_ts"], 100.0) + self.assertEqual(results["end_ts"], 200.0) + + +class TestStartDebugReplayJob(unittest.TestCase): + def setUp(self) -> None: + _reset_job_manager() + _patch_publisher(self) + self.manager = DebugReplayManager() + self.frigate_config = MagicMock() + self.frigate_config.cameras = {"front": MagicMock()} + self.frigate_config.ffmpeg.ffmpeg_path = "/bin/true" + self.publisher = MagicMock() + + self.recordings_qs = MagicMock() + self.recordings_qs.count.return_value = 1 + self.recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")]) + + def tearDown(self) -> None: + runner = get_active_runner() + if runner is not None: + runner.cancel() + runner.join(timeout=2.0) + _reset_job_manager() + + def test_rejects_unknown_camera(self) -> None: + with self.assertRaises(ValueError): + start_debug_replay_job( + source_camera="missing", + start_ts=100.0, + end_ts=200.0, + frigate_config=self.frigate_config, + config_publisher=self.publisher, + replay_manager=self.manager, + ) + + def test_rejects_invalid_time_range(self) -> None: + with self.assertRaises(ValueError): + start_debug_replay_job( + source_camera="front", + start_ts=200.0, + end_ts=100.0, + frigate_config=self.frigate_config, + config_publisher=self.publisher, + replay_manager=self.manager, + ) + + def test_rejects_when_no_recordings(self) -> None: + empty_qs = MagicMock() + empty_qs.count.return_value = 0 + with patch("frigate.jobs.debug_replay.query_recordings", return_value=empty_qs): + with self.assertRaises(ValueError): + start_debug_replay_job( + source_camera="front", + start_ts=100.0, + end_ts=200.0, + frigate_config=self.frigate_config, + config_publisher=self.publisher, + replay_manager=self.manager, + ) + + def test_returns_job_id_and_marks_session_starting(self) -> None: + block = threading.Event() + + def slow_helper(cmd, **kwargs): + block.wait(timeout=5) + return 0, "" + + with ( + patch( + "frigate.jobs.debug_replay.query_recordings", + return_value=self.recordings_qs, + ), + patch( + "frigate.jobs.debug_replay.run_ffmpeg_with_progress", + side_effect=slow_helper, + ), + patch.object(self.manager, "publish_camera"), + patch("os.path.exists", return_value=True), + patch("os.makedirs"), + patch("builtins.open", unittest.mock.mock_open()), + ): + job_id = start_debug_replay_job( + source_camera="front", + start_ts=100.0, + end_ts=200.0, + frigate_config=self.frigate_config, + config_publisher=self.publisher, + replay_manager=self.manager, + ) + + self.assertIsInstance(job_id, str) + self.assertTrue(self.manager.active) + self.assertEqual(self.manager.replay_camera_name, "_replay_front") + self.assertEqual(self.manager.source_camera, "front") + + block.set() + + def test_rejects_concurrent_calls(self) -> None: + block = threading.Event() + + def slow_helper(cmd, **kwargs): + block.wait(timeout=5) + return 0, "" + + with ( + patch( + "frigate.jobs.debug_replay.query_recordings", + return_value=self.recordings_qs, + ), + patch( + "frigate.jobs.debug_replay.run_ffmpeg_with_progress", + side_effect=slow_helper, + ), + patch.object(self.manager, "publish_camera"), + patch("os.path.exists", return_value=True), + patch("os.makedirs"), + patch("builtins.open", unittest.mock.mock_open()), + ): + start_debug_replay_job( + source_camera="front", + start_ts=100.0, + end_ts=200.0, + frigate_config=self.frigate_config, + config_publisher=self.publisher, + replay_manager=self.manager, + ) + + with self.assertRaises(RuntimeError): + start_debug_replay_job( + source_camera="front", + start_ts=100.0, + end_ts=200.0, + frigate_config=self.frigate_config, + config_publisher=self.publisher, + replay_manager=self.manager, + ) + + block.set() + + +class TestRunnerHappyPath(unittest.TestCase): + def setUp(self) -> None: + _reset_job_manager() + _patch_publisher(self) + self.manager = DebugReplayManager() + self.frigate_config = MagicMock() + self.frigate_config.cameras = {"front": MagicMock()} + self.frigate_config.ffmpeg.ffmpeg_path = "/bin/true" + self.publisher = MagicMock() + + self.recordings_qs = MagicMock() + self.recordings_qs.count.return_value = 1 + self.recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")]) + + def tearDown(self) -> None: + runner = get_active_runner() + if runner is not None: + runner.cancel() + runner.join(timeout=2.0) + _reset_job_manager() + + def _wait_for(self, predicate, timeout: float = 5.0) -> bool: + deadline = time.time() + timeout + while time.time() < deadline: + if predicate(): + return True + time.sleep(0.02) + return False + + def test_progress_callback_updates_job_percent(self) -> None: + captured: list[float] = [] + + def fake_helper(cmd, *, on_progress=None, **kwargs): + on_progress(0.0) + on_progress(50.0) + on_progress(100.0) + return 0, "" + + with ( + patch( + "frigate.jobs.debug_replay.query_recordings", + return_value=self.recordings_qs, + ), + patch( + "frigate.jobs.debug_replay.run_ffmpeg_with_progress", + side_effect=fake_helper, + ), + patch.object( + self.manager, + "publish_camera", + side_effect=lambda *a, **kw: captured.append("published"), + ), + patch("os.path.exists", return_value=True), + patch("os.makedirs"), + patch("builtins.open", unittest.mock.mock_open()), + ): + start_debug_replay_job( + source_camera="front", + start_ts=100.0, + end_ts=200.0, + frigate_config=self.frigate_config, + config_publisher=self.publisher, + replay_manager=self.manager, + ) + + self.assertTrue( + self._wait_for(lambda: get_active_runner() is None), + "runner did not finish", + ) + + from frigate.jobs.manager import get_current_job + + job = get_current_job("debug_replay") + self.assertIsNotNone(job) + self.assertEqual(job.status, JobStatusTypesEnum.success) + self.assertEqual(job.progress_percent, 100.0) + self.assertEqual(captured, ["published"]) + # Manager should have been told the session is ready with the clip path. + self.assertIsNotNone(self.manager.clip_path) + + +class TestRunnerFailurePath(unittest.TestCase): + def setUp(self) -> None: + _reset_job_manager() + _patch_publisher(self) + self.manager = DebugReplayManager() + self.frigate_config = MagicMock() + self.frigate_config.cameras = {"front": MagicMock()} + self.frigate_config.ffmpeg.ffmpeg_path = "/bin/true" + self.publisher = MagicMock() + self.recordings_qs = MagicMock() + self.recordings_qs.count.return_value = 1 + self.recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")]) + + def tearDown(self) -> None: + runner = get_active_runner() + if runner is not None: + runner.cancel() + runner.join(timeout=2.0) + _reset_job_manager() + + def _wait_for(self, predicate, timeout: float = 5.0) -> bool: + deadline = time.time() + timeout + while time.time() < deadline: + if predicate(): + return True + time.sleep(0.02) + return False + + def test_ffmpeg_failure_marks_job_failed_and_clears_session(self) -> None: + def failing_helper(cmd, **kwargs): + return 1, "ffmpeg exploded" + + with ( + patch( + "frigate.jobs.debug_replay.query_recordings", + return_value=self.recordings_qs, + ), + patch( + "frigate.jobs.debug_replay.run_ffmpeg_with_progress", + side_effect=failing_helper, + ), + patch("os.path.exists", return_value=True), + patch("os.makedirs"), + patch("os.remove"), + patch("builtins.open", unittest.mock.mock_open()), + ): + start_debug_replay_job( + source_camera="front", + start_ts=100.0, + end_ts=200.0, + frigate_config=self.frigate_config, + config_publisher=self.publisher, + replay_manager=self.manager, + ) + + self.assertTrue( + self._wait_for(lambda: get_active_runner() is None), + "runner did not finish", + ) + + from frigate.jobs.manager import get_current_job + + job = get_current_job("debug_replay") + self.assertIsNotNone(job) + self.assertEqual(job.status, JobStatusTypesEnum.failed) + self.assertIsNotNone(job.error_message) + self.assertIn("ffmpeg", job.error_message.lower()) + # Session cleared so a new /start is allowed + self.assertFalse(self.manager.active) + + +class TestRunnerCancellation(unittest.TestCase): + def setUp(self) -> None: + _reset_job_manager() + _patch_publisher(self) + self.manager = DebugReplayManager() + self.frigate_config = MagicMock() + self.frigate_config.cameras = {"front": MagicMock()} + self.frigate_config.ffmpeg.ffmpeg_path = "/bin/true" + self.publisher = MagicMock() + self.recordings_qs = MagicMock() + self.recordings_qs.count.return_value = 1 + self.recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")]) + + def tearDown(self) -> None: + runner = get_active_runner() + if runner is not None: + runner.cancel() + runner.join(timeout=2.0) + _reset_job_manager() + + def _wait_for(self, predicate, timeout: float = 5.0) -> bool: + deadline = time.time() + timeout + while time.time() < deadline: + if predicate(): + return True + time.sleep(0.02) + return False + + def test_cancel_terminates_ffmpeg_and_marks_cancelled(self) -> None: + terminated = threading.Event() + fake_proc = MagicMock() + fake_proc.terminate = MagicMock(side_effect=lambda: terminated.set()) + + def fake_helper(cmd, *, process_started=None, **kwargs): + if process_started is not None: + process_started(fake_proc) + terminated.wait(timeout=5) + return -15, "killed" + + with ( + patch( + "frigate.jobs.debug_replay.query_recordings", + return_value=self.recordings_qs, + ), + patch( + "frigate.jobs.debug_replay.run_ffmpeg_with_progress", + side_effect=fake_helper, + ), + patch("os.path.exists", return_value=True), + patch("os.makedirs"), + patch("os.remove"), + patch("builtins.open", unittest.mock.mock_open()), + ): + start_debug_replay_job( + source_camera="front", + start_ts=100.0, + end_ts=200.0, + frigate_config=self.frigate_config, + config_publisher=self.publisher, + replay_manager=self.manager, + ) + + # Wait for the runner to register the active process. + self.assertTrue( + self._wait_for( + lambda: ( + get_active_runner() is not None + and get_active_runner()._active_process is fake_proc + ) + ) + ) + + cancelled = cancel_debug_replay_job() + self.assertTrue(cancelled) + self.assertTrue(fake_proc.terminate.called) + + self.assertTrue( + self._wait_for(lambda: get_active_runner() is None), + "runner did not finish", + ) + + from frigate.jobs.manager import get_current_job + + job = get_current_job("debug_replay") + self.assertEqual(job.status, JobStatusTypesEnum.cancelled) + # Runner must not clear the manager session on cancellation — + # that belongs to the caller of cancel_debug_replay_job (stop()). + # If the runner cleared it, stop() would log "no active session" + # and skip its cleanup_db / cleanup_files calls. + self.assertTrue(self.manager.active) + + +if __name__ == "__main__": + unittest.main() diff --git a/frigate/test/test_export_progress.py b/frigate/test/test_export_progress.py index 62883c13a..835cf91b9 100644 --- a/frigate/test/test_export_progress.py +++ b/frigate/test/test_export_progress.py @@ -14,6 +14,7 @@ from frigate.jobs.export import ( ) from frigate.record.export import PlaybackSourceEnum, RecordingExporter from frigate.types import JobStatusTypesEnum +from frigate.util.ffmpeg import inject_progress_flags def _make_exporter( @@ -118,10 +119,9 @@ class TestExpectedOutputDuration(unittest.TestCase): class TestProgressFlagInjection(unittest.TestCase): def test_inserts_before_output_path(self) -> None: - exporter = _make_exporter() cmd = ["ffmpeg", "-i", "input.m3u8", "-c", "copy", "/tmp/output.mp4"] - result = exporter._inject_progress_flags(cmd) + result = inject_progress_flags(cmd) assert result == [ "ffmpeg", @@ -136,8 +136,7 @@ class TestProgressFlagInjection(unittest.TestCase): ] def test_handles_empty_cmd(self) -> None: - exporter = _make_exporter() - assert exporter._inject_progress_flags([]) == [] + assert inject_progress_flags([]) == [] class TestFfmpegProgressParsing(unittest.TestCase): @@ -167,7 +166,7 @@ class TestFfmpegProgressParsing(unittest.TestCase): fake_proc.returncode = 0 fake_proc.wait = MagicMock(return_value=0) - with patch("frigate.record.export.sp.Popen", return_value=fake_proc): + with patch("frigate.util.ffmpeg.sp.Popen", return_value=fake_proc): returncode, _stderr = exporter._run_ffmpeg_with_progress( ["ffmpeg", "-i", "x.m3u8", "/tmp/out.mp4"], "playlist", step="encoding" ) diff --git a/frigate/test/test_ffmpeg_progress.py b/frigate/test/test_ffmpeg_progress.py new file mode 100644 index 000000000..521051116 --- /dev/null +++ b/frigate/test/test_ffmpeg_progress.py @@ -0,0 +1,111 @@ +"""Tests for the shared ffmpeg progress helper.""" + +import unittest +from unittest.mock import MagicMock, patch + +from frigate.util.ffmpeg import inject_progress_flags, run_ffmpeg_with_progress + + +class TestInjectProgressFlags(unittest.TestCase): + def test_inserts_flags_before_output_path(self): + cmd = ["ffmpeg", "-i", "in.mp4", "-c", "copy", "out.mp4"] + result = inject_progress_flags(cmd) + self.assertEqual( + result, + [ + "ffmpeg", + "-i", + "in.mp4", + "-c", + "copy", + "-progress", + "pipe:2", + "-nostats", + "out.mp4", + ], + ) + + def test_empty_cmd_returns_empty(self): + self.assertEqual(inject_progress_flags([]), []) + + +class TestRunFfmpegWithProgress(unittest.TestCase): + def _make_fake_proc(self, stderr_lines, returncode=0): + proc = MagicMock() + proc.stderr = iter(stderr_lines) + proc.stdin = MagicMock() + proc.returncode = returncode + proc.wait = MagicMock() + return proc + + def test_emits_percent_from_out_time_us_lines(self): + captured: list[float] = [] + + def on_progress(percent: float) -> None: + captured.append(percent) + + stderr_lines = [ + "out_time_us=1000000\n", + "out_time_us=5000000\n", + "progress=end\n", + ] + proc = self._make_fake_proc(stderr_lines) + proc.stderr = MagicMock() + proc.stderr.__iter__ = lambda self: iter(stderr_lines) + proc.stderr.read = MagicMock(return_value="") + + with patch("subprocess.Popen", return_value=proc): + returncode, _stderr = run_ffmpeg_with_progress( + ["ffmpeg", "-i", "in", "out"], + expected_duration_seconds=10.0, + on_progress=on_progress, + use_low_priority=False, + ) + + self.assertEqual(returncode, 0) + self.assertEqual(len(captured), 4) # initial 0.0 + two parsed + final 100.0 + self.assertAlmostEqual(captured[0], 0.0) + self.assertAlmostEqual(captured[1], 10.0) + self.assertAlmostEqual(captured[2], 50.0) + self.assertAlmostEqual(captured[3], 100.0) + + def test_passes_started_process_to_callback(self): + proc = self._make_fake_proc([]) + proc.stderr = MagicMock() + proc.stderr.__iter__ = lambda self: iter([]) + proc.stderr.read = MagicMock(return_value="") + + seen: list = [] + + with patch("subprocess.Popen", return_value=proc): + run_ffmpeg_with_progress( + ["ffmpeg", "out"], + expected_duration_seconds=1.0, + process_started=lambda p: seen.append(p), + use_low_priority=False, + ) + + self.assertEqual(seen, [proc]) + + def test_clamps_percent_to_0_100(self): + captured: list[float] = [] + + def on_progress(percent: float) -> None: + captured.append(percent) + + stderr_lines = ["out_time_us=999999999999\n"] + proc = self._make_fake_proc(stderr_lines) + proc.stderr = MagicMock() + proc.stderr.__iter__ = lambda self: iter(stderr_lines) + proc.stderr.read = MagicMock(return_value="") + + with patch("subprocess.Popen", return_value=proc): + run_ffmpeg_with_progress( + ["ffmpeg", "out"], + expected_duration_seconds=10.0, + on_progress=on_progress, + use_low_priority=False, + ) + + # initial 0.0 then a clamped reading + self.assertEqual(captured[-1], 100.0) diff --git a/frigate/util/ffmpeg.py b/frigate/util/ffmpeg.py index 9abacd4ed..9f4c5569a 100644 --- a/frigate/util/ffmpeg.py +++ b/frigate/util/ffmpeg.py @@ -2,8 +2,9 @@ import logging import subprocess as sp -from typing import Any +from typing import Any, Callable, Optional +from frigate.const import PROCESS_PRIORITY_LOW from frigate.log import LogPipe @@ -46,3 +47,124 @@ def start_or_restart_ffmpeg( start_new_session=True, ) return process + + +logger = logging.getLogger(__name__) + + +def inject_progress_flags(cmd: list[str]) -> list[str]: + """Insert `-progress pipe:2 -nostats` immediately before the output path. + + `-progress pipe:2` writes structured key=value lines to stderr; + `-nostats` suppresses the noisy default stats output. The output path + is conventionally the last token in an FFmpeg argv. + """ + if not cmd: + return cmd + return cmd[:-1] + ["-progress", "pipe:2", "-nostats", cmd[-1]] + + +def run_ffmpeg_with_progress( + cmd: list[str], + *, + expected_duration_seconds: float, + on_progress: Optional[Callable[[float], None]] = None, + stdin_payload: Optional[str] = None, + process_started: Optional[Callable[[sp.Popen], None]] = None, + use_low_priority: bool = True, +) -> tuple[int, str]: + """Run an ffmpeg command, streaming progress via `-progress pipe:2`. + + Args: + cmd: ffmpeg argv. Output path must be the last token. + expected_duration_seconds: Duration of the expected output clip in + seconds. Used to convert ffmpeg's `out_time_us` into a percent. + on_progress: Optional callback invoked with a percent in [0, 100]. + Called once with 0.0 at start, again on each `out_time_us=` + stderr line, and once with 100.0 on `progress=end`. + stdin_payload: Optional string written to ffmpeg stdin (used by + export for concat playlists). + process_started: Optional callback invoked with the live `Popen` + once spawned — lets callers store the ref for cancellation. + use_low_priority: When True, prepend `nice -n PROCESS_PRIORITY_LOW` + so concat doesn't starve detection. + + Returns: + Tuple of `(returncode, captured_stderr)`. Stdout is left attached + to the parent process to avoid buffer-full deadlocks. + """ + full_cmd = inject_progress_flags(cmd) + if use_low_priority: + full_cmd = ["nice", "-n", str(PROCESS_PRIORITY_LOW)] + full_cmd + + def emit(percent: float) -> None: + if on_progress is None: + return + try: + on_progress(max(0.0, min(100.0, percent))) + except Exception: + logger.exception("FFmpeg progress callback failed") + + emit(0.0) + + proc = sp.Popen( + full_cmd, + stdin=sp.PIPE if stdin_payload is not None else None, + stderr=sp.PIPE, + text=True, + encoding="ascii", + errors="replace", + ) + if process_started is not None: + try: + process_started(proc) + except Exception: + logger.exception("FFmpeg process_started callback failed") + + if stdin_payload is not None and proc.stdin is not None: + try: + proc.stdin.write(stdin_payload) + except (BrokenPipeError, OSError): + pass + finally: + try: + proc.stdin.close() + except (BrokenPipeError, OSError): + pass + + captured: list[str] = [] + if proc.stderr is not None: + try: + for raw_line in proc.stderr: + captured.append(raw_line) + line = raw_line.strip() + if not line: + continue + if line.startswith("out_time_us="): + if expected_duration_seconds <= 0: + continue + try: + out_time_us = int(line.split("=", 1)[1]) + except (ValueError, IndexError): + continue + if out_time_us < 0: + continue + out_seconds = out_time_us / 1_000_000.0 + emit((out_seconds / expected_duration_seconds) * 100.0) + elif line == "progress=end": + emit(100.0) + break + except Exception: + logger.exception("Failed reading FFmpeg progress stream") + + proc.wait() + + if proc.stderr is not None: + try: + remaining = proc.stderr.read() + if remaining: + captured.append(remaining) + except Exception: + pass + + return proc.returncode or 0, "".join(captured) diff --git a/web/e2e/specs/replay.spec.ts b/web/e2e/specs/replay.spec.ts index eb19ed57d..c09abf10b 100644 --- a/web/e2e/specs/replay.spec.ts +++ b/web/e2e/specs/replay.spec.ts @@ -31,7 +31,7 @@ test.describe("Replay — no active session @medium", () => { await expect( frigateApp.page.getByRole("heading", { level: 2, - name: /No Active Replay Session/i, + name: /No Active Debug Replay Session/i, }), ).toBeVisible({ timeout: 10_000 }); const goButton = frigateApp.page.getByRole("button", { @@ -48,7 +48,7 @@ test.describe("Replay — no active session @medium", () => { await expect( frigateApp.page.getByRole("heading", { level: 2, - name: /No Active Replay Session/i, + name: /No Active Debug Replay Session/i, }), ).toBeVisible({ timeout: 10_000 }); await frigateApp.page @@ -297,7 +297,7 @@ test.describe("Replay — mobile @medium @mobile", () => { await expect( frigateApp.page.getByRole("heading", { level: 2, - name: /No Active Replay Session/i, + name: /No Active Debug Replay Session/i, }), ).toBeVisible({ timeout: 10_000 }); }); diff --git a/web/public/locales/en/views/replay.json b/web/public/locales/en/views/replay.json index a966626f5..e8f50d7b7 100644 --- a/web/public/locales/en/views/replay.json +++ b/web/public/locales/en/views/replay.json @@ -19,26 +19,31 @@ "startLabel": "Start", "endLabel": "End", "toast": { - "success": "Debug replay started successfully", "error": "Failed to start debug replay: {{error}}", "alreadyActive": "A replay session is already active", - "stopped": "Debug replay stopped", "stopError": "Failed to stop debug replay: {{error}}", "goToReplay": "Go to Replay" } }, "page": { - "noSession": "No Active Replay Session", - "noSessionDesc": "Start a debug replay from the History view by clicking the Debug Replay button in the toolbar.", + "noSession": "No Active Debug Replay Session", + "noSessionDesc": "Start a Debug Replay from History view by clicking the Actions button in the toolbar and choosing Debug Replay.", "goToRecordings": "Go to History", + "preparingClip": "Preparing clip…", + "preparingClipDesc": "Frigate is stitching together recordings for the selected time range. This can take a minute for longer ranges.", + "startingCamera": "Starting Debug Replay…", + "startError": { + "title": "Failed to start Debug Replay", + "back": "Back to History" + }, "sourceCamera": "Source Camera", "replayCamera": "Replay Camera", - "initializingReplay": "Initializing replay...", - "stoppingReplay": "Stopping replay...", + "initializingReplay": "Initializing Debug Replay...", + "stoppingReplay": "Stopping Debug Replay...", "stopReplay": "Stop Replay", "confirmStop": { "title": "Stop Debug Replay?", - "description": "This will stop the replay session and clean up all temporary data. Are you sure?", + "description": "This will stop the session and clean up all temporary data. Are you sure?", "confirm": "Stop Replay", "cancel": "Cancel" }, @@ -49,6 +54,6 @@ "activeTracking": "Active tracking", "noActiveTracking": "No active tracking", "configuration": "Configuration", - "configurationDesc": "Fine tune motion detection and object tracking settings for the debug replay camera. No changes are saved to your Frigate configuration file." + "configurationDesc": "Fine tune motion detection and object tracking settings for the Debug Replay camera. No changes are saved to your Frigate configuration file." } } diff --git a/web/src/components/menu/SearchResultActions.tsx b/web/src/components/menu/SearchResultActions.tsx index 2c9967348..b88e69853 100644 --- a/web/src/components/menu/SearchResultActions.tsx +++ b/web/src/components/menu/SearchResultActions.tsx @@ -90,10 +90,6 @@ export default function SearchResultActions({ const handleDebugReplay = useCallback( (event: SearchResult) => { setIsStarting(true); - const toastId = toast.loading( - t("dialog.starting", { ns: "views/replay" }), - { position: "top-center" }, - ); axios .post("debug_replay/start", { @@ -102,11 +98,7 @@ export default function SearchResultActions({ end_time: event.end_time, }) .then((response) => { - if (response.status === 200) { - toast.success(t("dialog.toast.success", { ns: "views/replay" }), { - id: toastId, - position: "top-center", - }); + if (response.status === 202 || response.status === 200) { navigate("/replay"); } }) @@ -120,7 +112,6 @@ export default function SearchResultActions({ toast.error( t("dialog.toast.alreadyActive", { ns: "views/replay" }), { - id: toastId, position: "top-center", closeButton: true, dismissible: false, @@ -135,7 +126,6 @@ export default function SearchResultActions({ ); } else { toast.error(t("dialog.toast.error", { error: errorMessage }), { - id: toastId, position: "top-center", }); } diff --git a/web/src/components/overlay/DebugReplayDialog.tsx b/web/src/components/overlay/DebugReplayDialog.tsx index 2a9e09d08..2f9a7159a 100644 --- a/web/src/components/overlay/DebugReplayDialog.tsx +++ b/web/src/components/overlay/DebugReplayDialog.tsx @@ -209,10 +209,7 @@ export default function DebugReplayDialog({ end_time: range.before, }) .then((response) => { - if (response.status === 200) { - toast.success(t("dialog.toast.success"), { - position: "top-center", - }); + if (response.status === 202 || response.status === 200) { setMode("none"); setRange(undefined); navigate("/replay"); diff --git a/web/src/components/overlay/MobileReviewSettingsDrawer.tsx b/web/src/components/overlay/MobileReviewSettingsDrawer.tsx index c409e5cfa..bca921072 100644 --- a/web/src/components/overlay/MobileReviewSettingsDrawer.tsx +++ b/web/src/components/overlay/MobileReviewSettingsDrawer.tsx @@ -262,10 +262,7 @@ export default function MobileReviewSettingsDrawer({ end_time: debugReplayRange.before, }); - if (response.status === 200) { - toast.success(t("dialog.toast.success", { ns: "views/replay" }), { - position: "top-center", - }); + if (response.status === 202 || response.status === 200) { setDebugReplayMode("none"); setDebugReplayRange(undefined); setDrawerMode("none"); diff --git a/web/src/components/timeline/EventMenu.tsx b/web/src/components/timeline/EventMenu.tsx index 71aa12bd6..375430c2e 100644 --- a/web/src/components/timeline/EventMenu.tsx +++ b/web/src/components/timeline/EventMenu.tsx @@ -53,10 +53,6 @@ export default function EventMenu({ const handleDebugReplay = useCallback( (event: Event) => { setIsStarting(true); - const toastId = toast.loading( - t("dialog.starting", { ns: "views/replay" }), - { position: "top-center" }, - ); axios .post("debug_replay/start", { @@ -65,11 +61,7 @@ export default function EventMenu({ end_time: event.end_time, }) .then((response) => { - if (response.status === 200) { - toast.success(t("dialog.toast.success", { ns: "views/replay" }), { - id: toastId, - position: "top-center", - }); + if (response.status === 202 || response.status === 200) { navigate("/replay"); } }) @@ -83,7 +75,6 @@ export default function EventMenu({ toast.error( t("dialog.toast.alreadyActive", { ns: "views/replay" }), { - id: toastId, position: "top-center", closeButton: true, dismissible: false, @@ -98,7 +89,6 @@ export default function EventMenu({ ); } else { toast.error(t("dialog.toast.error", { error: errorMessage }), { - id: toastId, position: "top-center", }); } diff --git a/web/src/pages/Replay.tsx b/web/src/pages/Replay.tsx index 2a6ea9ad1..cd73f2ac3 100644 --- a/web/src/pages/Replay.tsx +++ b/web/src/pages/Replay.tsx @@ -42,7 +42,9 @@ import { CameraConfig, FrigateConfig } from "@/types/frigateConfig"; import { getIconForLabel } from "@/utils/iconUtil"; import { getTranslatedLabel } from "@/utils/i18n"; import { Card } from "@/components/ui/card"; +import { Progress } from "@/components/ui/progress"; import { ObjectType } from "@/types/ws"; +import { useJobStatus } from "@/api/ws"; import WsMessageFeed from "@/components/ws/WsMessageFeed"; import { ConfigSectionTemplate } from "@/components/config-form/sections/ConfigSectionTemplate"; @@ -53,6 +55,7 @@ import { isDesktop, isMobile } from "react-device-detect"; import Logo from "@/components/Logo"; import { Separator } from "@/components/ui/separator"; import { useDocDomain } from "@/hooks/use-doc-domain"; +import { useConfigSchema } from "@/hooks/use-config-schema"; import DebugDrawingLayer from "@/components/overlay/DebugDrawingLayer"; import { IoMdArrowRoundBack } from "react-icons/io"; @@ -65,6 +68,15 @@ type DebugReplayStatus = { live_ready: boolean; }; +type DebugReplayJobResults = { + current_step: "preparing_clip" | "starting_camera" | null; + progress_percent: number | null; + source_camera: string | null; + replay_camera_name: string | null; + start_ts: number | null; + end_ts: number | null; +}; + type DebugOptions = { bbox: boolean; timestamp: boolean; @@ -105,8 +117,6 @@ const DEBUG_OPTION_I18N_KEY: Record = { paths: "paths", }; -const REPLAY_INIT_SKELETON_TIMEOUT_MS = 8000; - export default function Replay() { const { t } = useTranslation(["views/replay", "views/settings", "common"]); const navigate = useNavigate(); @@ -119,6 +129,9 @@ export default function Replay() { } = useSWR("debug_replay/status", { refreshInterval: 1000, }); + const { payload: replayJob } = + useJobStatus("debug_replay"); + const configSchema = useConfigSchema(); const [isInitializing, setIsInitializing] = useState(true); // Refresh status immediately on mount to avoid showing "no session" briefly @@ -130,12 +143,6 @@ export default function Replay() { initializeStatus(); }, [refreshStatus]); - useEffect(() => { - if (status?.live_ready) { - setShowReplayInitSkeleton(false); - } - }, [status?.live_ready]); - const [options, setOptions] = useState(DEFAULT_OPTIONS); const [isStopping, setIsStopping] = useState(false); const [configDialogOpen, setConfigDialogOpen] = useState(false); @@ -160,11 +167,7 @@ export default function Replay() { axios .post("debug_replay/stop") .then(() => { - toast.success(t("dialog.toast.stopped"), { - position: "top-center", - }); refreshStatus(); - navigate("/review"); }) .catch((error) => { const errorMessage = @@ -178,7 +181,7 @@ export default function Replay() { .finally(() => { setIsStopping(false); }); - }, [navigate, refreshStatus, t]); + }, [refreshStatus, t]); // Camera activity for the replay camera const { data: config } = useSWR("config", { @@ -191,35 +194,10 @@ export default function Replay() { const { objects } = useCameraActivity(replayCameraConfig); - const [showReplayInitSkeleton, setShowReplayInitSkeleton] = useState(false); - // debug draw const containerRef = useRef(null); const [debugDraw, setDebugDraw] = useState(false); - useEffect(() => { - if (!status?.active || !status.replay_camera) { - setShowReplayInitSkeleton(false); - return; - } - - setShowReplayInitSkeleton(true); - - const timeout = window.setTimeout(() => { - setShowReplayInitSkeleton(false); - }, REPLAY_INIT_SKELETON_TIMEOUT_MS); - - return () => { - window.clearTimeout(timeout); - }; - }, [status?.active, status?.replay_camera]); - - useEffect(() => { - if (status?.live_ready) { - setShowReplayInitSkeleton(false); - } - }, [status?.live_ready]); - // Format time range for display const timeRangeDisplay = useMemo(() => { if (!status?.start_time || !status?.end_time) return ""; @@ -237,8 +215,39 @@ export default function Replay() { ); } - // No active session - if (!status?.active) { + // Startup error (job failed). Only show when status.active is also true so + // we don't surface stale failed jobs after a session ended cleanly. + if (replayJob?.status === "failed" && status?.active) { + return ( +
+ + {t("page.startError.title")} + + {replayJob.error_message && ( +

+ {replayJob.error_message} +

+ )} + +
+ ); + } + + // No active session. Also covers the brief window between the runner + // pushing job.status = "cancelled" via WS and the next SWR refresh + // flipping status.active to false — without this, render falls through + // to the full replay UI and you see a flash of it before stop completes. + if (!status?.active || replayJob?.status === "cancelled") { return (
@@ -255,6 +264,52 @@ export default function Replay() { ); } + // Startup in progress (job is running). The session is active but the + // replay camera isn't ready yet; show progress / phase from the job. + const startupStep = + replayJob?.status === "running" + ? (replayJob.results?.current_step ?? null) + : null; + if (startupStep === "preparing_clip" || startupStep === "starting_camera") { + const phaseTitle = + startupStep === "preparing_clip" + ? t("page.preparingClip") + : t("page.startingCamera"); + const progressPercent = replayJob?.results?.progress_percent ?? null; + const showProgressBar = + startupStep === "preparing_clip" && progressPercent != null; + return ( +
+ {showProgressBar ? ( +
+ +
+ {Math.round(progressPercent ?? 0)}% +
+
+ ) : ( + + )} + + {phaseTitle} + + {startupStep === "preparing_clip" && ( +

+ {t("page.preparingClipDesc")} +

+ )} + +
+ ); + } + return (
@@ -345,27 +400,30 @@ export default function Replay() { ) : ( status.replay_camera && (
- - {debugDraw && ( - - )} - {showReplayInitSkeleton && ( + {status.live_ready ? ( + <> + + {debugDraw && ( + + )} + + ) : (
@@ -595,32 +653,38 @@ export default function Replay() { {t("page.configurationDesc")} -
- - -
+ {configSchema == null ? ( +
+ +
+ ) : ( +
+ + +
+ )}