diff --git a/frigate/debug_replay.py b/frigate/debug_replay.py index db7458ad7..dc43acfef 100644 --- a/frigate/debug_replay.py +++ b/frigate/debug_replay.py @@ -25,6 +25,7 @@ from frigate.const import ( from frigate.models import Recordings from frigate.util.camera_cleanup import cleanup_camera_db, cleanup_camera_files from frigate.util.config import find_config_file +from frigate.util.ffmpeg import run_ffmpeg_with_progress logger = logging.getLogger(__name__) @@ -58,6 +59,9 @@ class DebugReplayManager: self.clip_path: str | None = None self.start_ts: float | None = None self.end_ts: float | None = None + self._active_process: sp.Popen | None = None + self._worker_thread: threading.Thread | None = None + self.progress_percent: float | None = None @property def state(self) -> ReplayState: @@ -78,6 +82,8 @@ class DebugReplayManager: """Internal state transition helper. Always pair `error` with an error_message.""" self._state = state self.error_message = error_message if state == ReplayState.error else None + if state in (ReplayState.idle, ReplayState.error): + self.progress_percent = None def start( self, @@ -87,7 +93,10 @@ class DebugReplayManager: frigate_config: FrigateConfig, config_publisher: CameraConfigUpdatePublisher, ) -> str: - """Start a debug replay session. + """Validate inputs and kick off async startup. Returns immediately. + + The clip generation, config build, and camera publish run on a worker + thread. Poll `state` / `error_message` to track progress. 
Args: source_camera: Name of the source camera to replay @@ -97,36 +106,58 @@ class DebugReplayManager: config_publisher: Publisher for camera config updates Returns: - The replay camera name + The replay camera name (deterministic from source_camera) Raises: ValueError: If a session is already active or parameters are invalid - RuntimeError: If clip generation fails """ with self._lock: - return self._start_locked( - source_camera, start_ts, end_ts, frigate_config, config_publisher - ) + if self.active: + raise ValueError("A replay session is already active") - def _start_locked( - self, - source_camera: str, - start_ts: float, - end_ts: float, - frigate_config: FrigateConfig, - config_publisher: CameraConfigUpdatePublisher, - ) -> str: - if self.active: - raise ValueError("A replay session is already active") + if source_camera not in frigate_config.cameras: + raise ValueError(f"Camera '{source_camera}' not found") - if source_camera not in frigate_config.cameras: - raise ValueError(f"Camera '{source_camera}' not found") + if end_ts <= start_ts: + raise ValueError("End time must be after start time") - if end_ts <= start_ts: - raise ValueError("End time must be after start time") + recordings = self._query_recordings(source_camera, start_ts, end_ts) + if not recordings.count(): + raise ValueError( + f"No recordings found for camera '{source_camera}' in the specified time range" + ) - # Query recordings for the source camera in the time range - recordings = ( + replay_name = f"{REPLAY_CAMERA_PREFIX}{source_camera}" + self.replay_camera_name = replay_name + self.source_camera = source_camera + self.start_ts = start_ts + self.end_ts = end_ts + self.progress_percent = None + self._set_state(ReplayState.preparing_clip) + + worker = threading.Thread( + target=self._run_start_worker, + name=f"debug-replay-start-{replay_name}", + args=(source_camera, start_ts, end_ts, frigate_config, config_publisher), + daemon=True, + ) + self._worker_thread = worker + worker.start() + 
def _run_start_worker(
    self,
    source_camera: str,
    start_ts: float,
    end_ts: float,
    frigate_config: FrigateConfig,
    config_publisher: CameraConfigUpdatePublisher,
) -> None:
    """Worker thread body: build the replay clip, then publish the camera.

    Writes an ffmpeg concat list for the recordings in the time range, runs
    ffmpeg through `run_ffmpeg_with_progress`, and on success moves the
    manager through `starting_camera` to `active`. Any exception is turned
    into the `error` state instead of propagating, because nothing waits on
    this thread's result.

    The manager state is only changed while this worker still owns the
    session, i.e. the state is the one this worker expects and
    `_worker_thread` (when set) is the current thread. A worker whose
    session was stopped or replaced in the meantime exits without touching
    the state or the session attributes.

    Args:
        source_camera: Name of the source camera to replay
        start_ts: Start timestamp
        end_ts: End timestamp
        frigate_config: Current Frigate configuration
        config_publisher: Publisher for camera config updates
    """
    replay_name = self.replay_camera_name
    if replay_name is None:
        return

    concat_file = os.path.join(REPLAY_DIR, f"{replay_name}_concat.txt")
    clip_path = os.path.join(REPLAY_DIR, f"{replay_name}.mp4")
    this_worker = threading.current_thread()
    started_proc = None

    def _owns_session(*states) -> bool:
        """Return True if this worker may still drive the session.

        Must be called with `self._lock` held.
        """
        owner = self._worker_thread
        return self._state in states and (owner is None or owner is this_worker)

    def _remove_quietly(path: str) -> None:
        """Best-effort file removal; a missing file is not an error."""
        try:
            os.remove(path)
        except OSError:
            pass

    def _record_proc(p: sp.Popen) -> None:
        nonlocal started_proc
        started_proc = p
        self._active_process = p

    def _on_progress(percent: float) -> None:
        self.progress_percent = percent

    try:
        # Inside the try so that a failure here ends in the error state
        # rather than killing the thread with the state stuck at
        # preparing_clip.
        os.makedirs(REPLAY_DIR, exist_ok=True)

        recordings = self._query_recordings(source_camera, start_ts, end_ts)
        with open(concat_file, "w") as f:
            for recording in recordings:
                # The concat demuxer reads single-quoted paths; a literal
                # quote has to be written as '\'' or the entry is cut short.
                escaped = str(recording.path).replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")

        ffmpeg_cmd = [
            frigate_config.ffmpeg.ffmpeg_path,
            "-hide_banner",
            "-y",
            "-f",
            "concat",
            "-safe",
            "0",
            "-i",
            concat_file,
            "-c",
            "copy",
            "-movflags",
            "+faststart",
            clip_path,
        ]

        logger.info(
            "Generating replay clip for %s (%.1f - %.1f)",
            source_camera,
            start_ts,
            end_ts,
        )

        try:
            returncode, stderr = run_ffmpeg_with_progress(
                ffmpeg_cmd,
                expected_duration_seconds=max(0.0, end_ts - start_ts),
                on_progress=_on_progress,
                process_started=_record_proc,
                use_low_priority=True,
            )
        finally:
            # Only clear the handle this worker registered; a newer worker
            # may already have stored its own process.
            if self._active_process is started_proc:
                self._active_process = None

        if returncode != 0:
            raise RuntimeError(f"FFmpeg failed: {stderr[-500:]}")

        if not os.path.exists(clip_path):
            raise RuntimeError("Clip file was not created")

        with self._lock:
            # If stop() ran while we were preparing, bail out cleanly.
            still_ours = _owns_session(ReplayState.preparing_clip)
            if still_ours:
                self._set_state(ReplayState.starting_camera)
            else:
                logger.info(
                    "Replay startup aborted (state=%s); discarding clip",
                    self._state,
                )
                # The clip path is derived from the camera name, so only
                # delete it when no current session uses the same name.
                path_is_free = self.replay_camera_name != replay_name

        if not still_ours:
            if path_is_free:
                _remove_quietly(clip_path)
            return

        self._publish_replay_camera(
            source_camera, replay_name, clip_path, frigate_config, config_publisher
        )

        # NOTE(review): a stop() that lands during the publish step is not
        # detected here - confirm stop() copes with the session becoming
        # active afterwards.
        with self._lock:
            self.clip_path = clip_path
            self._set_state(ReplayState.active)

        logger.info("Debug replay started: %s -> %s", source_camera, replay_name)
    except Exception as exc:
        with self._lock:
            still_ours = _owns_session(
                ReplayState.preparing_clip, ReplayState.starting_camera
            )
            observed_state = self._state
            if still_ours:
                self._set_state(ReplayState.error, error_message=str(exc))
                # Drop session pointers so the next /start is allowed.
                self.replay_camera_name = None
                self.source_camera = None
                self.clip_path = None
                self.start_ts = None
                self.end_ts = None
            path_is_free = self.replay_camera_name != replay_name

        if still_ours:
            logger.exception("Debug replay startup failed")
        else:
            # The session was stopped or replaced; reporting an error now
            # would overwrite state this worker no longer owns.
            logger.info(
                "Replay startup aborted (state=%s); ignoring worker failure: %s",
                observed_state,
                exc,
            )

        # Best-effort cleanup of any partial clip on disk.
        if path_is_free:
            _remove_quietly(clip_path)
    finally:
        _remove_quietly(concat_file)
- # Build camera config dict for the replay camera + Args: + source_camera: Name of the source camera + replay_name: Name for the replay camera + clip_path: Path to the replay clip file + frigate_config: Current Frigate configuration + config_publisher: Publisher for camera config updates + """ source_config = frigate_config.cameras[source_camera] camera_dict = self._build_camera_config_dict( source_config, replay_name, clip_path ) - # Build an in-memory config with the replay camera added config_file = find_config_file() yaml_parser = YAML() with open(config_file, "r") as f: @@ -223,31 +323,14 @@ class DebugReplayManager: config_data["cameras"] = {} config_data["cameras"][replay_name] = camera_dict - try: - new_config = FrigateConfig.parse_object(config_data) - except Exception as e: - raise RuntimeError(f"Failed to validate replay camera config: {e}") - - # Update the running config + new_config = FrigateConfig.parse_object(config_data) frigate_config.cameras[replay_name] = new_config.cameras[replay_name] - # Publish the add event config_publisher.publish_update( CameraConfigUpdateTopic(CameraConfigUpdateEnum.add, replay_name), new_config.cameras[replay_name], ) - # Store session state - self.replay_camera_name = replay_name - self.source_camera = source_camera - self.clip_path = clip_path - self.start_ts = start_ts - self.end_ts = end_ts - self._set_state(ReplayState.active) - - logger.info("Debug replay started: %s -> %s", source_camera, replay_name) - return replay_name - def stop( self, frigate_config: FrigateConfig, diff --git a/frigate/test/test_debug_replay.py b/frigate/test/test_debug_replay.py index 3708c1606..296cbed61 100644 --- a/frigate/test/test_debug_replay.py +++ b/frigate/test/test_debug_replay.py @@ -1,6 +1,10 @@ """Tests for DebugReplayManager state machine and async startup.""" +import threading +import time import unittest +import unittest.mock +from unittest.mock import MagicMock, patch from frigate.debug_replay import DebugReplayManager, 
class TestDebugReplayManagerAsyncStart(unittest.TestCase):
    """Exercise the asynchronous startup path of DebugReplayManager.start().

    The recordings query, the ffmpeg helper, the camera publish step and the
    filesystem calls are patched out, so only the manager's state handling
    and its worker thread run for real. Every test joins the worker before
    the patches are removed, so the worker never runs unpatched code.
    """

    # Upper bound for both the fake ffmpeg helpers' wait and the worker join.
    WORKER_TIMEOUT = 5.0

    def setUp(self):
        self.manager = DebugReplayManager()
        self.frigate_config = MagicMock()
        self.frigate_config.cameras = {"front": MagicMock()}
        self.frigate_config.ffmpeg.ffmpeg_path = "/bin/true"
        self.publisher = MagicMock()

    def _patched_runtime(self, ffmpeg_helper, *, patch_publish=True):
        """Return a context manager that patches everything the worker touches.

        Args:
            ffmpeg_helper: Replacement for run_ffmpeg_with_progress
            patch_publish: Whether to stub out _publish_replay_camera

        Returns:
            An ExitStack that undoes all patches when it exits
        """
        from contextlib import ExitStack

        recordings_qs = MagicMock()
        recordings_qs.count.return_value = 1
        # A list, not an iterator, so the query can be iterated repeatedly.
        recordings_qs.__iter__.return_value = [MagicMock(path="/tmp/r1.mp4")]

        patchers = [
            patch.object(
                self.manager, "_query_recordings", return_value=recordings_qs
            ),
            patch(
                "frigate.debug_replay.run_ffmpeg_with_progress",
                side_effect=ffmpeg_helper,
            ),
            patch("os.path.exists", return_value=True),
            patch("os.makedirs"),
            patch("builtins.open", unittest.mock.mock_open()),
        ]
        if patch_publish:
            patchers.append(patch.object(self.manager, "_publish_replay_camera"))

        with ExitStack() as stack:
            for patcher in patchers:
                stack.enter_context(patcher)
            # Hand the active patches to the caller; if one of them failed to
            # start, the ones already entered are undone here instead.
            return stack.pop_all()

    def _start_replay(self):
        """Call start() with the fixture arguments and return its result."""
        return self.manager.start(
            source_camera="front",
            start_ts=100.0,
            end_ts=200.0,
            frigate_config=self.frigate_config,
            config_publisher=self.publisher,
        )

    def _join_worker(self):
        """Wait for the worker thread to finish instead of sleep-polling."""
        worker = self.manager._worker_thread
        self.assertIsNotNone(worker)
        worker.join(timeout=self.WORKER_TIMEOUT)
        self.assertFalse(worker.is_alive(), "replay worker did not finish in time")

    def test_progress_percent_tracks_helper_callbacks(self):
        def fake_helper(cmd, *, expected_duration_seconds, on_progress, **kwargs):
            on_progress(0.0)
            on_progress(42.5)
            on_progress(100.0)
            return 0, ""

        with self._patched_runtime(fake_helper):
            self._start_replay()
            self._join_worker()

        # Progress should have advanced through the callback values.
        self.assertEqual(self.manager.state, ReplayState.active)
        self.assertEqual(self.manager.progress_percent, 100.0)

    def test_start_returns_immediately_with_preparing_state(self):
        # Block the worker thread before it transitions out of preparing_clip.
        worker_can_proceed = threading.Event()

        def fake_helper(cmd, *, expected_duration_seconds, on_progress, **kwargs):
            worker_can_proceed.wait(timeout=self.WORKER_TIMEOUT)
            return 0, ""

        with self._patched_runtime(fake_helper):
            try:
                began = time.monotonic()
                replay_name = self._start_replay()
                elapsed = time.monotonic() - began

                # Returned synchronously: the helper is still blocked, so a
                # start() that waited for it would take the full timeout.
                self.assertLess(elapsed, self.WORKER_TIMEOUT)
                self.assertTrue(replay_name.startswith("_replay_"))
                self.assertEqual(self.manager.state, ReplayState.preparing_clip)
            finally:
                # Always release the worker, even if an assertion failed.
                worker_can_proceed.set()
                self._join_worker()

        self.assertEqual(self.manager.state, ReplayState.active)

    def test_start_rejects_concurrent_calls_with_value_error(self):
        block = threading.Event()

        def slow_helper(cmd, *, expected_duration_seconds, on_progress, **kwargs):
            block.wait(timeout=self.WORKER_TIMEOUT)
            return 0, ""

        with self._patched_runtime(slow_helper):
            try:
                self._start_replay()

                with self.assertRaises(ValueError):
                    self._start_replay()
            finally:
                block.set()
                self._join_worker()

    def test_start_transitions_to_error_state_when_ffmpeg_fails(self):
        def failing_helper(cmd, *, expected_duration_seconds, on_progress, **kwargs):
            return 1, "ffmpeg exploded"

        # The publish step is never reached on failure, so it stays unpatched.
        with self._patched_runtime(failing_helper, patch_publish=False):
            self._start_replay()
            self._join_worker()

        self.assertEqual(self.manager.state, ReplayState.error)
        self.assertIsNotNone(self.manager.error_message)
        self.assertIn("ffmpeg", self.manager.error_message.lower())