diff --git a/frigate/test/http_api/test_debug_replay_api.py b/frigate/test/http_api/test_debug_replay_api.py index bb6552786..a0bf4b4e5 100644 --- a/frigate/test/http_api/test_debug_replay_api.py +++ b/frigate/test/http_api/test_debug_replay_api.py @@ -2,7 +2,6 @@ from unittest.mock import patch -from frigate.debug_replay import ReplayState from frigate.models import Event, Recordings, ReviewSegment from frigate.test.http_api.base_http_test import AuthTestClient, BaseTestHttp @@ -12,60 +11,111 @@ class TestDebugReplayAPI(BaseTestHttp): super().setUp([Event, Recordings, ReviewSegment]) self.app = self.create_app() - def test_start_returns_202_with_state_preparing_clip(self): + def test_start_returns_202_with_job_id(self): + # Stub the factory to skip validation/threading and just record the + # name on the manager the way the real factory's mark_starting would. + def fake_start(**kwargs): + kwargs["replay_manager"].mark_starting( + source_camera=kwargs["source_camera"], + replay_camera_name="_replay_front", + start_ts=kwargs["start_ts"], + end_ts=kwargs["end_ts"], + ) + return "job-1234" + with patch( - "frigate.debug_replay.DebugReplayManager.start", - return_value="_replay_front", + "frigate.api.debug_replay.start_debug_replay_job", + side_effect=fake_start, ): - with patch.object( - type(self.app.replay_manager), - "state", - new_callable=lambda: property(lambda s: ReplayState.preparing_clip), - ): - with AuthTestClient(self.app) as client: - resp = client.post( - "/debug_replay/start", - json={ - "camera": "front", - "start_time": 100, - "end_time": 200, - }, - headers={"remote-user": "admin", "remote-role": "admin"}, - ) + with AuthTestClient(self.app) as client: + resp = client.post( + "/debug_replay/start", + json={ + "camera": "front", + "start_time": 100, + "end_time": 200, + }, + ) self.assertEqual(resp.status_code, 202) body = resp.json() self.assertTrue(body["success"]) + self.assertEqual(body["job_id"], "job-1234") self.assertEqual(body["replay_camera"], "_replay_front") - self.assertEqual(body["state"], "preparing_clip") - def test_status_returns_state_and_error_message(self): - manager = self.app.replay_manager - manager._set_state(ReplayState.error, error_message="ffmpeg failed: boom") + def test_start_returns_400_on_validation_error(self): + with patch( + "frigate.api.debug_replay.start_debug_replay_job", + side_effect=ValueError("Camera 'missing' not found"), + ): + with AuthTestClient(self.app) as client: + resp = client.post( + "/debug_replay/start", + json={ + "camera": "missing", + "start_time": 100, + "end_time": 200, + }, + ) + self.assertEqual(resp.status_code, 400) + body = resp.json() + self.assertFalse(body["success"]) + self.assertIn("missing", body["message"]) + + def test_start_returns_409_when_session_already_active(self): + with patch( + "frigate.api.debug_replay.start_debug_replay_job", + side_effect=RuntimeError("A replay session is already active"), + ): + with AuthTestClient(self.app) as client: + resp = client.post( + "/debug_replay/start", + json={ + "camera": "front", + "start_time": 100, + "end_time": 200, + }, + ) + + self.assertEqual(resp.status_code, 409) + body = resp.json() + self.assertFalse(body["success"]) + + def test_status_inactive_when_no_session(self): with AuthTestClient(self.app) as client: - resp = client.get( - "/debug_replay/status", - headers={"remote-user": "admin", "remote-role": "admin"}, - ) + resp = client.get("/debug_replay/status") self.assertEqual(resp.status_code, 200) body = resp.json() - self.assertEqual(body["state"], "error") - self.assertEqual(body["error_message"], "ffmpeg failed: boom") - self.assertIsNone(body["progress_percent"]) self.assertFalse(body["active"]) + self.assertIsNone(body["replay_camera"]) + self.assertIsNone(body["source_camera"]) + self.assertIsNone(body["start_time"]) + self.assertIsNone(body["end_time"]) + self.assertFalse(body["live_ready"]) + # Make sure deprecated fields are gone + self.assertNotIn("state", body) + self.assertNotIn("progress_percent", body) + self.assertNotIn("error_message", body) - def test_status_returns_progress_percent_during_preparing_clip(self): + def test_status_active_after_mark_starting(self): manager = self.app.replay_manager - manager._set_state(ReplayState.preparing_clip) - manager.progress_percent = 37.5 + manager.mark_starting( + source_camera="front", + replay_camera_name="_replay_front", + start_ts=100.0, + end_ts=200.0, + ) with AuthTestClient(self.app) as client: - resp = client.get( - "/debug_replay/status", - headers={"remote-user": "admin", "remote-role": "admin"}, - ) + resp = client.get("/debug_replay/status") self.assertEqual(resp.status_code, 200) - self.assertEqual(resp.json()["progress_percent"], 37.5) + body = resp.json() + self.assertTrue(body["active"]) + self.assertEqual(body["replay_camera"], "_replay_front") + self.assertEqual(body["source_camera"], "front") + self.assertEqual(body["start_time"], 100.0) + self.assertEqual(body["end_time"], 200.0) + self.assertFalse(body["live_ready"]) diff --git a/frigate/test/test_debug_replay.py b/frigate/test/test_debug_replay.py index d88b58d37..7d04fbd57 100644 --- a/frigate/test/test_debug_replay.py +++ b/frigate/test/test_debug_replay.py @@ -1,252 +1,209 @@ -"""Tests for DebugReplayManager state machine and async startup.""" +"""Tests for the simplified DebugReplayManager. + +Startup orchestration lives in ``frigate.jobs.debug_replay`` (covered by +``test_debug_replay_job``). The manager owns only session presence and +cleanup. +""" -import threading -import time import unittest import unittest.mock from unittest.mock import MagicMock, patch -from frigate.debug_replay import DebugReplayManager, ReplayState +class TestDebugReplayManagerSession(unittest.TestCase): + def test_inactive_by_default(self) -> None: + from frigate.debug_replay import DebugReplayManager -class TestDebugReplayManagerState(unittest.TestCase): - def test_initial_state_is_idle(self): manager = DebugReplayManager() - self.assertEqual(manager.state, ReplayState.idle) - self.assertIsNone(manager.error_message) self.assertFalse(manager.active) + self.assertIsNone(manager.replay_camera_name) + self.assertIsNone(manager.source_camera) + self.assertIsNone(manager.clip_path) + self.assertIsNone(manager.start_ts) + self.assertIsNone(manager.end_ts) + + def test_mark_starting_sets_session_pointers_and_active(self) -> None: + from frigate.debug_replay import DebugReplayManager - def test_active_property_true_for_preparing_starting_and_active_states(self): manager = DebugReplayManager() - manager._set_state(ReplayState.preparing_clip) - self.assertTrue(manager.active) - - manager._set_state(ReplayState.starting_camera) - self.assertTrue(manager.active) - - manager._set_state(ReplayState.active) - self.assertTrue(manager.active) - - def test_active_property_false_for_idle_and_error_states(self): - manager = DebugReplayManager() - - manager._set_state(ReplayState.idle) - self.assertFalse(manager.active) - - manager._set_state(ReplayState.error, error_message="boom") - self.assertFalse(manager.active) - self.assertEqual(manager.error_message, "boom") - - -class TestDebugReplayManagerAsyncStart(unittest.TestCase): - def setUp(self): - self.manager = DebugReplayManager() - self.frigate_config = MagicMock() - self.frigate_config.cameras = {"front": MagicMock()} - self.frigate_config.ffmpeg.ffmpeg_path = "/bin/true" - self.publisher = MagicMock() - - def test_progress_percent_tracks_helper_callbacks(self): - recordings_qs = MagicMock() - recordings_qs.count.return_value = 1 - recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")]) - - def fake_helper(cmd, *, expected_duration_seconds, on_progress, **kwargs): - on_progress(0.0) - on_progress(42.5) - on_progress(100.0) - return 0, "" - - with ( - patch.object(self.manager, "_query_recordings", return_value=recordings_qs), - patch("frigate.debug_replay.run_ffmpeg_with_progress", side_effect=fake_helper), - patch.object(self.manager, "_publish_replay_camera"), - patch("os.path.exists", return_value=True), - patch("os.makedirs"), - patch("builtins.open", unittest.mock.mock_open()), - ): - self.manager.start( - source_camera="front", - start_ts=100.0, - end_ts=200.0, - frigate_config=self.frigate_config, - config_publisher=self.publisher, - ) - for _ in range(50): - if self.manager.state == ReplayState.active: - break - time.sleep(0.05) - - # Progress should have advanced through the callback values. - self.assertEqual(self.manager.state, ReplayState.active) - self.assertEqual(self.manager.progress_percent, 100.0) - - def test_start_returns_immediately_with_preparing_state(self): - recordings_qs = MagicMock() - recordings_qs.count.return_value = 1 - recordings_qs.__iter__.return_value = iter( - [MagicMock(path="/tmp/r1.mp4")] + manager.mark_starting( + source_camera="front", + replay_camera_name="_replay_front", + start_ts=100.0, + end_ts=200.0, ) - # Block the worker thread before it transitions out of preparing_clip. - worker_can_proceed = threading.Event() + self.assertTrue(manager.active) + self.assertEqual(manager.replay_camera_name, "_replay_front") + self.assertEqual(manager.source_camera, "front") + self.assertEqual(manager.start_ts, 100.0) + self.assertEqual(manager.end_ts, 200.0) + self.assertIsNone(manager.clip_path) - def fake_helper(cmd, *, expected_duration_seconds, on_progress, **kwargs): - worker_can_proceed.wait(timeout=5) - return 0, "" + def test_mark_session_ready_sets_clip_path(self) -> None: + from frigate.debug_replay import DebugReplayManager + + manager = DebugReplayManager() + manager.mark_starting("front", "_replay_front", 100.0, 200.0) + + manager.mark_session_ready(clip_path="/tmp/replay/_replay_front.mp4") + + self.assertEqual(manager.clip_path, "/tmp/replay/_replay_front.mp4") + self.assertTrue(manager.active) + + def test_clear_session_resets_all_pointers(self) -> None: + from frigate.debug_replay import DebugReplayManager + + manager = DebugReplayManager() + manager.mark_starting("front", "_replay_front", 100.0, 200.0) + manager.mark_session_ready("/tmp/replay/clip.mp4") + + manager.clear_session() + + self.assertFalse(manager.active) + self.assertIsNone(manager.replay_camera_name) + self.assertIsNone(manager.source_camera) + self.assertIsNone(manager.clip_path) + self.assertIsNone(manager.start_ts) + self.assertIsNone(manager.end_ts) + + +class TestDebugReplayManagerStop(unittest.TestCase): + def test_stop_when_inactive_is_a_noop(self) -> None: + from frigate.debug_replay import DebugReplayManager + + manager = DebugReplayManager() + frigate_config = MagicMock() + frigate_config.cameras = {} + publisher = MagicMock() + + # Should not raise; should not publish any events. + manager.stop(frigate_config=frigate_config, config_publisher=publisher) + + publisher.publish_update.assert_not_called() + + def test_stop_publishes_remove_when_camera_was_published(self) -> None: + from frigate.config.camera.updater import CameraConfigUpdateEnum + from frigate.debug_replay import DebugReplayManager + + manager = DebugReplayManager() + manager.mark_starting("front", "_replay_front", 100.0, 200.0) + manager.mark_session_ready("/tmp/replay/_replay_front.mp4") + + camera_config = MagicMock() + frigate_config = MagicMock() + frigate_config.cameras = {"_replay_front": camera_config} + publisher = MagicMock() + + with ( + patch.object(manager, "_cleanup_db"), + patch.object(manager, "_cleanup_files"), + patch( + "frigate.debug_replay.cancel_debug_replay_job", return_value=False + ), + ): + manager.stop(frigate_config=frigate_config, config_publisher=publisher) + + # One publish_update call with a remove topic. + self.assertEqual(publisher.publish_update.call_count, 1) + topic_arg = publisher.publish_update.call_args.args[0] + self.assertEqual(topic_arg.update_type, CameraConfigUpdateEnum.remove) + self.assertFalse(manager.active) + + def test_stop_skips_remove_publish_when_camera_not_in_config(self) -> None: + """Cancellation during preparing_clip: no camera was published yet.""" + from frigate.debug_replay import DebugReplayManager + + manager = DebugReplayManager() + manager.mark_starting("front", "_replay_front", 100.0, 200.0) + # clip_path stays None because we cancelled before camera publish. + + frigate_config = MagicMock() + frigate_config.cameras = {} # _replay_front not present + publisher = MagicMock() + + with ( + patch.object(manager, "_cleanup_db"), + patch.object(manager, "_cleanup_files"), + patch( + "frigate.debug_replay.cancel_debug_replay_job", return_value=True + ), + ): + manager.stop(frigate_config=frigate_config, config_publisher=publisher) + + publisher.publish_update.assert_not_called() + self.assertFalse(manager.active) + + def test_stop_calls_cancel_debug_replay_job(self) -> None: + from frigate.debug_replay import DebugReplayManager + + manager = DebugReplayManager() + manager.mark_starting("front", "_replay_front", 100.0, 200.0) + + frigate_config = MagicMock() + frigate_config.cameras = {} + publisher = MagicMock() + + with ( + patch.object(manager, "_cleanup_db"), + patch.object(manager, "_cleanup_files"), + patch( + "frigate.debug_replay.cancel_debug_replay_job", + return_value=True, + ) as mock_cancel, + ): + manager.stop(frigate_config=frigate_config, config_publisher=publisher) + + mock_cancel.assert_called_once() + + +class TestDebugReplayManagerPublishCamera(unittest.TestCase): + def test_publish_camera_invokes_publisher_with_add_topic(self) -> None: + from frigate.config.camera.updater import CameraConfigUpdateEnum + from frigate.debug_replay import DebugReplayManager + + manager = DebugReplayManager() + + source_config = MagicMock() + new_camera_config = MagicMock() + frigate_config = MagicMock() + frigate_config.cameras = {"front": source_config} + publisher = MagicMock() with ( patch.object( - self.manager, - "_query_recordings", - return_value=recordings_qs, + manager, + "_build_camera_config_dict", + return_value={"enabled": True}, ), - patch("frigate.debug_replay.run_ffmpeg_with_progress", side_effect=fake_helper), - patch.object(self.manager, "_publish_replay_camera"), - patch("os.path.exists", return_value=True), - patch("os.makedirs"), - patch("builtins.open", unittest.mock.mock_open()), + patch("frigate.debug_replay.find_config_file", return_value="/cfg.yml"), + patch("frigate.debug_replay.YAML") as yaml_cls, + patch("frigate.debug_replay.FrigateConfig.parse_object") as parse_object, + patch("builtins.open", unittest.mock.mock_open(read_data="cameras:\n")), ): - replay_name = self.manager.start( + yaml_instance = yaml_cls.return_value + yaml_instance.load.return_value = {"cameras": {}} + parsed = MagicMock() + parsed.cameras = {"_replay_front": new_camera_config} + parse_object.return_value = parsed + + manager.publish_camera( source_camera="front", - start_ts=100.0, - end_ts=200.0, - frigate_config=self.frigate_config, - config_publisher=self.publisher, - ) - - # Returned synchronously - self.assertTrue(replay_name.startswith("_replay_")) - self.assertEqual(self.manager.state, ReplayState.preparing_clip) - - worker_can_proceed.set() - - # Wait for worker to finish - for _ in range(50): - if self.manager.state == ReplayState.active: - break - time.sleep(0.05) - self.assertEqual(self.manager.state, ReplayState.active) - - def test_start_rejects_concurrent_calls_with_value_error(self): - recordings_qs = MagicMock() - recordings_qs.count.return_value = 1 - recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")]) - - block = threading.Event() - - def slow_helper(cmd, *, expected_duration_seconds, on_progress, **kwargs): - block.wait(timeout=5) - return 0, "" - - with ( - patch.object(self.manager, "_query_recordings", return_value=recordings_qs), - patch("frigate.debug_replay.run_ffmpeg_with_progress", side_effect=slow_helper), - patch.object(self.manager, "_publish_replay_camera"), - patch("os.path.exists", return_value=True), - patch("os.makedirs"), - patch("builtins.open", unittest.mock.mock_open()), - ): - self.manager.start( - source_camera="front", - start_ts=100.0, - end_ts=200.0, - frigate_config=self.frigate_config, - config_publisher=self.publisher, - ) - - with self.assertRaises(ValueError): - self.manager.start( - source_camera="front", - start_ts=100.0, - end_ts=200.0, - frigate_config=self.frigate_config, - config_publisher=self.publisher, - ) - - block.set() - - def test_start_transitions_to_error_state_when_ffmpeg_fails(self): - recordings_qs = MagicMock() - recordings_qs.count.return_value = 1 - recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")]) - - def failing_helper(cmd, *, expected_duration_seconds, on_progress, **kwargs): - return 1, "ffmpeg exploded" - - with ( - patch.object(self.manager, "_query_recordings", return_value=recordings_qs), - patch("frigate.debug_replay.run_ffmpeg_with_progress", side_effect=failing_helper), - patch("os.path.exists", return_value=True), - patch("os.makedirs"), - patch("builtins.open", unittest.mock.mock_open()), - ): - self.manager.start( - source_camera="front", - start_ts=100.0, - end_ts=200.0, - frigate_config=self.frigate_config, - config_publisher=self.publisher, - ) - - for _ in range(50): - if self.manager.state == ReplayState.error: - break - time.sleep(0.05) - self.assertEqual(self.manager.state, ReplayState.error) - self.assertIsNotNone(self.manager.error_message) - self.assertIn("ffmpeg", self.manager.error_message.lower()) - - -class TestDebugReplayManagerCancellation(unittest.TestCase): - def test_stop_during_preparing_clip_terminates_ffmpeg(self): - manager = DebugReplayManager() - frigate_config = MagicMock() - frigate_config.cameras = {"front": MagicMock()} - frigate_config.ffmpeg.ffmpeg_path = "/bin/sh" - publisher = MagicMock() - - recordings_qs = MagicMock() - recordings_qs.count.return_value = 1 - recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")]) - - terminated_event = threading.Event() - fake_proc = MagicMock() - original_terminate = MagicMock(side_effect=lambda: terminated_event.set()) - fake_proc.terminate = original_terminate - - def fake_helper(cmd, *, expected_duration_seconds, on_progress, process_started, **kwargs): - if process_started is not None: - process_started(fake_proc) - # Block until stop() calls fake_proc.terminate() - terminated_event.wait(timeout=5) - return -15, "killed" - - with ( - patch.object(manager, "_query_recordings", return_value=recordings_qs), - patch("frigate.debug_replay.run_ffmpeg_with_progress", side_effect=fake_helper), - patch("os.path.exists", return_value=True), - patch("os.makedirs"), - patch("os.remove"), - patch("builtins.open", unittest.mock.mock_open()), - ): - manager.start( - source_camera="front", - start_ts=100.0, - end_ts=200.0, + replay_name="_replay_front", + clip_path="/tmp/clip.mp4", frigate_config=frigate_config, config_publisher=publisher, ) - # Wait for the worker to register the active process - for _ in range(50): - if manager._active_process is fake_proc: - break - time.sleep(0.02) - self.assertEqual(manager.state, ReplayState.preparing_clip) - manager.stop(frigate_config=frigate_config, config_publisher=publisher) + # Camera registered into the live config dict + self.assertIn("_replay_front", frigate_config.cameras) + # Publisher invoked with an add topic + self.assertEqual(publisher.publish_update.call_count, 1) + topic_arg = publisher.publish_update.call_args.args[0] + self.assertEqual(topic_arg.update_type, CameraConfigUpdateEnum.add) - self.assertTrue(original_terminate.called, "terminate() should have been called") - self.assertEqual(manager.state, ReplayState.idle) + +if __name__ == "__main__": + unittest.main() diff --git a/frigate/test/test_debug_replay_job.py b/frigate/test/test_debug_replay_job.py new file mode 100644 index 000000000..928807e58 --- /dev/null +++ b/frigate/test/test_debug_replay_job.py @@ -0,0 +1,455 @@ +"""Tests for the debug replay job runner and factory.""" + +import threading +import time +import unittest +import unittest.mock +from unittest.mock import MagicMock, patch + +from frigate.debug_replay import DebugReplayManager +from frigate.jobs.debug_replay import ( + DebugReplayJob, + cancel_debug_replay_job, + get_active_runner, + start_debug_replay_job, +) +from frigate.jobs.export import JobStatePublisher +from frigate.jobs.manager import _completed_jobs, _current_jobs +from frigate.types import JobStatusTypesEnum + + +def _reset_job_manager() -> None: + """Clear the global job manager state between tests.""" + _current_jobs.clear() + _completed_jobs.clear() + + +def _patch_publisher(test_case: unittest.TestCase) -> None: + """Replace JobStatePublisher.publish with a no-op to avoid hanging on IPC.""" + publisher_patch = patch.object( + JobStatePublisher, "publish", lambda self, payload: None + ) + publisher_patch.start() + test_case.addCleanup(publisher_patch.stop) + + +class TestDebugReplayJob(unittest.TestCase): + def test_default_fields(self) -> None: + job = DebugReplayJob() + + self.assertEqual(job.job_type, "debug_replay") + self.assertEqual(job.status, JobStatusTypesEnum.queued) + self.assertIsNone(job.current_step) + self.assertEqual(job.progress_percent, 0.0) + + def test_to_dict_whitelist(self) -> None: + job = DebugReplayJob( + source_camera="front", + replay_camera_name="_replay_front", + start_ts=100.0, + end_ts=200.0, + ) + job.current_step = "preparing_clip" + job.progress_percent = 42.5 + + payload = job.to_dict() + + # Top-level matches the standard Job shape. + for key in ( + "id", + "job_type", + "status", + "start_time", + "end_time", + "error_message", + "results", + ): + self.assertIn(key, payload, f"missing top-level field: {key}") + + results = payload["results"] + self.assertEqual(results["source_camera"], "front") + self.assertEqual(results["replay_camera_name"], "_replay_front") + self.assertEqual(results["current_step"], "preparing_clip") + self.assertEqual(results["progress_percent"], 42.5) + self.assertEqual(results["start_ts"], 100.0) + self.assertEqual(results["end_ts"], 200.0) + + +class TestStartDebugReplayJob(unittest.TestCase): + def setUp(self) -> None: + _reset_job_manager() + _patch_publisher(self) + self.manager = DebugReplayManager() + self.frigate_config = MagicMock() + self.frigate_config.cameras = {"front": MagicMock()} + self.frigate_config.ffmpeg.ffmpeg_path = "/bin/true" + self.publisher = MagicMock() + + self.recordings_qs = MagicMock() + self.recordings_qs.count.return_value = 1 + self.recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")]) + + def tearDown(self) -> None: + runner = get_active_runner() + if runner is not None: + runner.cancel() + runner.join(timeout=2.0) + _reset_job_manager() + + def test_rejects_unknown_camera(self) -> None: + with self.assertRaises(ValueError): + start_debug_replay_job( + source_camera="missing", + start_ts=100.0, + end_ts=200.0, + frigate_config=self.frigate_config, + config_publisher=self.publisher, + replay_manager=self.manager, + ) + + def test_rejects_invalid_time_range(self) -> None: + with self.assertRaises(ValueError): + start_debug_replay_job( + source_camera="front", + start_ts=200.0, + end_ts=100.0, + frigate_config=self.frigate_config, + config_publisher=self.publisher, + replay_manager=self.manager, + ) + + def test_rejects_when_no_recordings(self) -> None: + empty_qs = MagicMock() + empty_qs.count.return_value = 0 + with patch("frigate.jobs.debug_replay.query_recordings", return_value=empty_qs): + with self.assertRaises(ValueError): + start_debug_replay_job( + source_camera="front", + start_ts=100.0, + end_ts=200.0, + frigate_config=self.frigate_config, + config_publisher=self.publisher, + replay_manager=self.manager, + ) + + def test_returns_job_id_and_marks_session_starting(self) -> None: + block = threading.Event() + + def slow_helper(cmd, **kwargs): + block.wait(timeout=5) + return 0, "" + + with ( + patch( + "frigate.jobs.debug_replay.query_recordings", + return_value=self.recordings_qs, + ), + patch( + "frigate.jobs.debug_replay.run_ffmpeg_with_progress", + side_effect=slow_helper, + ), + patch.object(self.manager, "publish_camera"), + patch("os.path.exists", return_value=True), + patch("os.makedirs"), + patch("builtins.open", unittest.mock.mock_open()), + ): + job_id = start_debug_replay_job( + source_camera="front", + start_ts=100.0, + end_ts=200.0, + frigate_config=self.frigate_config, + config_publisher=self.publisher, + replay_manager=self.manager, + ) + + self.assertIsInstance(job_id, str) + self.assertTrue(self.manager.active) + self.assertEqual(self.manager.replay_camera_name, "_replay_front") + self.assertEqual(self.manager.source_camera, "front") + + block.set() + + def test_rejects_concurrent_calls(self) -> None: + block = threading.Event() + + def slow_helper(cmd, **kwargs): + block.wait(timeout=5) + return 0, "" + + with ( + patch( + "frigate.jobs.debug_replay.query_recordings", + return_value=self.recordings_qs, + ), + patch( + "frigate.jobs.debug_replay.run_ffmpeg_with_progress", + side_effect=slow_helper, + ), + patch.object(self.manager, "publish_camera"), + patch("os.path.exists", return_value=True), + patch("os.makedirs"), + patch("builtins.open", unittest.mock.mock_open()), + ): + start_debug_replay_job( + source_camera="front", + start_ts=100.0, + end_ts=200.0, + frigate_config=self.frigate_config, + config_publisher=self.publisher, + replay_manager=self.manager, + ) + + with self.assertRaises(RuntimeError): + start_debug_replay_job( + source_camera="front", + start_ts=100.0, + end_ts=200.0, + frigate_config=self.frigate_config, + config_publisher=self.publisher, + replay_manager=self.manager, + ) + + block.set() + + +class TestRunnerHappyPath(unittest.TestCase): + def setUp(self) -> None: + _reset_job_manager() + _patch_publisher(self) + self.manager = DebugReplayManager() + self.frigate_config = MagicMock() + self.frigate_config.cameras = {"front": MagicMock()} + self.frigate_config.ffmpeg.ffmpeg_path = "/bin/true" + self.publisher = MagicMock() + + self.recordings_qs = MagicMock() + self.recordings_qs.count.return_value = 1 + self.recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")]) + + def tearDown(self) -> None: + runner = get_active_runner() + if runner is not None: + runner.cancel() + runner.join(timeout=2.0) + _reset_job_manager() + + def _wait_for(self, predicate, timeout: float = 5.0) -> bool: + deadline = time.time() + timeout + while time.time() < deadline: + if predicate(): + return True + time.sleep(0.02) + return False + + def test_progress_callback_updates_job_percent(self) -> None: + captured: list[float] = [] + + def fake_helper(cmd, *, on_progress=None, **kwargs): + on_progress(0.0) + on_progress(50.0) + on_progress(100.0) + return 0, "" + + with ( + patch( + "frigate.jobs.debug_replay.query_recordings", + return_value=self.recordings_qs, + ), + patch( + "frigate.jobs.debug_replay.run_ffmpeg_with_progress", + side_effect=fake_helper, + ), + patch.object( + self.manager, + "publish_camera", + side_effect=lambda *a, **kw: captured.append("published"), + ), + patch("os.path.exists", return_value=True), + patch("os.makedirs"), + patch("builtins.open", unittest.mock.mock_open()), + ): + start_debug_replay_job( + source_camera="front", + start_ts=100.0, + end_ts=200.0, + frigate_config=self.frigate_config, + config_publisher=self.publisher, + replay_manager=self.manager, + ) + + self.assertTrue( + self._wait_for(lambda: get_active_runner() is None), + "runner did not finish", + ) + + from frigate.jobs.manager import get_current_job + + job = get_current_job("debug_replay") + self.assertIsNotNone(job) + self.assertEqual(job.status, JobStatusTypesEnum.success) + self.assertEqual(job.progress_percent, 100.0) + self.assertEqual(captured, ["published"]) + # Manager should have been told the session is ready with the clip path. + self.assertIsNotNone(self.manager.clip_path) + + +class TestRunnerFailurePath(unittest.TestCase): + def setUp(self) -> None: + _reset_job_manager() + _patch_publisher(self) + self.manager = DebugReplayManager() + self.frigate_config = MagicMock() + self.frigate_config.cameras = {"front": MagicMock()} + self.frigate_config.ffmpeg.ffmpeg_path = "/bin/true" + self.publisher = MagicMock() + self.recordings_qs = MagicMock() + self.recordings_qs.count.return_value = 1 + self.recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")]) + + def tearDown(self) -> None: + runner = get_active_runner() + if runner is not None: + runner.cancel() + runner.join(timeout=2.0) + _reset_job_manager() + + def _wait_for(self, predicate, timeout: float = 5.0) -> bool: + deadline = time.time() + timeout + while time.time() < deadline: + if predicate(): + return True + time.sleep(0.02) + return False + + def test_ffmpeg_failure_marks_job_failed_and_clears_session(self) -> None: + def failing_helper(cmd, **kwargs): + return 1, "ffmpeg exploded" + + with ( + patch( + "frigate.jobs.debug_replay.query_recordings", + return_value=self.recordings_qs, + ), + patch( + "frigate.jobs.debug_replay.run_ffmpeg_with_progress", + side_effect=failing_helper, + ), + patch("os.path.exists", return_value=True), + patch("os.makedirs"), + patch("os.remove"), + patch("builtins.open", unittest.mock.mock_open()), + ): + start_debug_replay_job( + source_camera="front", + start_ts=100.0, + end_ts=200.0, + frigate_config=self.frigate_config, + config_publisher=self.publisher, + replay_manager=self.manager, + ) + + self.assertTrue( + self._wait_for(lambda: get_active_runner() is None), + "runner did not finish", + ) + + from frigate.jobs.manager import get_current_job + + job = get_current_job("debug_replay") + self.assertIsNotNone(job) + self.assertEqual(job.status, JobStatusTypesEnum.failed) + self.assertIsNotNone(job.error_message) + self.assertIn("ffmpeg", job.error_message.lower()) + # Session cleared so a new /start is allowed + self.assertFalse(self.manager.active) + + +class TestRunnerCancellation(unittest.TestCase): + def setUp(self) -> None: + _reset_job_manager() + _patch_publisher(self) + self.manager = DebugReplayManager() + self.frigate_config = MagicMock() + self.frigate_config.cameras = {"front": MagicMock()} + self.frigate_config.ffmpeg.ffmpeg_path = "/bin/true" + self.publisher = MagicMock() + self.recordings_qs = MagicMock() + self.recordings_qs.count.return_value = 1 + self.recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")]) + + def tearDown(self) -> None: + runner = get_active_runner() + if runner is not None: + runner.cancel() + runner.join(timeout=2.0) + _reset_job_manager() + + def _wait_for(self, predicate, timeout: float = 5.0) -> bool: + deadline = time.time() + timeout + while time.time() < deadline: + if predicate(): + return True + time.sleep(0.02) + return False + + def test_cancel_terminates_ffmpeg_and_marks_cancelled(self) -> None: + terminated = threading.Event() + fake_proc = MagicMock() + fake_proc.terminate = MagicMock(side_effect=lambda: terminated.set()) + + def fake_helper(cmd, *, process_started=None, **kwargs): + if process_started is not None: + process_started(fake_proc) + terminated.wait(timeout=5) + return -15, "killed" + + with ( + patch( + "frigate.jobs.debug_replay.query_recordings", + return_value=self.recordings_qs, + ), + patch( + "frigate.jobs.debug_replay.run_ffmpeg_with_progress", + side_effect=fake_helper, + ), + patch("os.path.exists", return_value=True), + patch("os.makedirs"), + patch("os.remove"), + patch("builtins.open", unittest.mock.mock_open()), + ): + start_debug_replay_job( + source_camera="front", + start_ts=100.0, + end_ts=200.0, + frigate_config=self.frigate_config, + config_publisher=self.publisher, + replay_manager=self.manager, + ) + + # Wait for the runner to register the active process. + self.assertTrue( + self._wait_for( + lambda: ( + get_active_runner() is not None + and get_active_runner()._active_process is fake_proc + ) + ) + ) + + cancelled = cancel_debug_replay_job() + self.assertTrue(cancelled) + self.assertTrue(fake_proc.terminate.called) + + self.assertTrue( + self._wait_for(lambda: get_active_runner() is None), + "runner did not finish", + ) + + from frigate.jobs.manager import get_current_job + + job = get_current_job("debug_replay") + self.assertEqual(job.status, JobStatusTypesEnum.cancelled) + + +if __name__ == "__main__": + unittest.main()