This commit is contained in:
Josh Hawkins 2026-05-03 11:09:48 -05:00
parent 5eee65984a
commit a142b2dcef
3 changed files with 725 additions and 263 deletions

View File

@ -2,7 +2,6 @@
from unittest.mock import patch
from frigate.debug_replay import ReplayState
from frigate.models import Event, Recordings, ReviewSegment
from frigate.test.http_api.base_http_test import AuthTestClient, BaseTestHttp
@ -12,60 +11,111 @@ class TestDebugReplayAPI(BaseTestHttp):
super().setUp([Event, Recordings, ReviewSegment])
self.app = self.create_app()
def test_start_returns_202_with_state_preparing_clip(self):
def test_start_returns_202_with_job_id(self):
# Stub the factory to skip validation/threading and just record the
# name on the manager the way the real factory's mark_starting would.
def fake_start(**kwargs):
kwargs["replay_manager"].mark_starting(
source_camera=kwargs["source_camera"],
replay_camera_name="_replay_front",
start_ts=kwargs["start_ts"],
end_ts=kwargs["end_ts"],
)
return "job-1234"
with patch(
"frigate.debug_replay.DebugReplayManager.start",
return_value="_replay_front",
"frigate.api.debug_replay.start_debug_replay_job",
side_effect=fake_start,
):
with patch.object(
type(self.app.replay_manager),
"state",
new_callable=lambda: property(lambda s: ReplayState.preparing_clip),
):
with AuthTestClient(self.app) as client:
resp = client.post(
"/debug_replay/start",
json={
"camera": "front",
"start_time": 100,
"end_time": 200,
},
headers={"remote-user": "admin", "remote-role": "admin"},
)
with AuthTestClient(self.app) as client:
resp = client.post(
"/debug_replay/start",
json={
"camera": "front",
"start_time": 100,
"end_time": 200,
},
)
self.assertEqual(resp.status_code, 202)
body = resp.json()
self.assertTrue(body["success"])
self.assertEqual(body["job_id"], "job-1234")
self.assertEqual(body["replay_camera"], "_replay_front")
self.assertEqual(body["state"], "preparing_clip")
def test_status_returns_state_and_error_message(self):
manager = self.app.replay_manager
manager._set_state(ReplayState.error, error_message="ffmpeg failed: boom")
def test_start_returns_400_on_validation_error(self):
with patch(
"frigate.api.debug_replay.start_debug_replay_job",
side_effect=ValueError("Camera 'missing' not found"),
):
with AuthTestClient(self.app) as client:
resp = client.post(
"/debug_replay/start",
json={
"camera": "missing",
"start_time": 100,
"end_time": 200,
},
)
self.assertEqual(resp.status_code, 400)
body = resp.json()
self.assertFalse(body["success"])
self.assertIn("missing", body["message"])
def test_start_returns_409_when_session_already_active(self):
with patch(
"frigate.api.debug_replay.start_debug_replay_job",
side_effect=RuntimeError("A replay session is already active"),
):
with AuthTestClient(self.app) as client:
resp = client.post(
"/debug_replay/start",
json={
"camera": "front",
"start_time": 100,
"end_time": 200,
},
)
self.assertEqual(resp.status_code, 409)
body = resp.json()
self.assertFalse(body["success"])
def test_status_inactive_when_no_session(self):
with AuthTestClient(self.app) as client:
resp = client.get(
"/debug_replay/status",
headers={"remote-user": "admin", "remote-role": "admin"},
)
resp = client.get("/debug_replay/status")
self.assertEqual(resp.status_code, 200)
body = resp.json()
self.assertEqual(body["state"], "error")
self.assertEqual(body["error_message"], "ffmpeg failed: boom")
self.assertIsNone(body["progress_percent"])
self.assertFalse(body["active"])
self.assertIsNone(body["replay_camera"])
self.assertIsNone(body["source_camera"])
self.assertIsNone(body["start_time"])
self.assertIsNone(body["end_time"])
self.assertFalse(body["live_ready"])
# Make sure deprecated fields are gone
self.assertNotIn("state", body)
self.assertNotIn("progress_percent", body)
self.assertNotIn("error_message", body)
def test_status_returns_progress_percent_during_preparing_clip(self):
def test_status_active_after_mark_starting(self):
manager = self.app.replay_manager
manager._set_state(ReplayState.preparing_clip)
manager.progress_percent = 37.5
manager.mark_starting(
source_camera="front",
replay_camera_name="_replay_front",
start_ts=100.0,
end_ts=200.0,
)
with AuthTestClient(self.app) as client:
resp = client.get(
"/debug_replay/status",
headers={"remote-user": "admin", "remote-role": "admin"},
)
resp = client.get("/debug_replay/status")
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.json()["progress_percent"], 37.5)
body = resp.json()
self.assertTrue(body["active"])
self.assertEqual(body["replay_camera"], "_replay_front")
self.assertEqual(body["source_camera"], "front")
self.assertEqual(body["start_time"], 100.0)
self.assertEqual(body["end_time"], 200.0)
self.assertFalse(body["live_ready"])

View File

@ -1,252 +1,209 @@
"""Tests for DebugReplayManager state machine and async startup."""
"""Tests for the simplified DebugReplayManager.
Startup orchestration lives in ``frigate.jobs.debug_replay`` (covered by
``test_debug_replay_job``). The manager owns only session presence and
cleanup.
"""
import threading
import time
import unittest
import unittest.mock
from unittest.mock import MagicMock, patch
from frigate.debug_replay import DebugReplayManager, ReplayState
class TestDebugReplayManagerSession(unittest.TestCase):
def test_inactive_by_default(self) -> None:
from frigate.debug_replay import DebugReplayManager
class TestDebugReplayManagerState(unittest.TestCase):
def test_initial_state_is_idle(self):
manager = DebugReplayManager()
self.assertEqual(manager.state, ReplayState.idle)
self.assertIsNone(manager.error_message)
self.assertFalse(manager.active)
self.assertIsNone(manager.replay_camera_name)
self.assertIsNone(manager.source_camera)
self.assertIsNone(manager.clip_path)
self.assertIsNone(manager.start_ts)
self.assertIsNone(manager.end_ts)
def test_mark_starting_sets_session_pointers_and_active(self) -> None:
from frigate.debug_replay import DebugReplayManager
def test_active_property_true_for_preparing_starting_and_active_states(self):
manager = DebugReplayManager()
manager._set_state(ReplayState.preparing_clip)
self.assertTrue(manager.active)
manager._set_state(ReplayState.starting_camera)
self.assertTrue(manager.active)
manager._set_state(ReplayState.active)
self.assertTrue(manager.active)
def test_active_property_false_for_idle_and_error_states(self):
manager = DebugReplayManager()
manager._set_state(ReplayState.idle)
self.assertFalse(manager.active)
manager._set_state(ReplayState.error, error_message="boom")
self.assertFalse(manager.active)
self.assertEqual(manager.error_message, "boom")
class TestDebugReplayManagerAsyncStart(unittest.TestCase):
def setUp(self):
self.manager = DebugReplayManager()
self.frigate_config = MagicMock()
self.frigate_config.cameras = {"front": MagicMock()}
self.frigate_config.ffmpeg.ffmpeg_path = "/bin/true"
self.publisher = MagicMock()
def test_progress_percent_tracks_helper_callbacks(self):
recordings_qs = MagicMock()
recordings_qs.count.return_value = 1
recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")])
def fake_helper(cmd, *, expected_duration_seconds, on_progress, **kwargs):
on_progress(0.0)
on_progress(42.5)
on_progress(100.0)
return 0, ""
with (
patch.object(self.manager, "_query_recordings", return_value=recordings_qs),
patch("frigate.debug_replay.run_ffmpeg_with_progress", side_effect=fake_helper),
patch.object(self.manager, "_publish_replay_camera"),
patch("os.path.exists", return_value=True),
patch("os.makedirs"),
patch("builtins.open", unittest.mock.mock_open()),
):
self.manager.start(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
frigate_config=self.frigate_config,
config_publisher=self.publisher,
)
for _ in range(50):
if self.manager.state == ReplayState.active:
break
time.sleep(0.05)
# Progress should have advanced through the callback values.
self.assertEqual(self.manager.state, ReplayState.active)
self.assertEqual(self.manager.progress_percent, 100.0)
def test_start_returns_immediately_with_preparing_state(self):
recordings_qs = MagicMock()
recordings_qs.count.return_value = 1
recordings_qs.__iter__.return_value = iter(
[MagicMock(path="/tmp/r1.mp4")]
manager.mark_starting(
source_camera="front",
replay_camera_name="_replay_front",
start_ts=100.0,
end_ts=200.0,
)
# Block the worker thread before it transitions out of preparing_clip.
worker_can_proceed = threading.Event()
self.assertTrue(manager.active)
self.assertEqual(manager.replay_camera_name, "_replay_front")
self.assertEqual(manager.source_camera, "front")
self.assertEqual(manager.start_ts, 100.0)
self.assertEqual(manager.end_ts, 200.0)
self.assertIsNone(manager.clip_path)
def fake_helper(cmd, *, expected_duration_seconds, on_progress, **kwargs):
worker_can_proceed.wait(timeout=5)
return 0, ""
def test_mark_session_ready_sets_clip_path(self) -> None:
from frigate.debug_replay import DebugReplayManager
manager = DebugReplayManager()
manager.mark_starting("front", "_replay_front", 100.0, 200.0)
manager.mark_session_ready(clip_path="/tmp/replay/_replay_front.mp4")
self.assertEqual(manager.clip_path, "/tmp/replay/_replay_front.mp4")
self.assertTrue(manager.active)
def test_clear_session_resets_all_pointers(self) -> None:
from frigate.debug_replay import DebugReplayManager
manager = DebugReplayManager()
manager.mark_starting("front", "_replay_front", 100.0, 200.0)
manager.mark_session_ready("/tmp/replay/clip.mp4")
manager.clear_session()
self.assertFalse(manager.active)
self.assertIsNone(manager.replay_camera_name)
self.assertIsNone(manager.source_camera)
self.assertIsNone(manager.clip_path)
self.assertIsNone(manager.start_ts)
self.assertIsNone(manager.end_ts)
class TestDebugReplayManagerStop(unittest.TestCase):
def test_stop_when_inactive_is_a_noop(self) -> None:
from frigate.debug_replay import DebugReplayManager
manager = DebugReplayManager()
frigate_config = MagicMock()
frigate_config.cameras = {}
publisher = MagicMock()
# Should not raise; should not publish any events.
manager.stop(frigate_config=frigate_config, config_publisher=publisher)
publisher.publish_update.assert_not_called()
def test_stop_publishes_remove_when_camera_was_published(self) -> None:
from frigate.config.camera.updater import CameraConfigUpdateEnum
from frigate.debug_replay import DebugReplayManager
manager = DebugReplayManager()
manager.mark_starting("front", "_replay_front", 100.0, 200.0)
manager.mark_session_ready("/tmp/replay/_replay_front.mp4")
camera_config = MagicMock()
frigate_config = MagicMock()
frigate_config.cameras = {"_replay_front": camera_config}
publisher = MagicMock()
with (
patch.object(manager, "_cleanup_db"),
patch.object(manager, "_cleanup_files"),
patch(
"frigate.debug_replay.cancel_debug_replay_job", return_value=False
),
):
manager.stop(frigate_config=frigate_config, config_publisher=publisher)
# One publish_update call with a remove topic.
self.assertEqual(publisher.publish_update.call_count, 1)
topic_arg = publisher.publish_update.call_args.args[0]
self.assertEqual(topic_arg.update_type, CameraConfigUpdateEnum.remove)
self.assertFalse(manager.active)
def test_stop_skips_remove_publish_when_camera_not_in_config(self) -> None:
"""Cancellation during preparing_clip: no camera was published yet."""
from frigate.debug_replay import DebugReplayManager
manager = DebugReplayManager()
manager.mark_starting("front", "_replay_front", 100.0, 200.0)
# clip_path stays None because we cancelled before camera publish.
frigate_config = MagicMock()
frigate_config.cameras = {} # _replay_front not present
publisher = MagicMock()
with (
patch.object(manager, "_cleanup_db"),
patch.object(manager, "_cleanup_files"),
patch(
"frigate.debug_replay.cancel_debug_replay_job", return_value=True
),
):
manager.stop(frigate_config=frigate_config, config_publisher=publisher)
publisher.publish_update.assert_not_called()
self.assertFalse(manager.active)
def test_stop_calls_cancel_debug_replay_job(self) -> None:
from frigate.debug_replay import DebugReplayManager
manager = DebugReplayManager()
manager.mark_starting("front", "_replay_front", 100.0, 200.0)
frigate_config = MagicMock()
frigate_config.cameras = {}
publisher = MagicMock()
with (
patch.object(manager, "_cleanup_db"),
patch.object(manager, "_cleanup_files"),
patch(
"frigate.debug_replay.cancel_debug_replay_job",
return_value=True,
) as mock_cancel,
):
manager.stop(frigate_config=frigate_config, config_publisher=publisher)
mock_cancel.assert_called_once()
class TestDebugReplayManagerPublishCamera(unittest.TestCase):
def test_publish_camera_invokes_publisher_with_add_topic(self) -> None:
from frigate.config.camera.updater import CameraConfigUpdateEnum
from frigate.debug_replay import DebugReplayManager
manager = DebugReplayManager()
source_config = MagicMock()
new_camera_config = MagicMock()
frigate_config = MagicMock()
frigate_config.cameras = {"front": source_config}
publisher = MagicMock()
with (
patch.object(
self.manager,
"_query_recordings",
return_value=recordings_qs,
manager,
"_build_camera_config_dict",
return_value={"enabled": True},
),
patch("frigate.debug_replay.run_ffmpeg_with_progress", side_effect=fake_helper),
patch.object(self.manager, "_publish_replay_camera"),
patch("os.path.exists", return_value=True),
patch("os.makedirs"),
patch("builtins.open", unittest.mock.mock_open()),
patch("frigate.debug_replay.find_config_file", return_value="/cfg.yml"),
patch("frigate.debug_replay.YAML") as yaml_cls,
patch("frigate.debug_replay.FrigateConfig.parse_object") as parse_object,
patch("builtins.open", unittest.mock.mock_open(read_data="cameras:\n")),
):
replay_name = self.manager.start(
yaml_instance = yaml_cls.return_value
yaml_instance.load.return_value = {"cameras": {}}
parsed = MagicMock()
parsed.cameras = {"_replay_front": new_camera_config}
parse_object.return_value = parsed
manager.publish_camera(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
frigate_config=self.frigate_config,
config_publisher=self.publisher,
)
# Returned synchronously
self.assertTrue(replay_name.startswith("_replay_"))
self.assertEqual(self.manager.state, ReplayState.preparing_clip)
worker_can_proceed.set()
# Wait for worker to finish
for _ in range(50):
if self.manager.state == ReplayState.active:
break
time.sleep(0.05)
self.assertEqual(self.manager.state, ReplayState.active)
def test_start_rejects_concurrent_calls_with_value_error(self):
recordings_qs = MagicMock()
recordings_qs.count.return_value = 1
recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")])
block = threading.Event()
def slow_helper(cmd, *, expected_duration_seconds, on_progress, **kwargs):
block.wait(timeout=5)
return 0, ""
with (
patch.object(self.manager, "_query_recordings", return_value=recordings_qs),
patch("frigate.debug_replay.run_ffmpeg_with_progress", side_effect=slow_helper),
patch.object(self.manager, "_publish_replay_camera"),
patch("os.path.exists", return_value=True),
patch("os.makedirs"),
patch("builtins.open", unittest.mock.mock_open()),
):
self.manager.start(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
frigate_config=self.frigate_config,
config_publisher=self.publisher,
)
with self.assertRaises(ValueError):
self.manager.start(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
frigate_config=self.frigate_config,
config_publisher=self.publisher,
)
block.set()
def test_start_transitions_to_error_state_when_ffmpeg_fails(self):
recordings_qs = MagicMock()
recordings_qs.count.return_value = 1
recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")])
def failing_helper(cmd, *, expected_duration_seconds, on_progress, **kwargs):
return 1, "ffmpeg exploded"
with (
patch.object(self.manager, "_query_recordings", return_value=recordings_qs),
patch("frigate.debug_replay.run_ffmpeg_with_progress", side_effect=failing_helper),
patch("os.path.exists", return_value=True),
patch("os.makedirs"),
patch("builtins.open", unittest.mock.mock_open()),
):
self.manager.start(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
frigate_config=self.frigate_config,
config_publisher=self.publisher,
)
for _ in range(50):
if self.manager.state == ReplayState.error:
break
time.sleep(0.05)
self.assertEqual(self.manager.state, ReplayState.error)
self.assertIsNotNone(self.manager.error_message)
self.assertIn("ffmpeg", self.manager.error_message.lower())
class TestDebugReplayManagerCancellation(unittest.TestCase):
def test_stop_during_preparing_clip_terminates_ffmpeg(self):
manager = DebugReplayManager()
frigate_config = MagicMock()
frigate_config.cameras = {"front": MagicMock()}
frigate_config.ffmpeg.ffmpeg_path = "/bin/sh"
publisher = MagicMock()
recordings_qs = MagicMock()
recordings_qs.count.return_value = 1
recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")])
terminated_event = threading.Event()
fake_proc = MagicMock()
original_terminate = MagicMock(side_effect=lambda: terminated_event.set())
fake_proc.terminate = original_terminate
def fake_helper(cmd, *, expected_duration_seconds, on_progress, process_started, **kwargs):
if process_started is not None:
process_started(fake_proc)
# Block until stop() calls fake_proc.terminate()
terminated_event.wait(timeout=5)
return -15, "killed"
with (
patch.object(manager, "_query_recordings", return_value=recordings_qs),
patch("frigate.debug_replay.run_ffmpeg_with_progress", side_effect=fake_helper),
patch("os.path.exists", return_value=True),
patch("os.makedirs"),
patch("os.remove"),
patch("builtins.open", unittest.mock.mock_open()),
):
manager.start(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
replay_name="_replay_front",
clip_path="/tmp/clip.mp4",
frigate_config=frigate_config,
config_publisher=publisher,
)
# Wait for the worker to register the active process
for _ in range(50):
if manager._active_process is fake_proc:
break
time.sleep(0.02)
self.assertEqual(manager.state, ReplayState.preparing_clip)
manager.stop(frigate_config=frigate_config, config_publisher=publisher)
# Camera registered into the live config dict
self.assertIn("_replay_front", frigate_config.cameras)
# Publisher invoked with an add topic
self.assertEqual(publisher.publish_update.call_count, 1)
topic_arg = publisher.publish_update.call_args.args[0]
self.assertEqual(topic_arg.update_type, CameraConfigUpdateEnum.add)
self.assertTrue(original_terminate.called, "terminate() should have been called")
self.assertEqual(manager.state, ReplayState.idle)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,455 @@
"""Tests for the debug replay job runner and factory."""
import threading
import time
import unittest
import unittest.mock
from unittest.mock import MagicMock, patch
from frigate.debug_replay import DebugReplayManager
from frigate.jobs.debug_replay import (
DebugReplayJob,
cancel_debug_replay_job,
get_active_runner,
start_debug_replay_job,
)
from frigate.jobs.export import JobStatePublisher
from frigate.jobs.manager import _completed_jobs, _current_jobs
from frigate.types import JobStatusTypesEnum
def _reset_job_manager() -> None:
"""Clear the global job manager state between tests."""
_current_jobs.clear()
_completed_jobs.clear()
def _patch_publisher(test_case: unittest.TestCase) -> None:
"""Replace JobStatePublisher.publish with a no-op to avoid hanging on IPC."""
publisher_patch = patch.object(
JobStatePublisher, "publish", lambda self, payload: None
)
publisher_patch.start()
test_case.addCleanup(publisher_patch.stop)
class TestDebugReplayJob(unittest.TestCase):
def test_default_fields(self) -> None:
job = DebugReplayJob()
self.assertEqual(job.job_type, "debug_replay")
self.assertEqual(job.status, JobStatusTypesEnum.queued)
self.assertIsNone(job.current_step)
self.assertEqual(job.progress_percent, 0.0)
def test_to_dict_whitelist(self) -> None:
job = DebugReplayJob(
source_camera="front",
replay_camera_name="_replay_front",
start_ts=100.0,
end_ts=200.0,
)
job.current_step = "preparing_clip"
job.progress_percent = 42.5
payload = job.to_dict()
# Top-level matches the standard Job<TResults> shape.
for key in (
"id",
"job_type",
"status",
"start_time",
"end_time",
"error_message",
"results",
):
self.assertIn(key, payload, f"missing top-level field: {key}")
results = payload["results"]
self.assertEqual(results["source_camera"], "front")
self.assertEqual(results["replay_camera_name"], "_replay_front")
self.assertEqual(results["current_step"], "preparing_clip")
self.assertEqual(results["progress_percent"], 42.5)
self.assertEqual(results["start_ts"], 100.0)
self.assertEqual(results["end_ts"], 200.0)
class TestStartDebugReplayJob(unittest.TestCase):
def setUp(self) -> None:
_reset_job_manager()
_patch_publisher(self)
self.manager = DebugReplayManager()
self.frigate_config = MagicMock()
self.frigate_config.cameras = {"front": MagicMock()}
self.frigate_config.ffmpeg.ffmpeg_path = "/bin/true"
self.publisher = MagicMock()
self.recordings_qs = MagicMock()
self.recordings_qs.count.return_value = 1
self.recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")])
def tearDown(self) -> None:
runner = get_active_runner()
if runner is not None:
runner.cancel()
runner.join(timeout=2.0)
_reset_job_manager()
def test_rejects_unknown_camera(self) -> None:
with self.assertRaises(ValueError):
start_debug_replay_job(
source_camera="missing",
start_ts=100.0,
end_ts=200.0,
frigate_config=self.frigate_config,
config_publisher=self.publisher,
replay_manager=self.manager,
)
def test_rejects_invalid_time_range(self) -> None:
with self.assertRaises(ValueError):
start_debug_replay_job(
source_camera="front",
start_ts=200.0,
end_ts=100.0,
frigate_config=self.frigate_config,
config_publisher=self.publisher,
replay_manager=self.manager,
)
def test_rejects_when_no_recordings(self) -> None:
empty_qs = MagicMock()
empty_qs.count.return_value = 0
with patch("frigate.jobs.debug_replay.query_recordings", return_value=empty_qs):
with self.assertRaises(ValueError):
start_debug_replay_job(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
frigate_config=self.frigate_config,
config_publisher=self.publisher,
replay_manager=self.manager,
)
def test_returns_job_id_and_marks_session_starting(self) -> None:
block = threading.Event()
def slow_helper(cmd, **kwargs):
block.wait(timeout=5)
return 0, ""
with (
patch(
"frigate.jobs.debug_replay.query_recordings",
return_value=self.recordings_qs,
),
patch(
"frigate.jobs.debug_replay.run_ffmpeg_with_progress",
side_effect=slow_helper,
),
patch.object(self.manager, "publish_camera"),
patch("os.path.exists", return_value=True),
patch("os.makedirs"),
patch("builtins.open", unittest.mock.mock_open()),
):
job_id = start_debug_replay_job(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
frigate_config=self.frigate_config,
config_publisher=self.publisher,
replay_manager=self.manager,
)
self.assertIsInstance(job_id, str)
self.assertTrue(self.manager.active)
self.assertEqual(self.manager.replay_camera_name, "_replay_front")
self.assertEqual(self.manager.source_camera, "front")
block.set()
def test_rejects_concurrent_calls(self) -> None:
block = threading.Event()
def slow_helper(cmd, **kwargs):
block.wait(timeout=5)
return 0, ""
with (
patch(
"frigate.jobs.debug_replay.query_recordings",
return_value=self.recordings_qs,
),
patch(
"frigate.jobs.debug_replay.run_ffmpeg_with_progress",
side_effect=slow_helper,
),
patch.object(self.manager, "publish_camera"),
patch("os.path.exists", return_value=True),
patch("os.makedirs"),
patch("builtins.open", unittest.mock.mock_open()),
):
start_debug_replay_job(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
frigate_config=self.frigate_config,
config_publisher=self.publisher,
replay_manager=self.manager,
)
with self.assertRaises(RuntimeError):
start_debug_replay_job(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
frigate_config=self.frigate_config,
config_publisher=self.publisher,
replay_manager=self.manager,
)
block.set()
class TestRunnerHappyPath(unittest.TestCase):
def setUp(self) -> None:
_reset_job_manager()
_patch_publisher(self)
self.manager = DebugReplayManager()
self.frigate_config = MagicMock()
self.frigate_config.cameras = {"front": MagicMock()}
self.frigate_config.ffmpeg.ffmpeg_path = "/bin/true"
self.publisher = MagicMock()
self.recordings_qs = MagicMock()
self.recordings_qs.count.return_value = 1
self.recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")])
def tearDown(self) -> None:
runner = get_active_runner()
if runner is not None:
runner.cancel()
runner.join(timeout=2.0)
_reset_job_manager()
def _wait_for(self, predicate, timeout: float = 5.0) -> bool:
deadline = time.time() + timeout
while time.time() < deadline:
if predicate():
return True
time.sleep(0.02)
return False
def test_progress_callback_updates_job_percent(self) -> None:
captured: list[float] = []
def fake_helper(cmd, *, on_progress=None, **kwargs):
on_progress(0.0)
on_progress(50.0)
on_progress(100.0)
return 0, ""
with (
patch(
"frigate.jobs.debug_replay.query_recordings",
return_value=self.recordings_qs,
),
patch(
"frigate.jobs.debug_replay.run_ffmpeg_with_progress",
side_effect=fake_helper,
),
patch.object(
self.manager,
"publish_camera",
side_effect=lambda *a, **kw: captured.append("published"),
),
patch("os.path.exists", return_value=True),
patch("os.makedirs"),
patch("builtins.open", unittest.mock.mock_open()),
):
start_debug_replay_job(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
frigate_config=self.frigate_config,
config_publisher=self.publisher,
replay_manager=self.manager,
)
self.assertTrue(
self._wait_for(lambda: get_active_runner() is None),
"runner did not finish",
)
from frigate.jobs.manager import get_current_job
job = get_current_job("debug_replay")
self.assertIsNotNone(job)
self.assertEqual(job.status, JobStatusTypesEnum.success)
self.assertEqual(job.progress_percent, 100.0)
self.assertEqual(captured, ["published"])
# Manager should have been told the session is ready with the clip path.
self.assertIsNotNone(self.manager.clip_path)
class TestRunnerFailurePath(unittest.TestCase):
def setUp(self) -> None:
_reset_job_manager()
_patch_publisher(self)
self.manager = DebugReplayManager()
self.frigate_config = MagicMock()
self.frigate_config.cameras = {"front": MagicMock()}
self.frigate_config.ffmpeg.ffmpeg_path = "/bin/true"
self.publisher = MagicMock()
self.recordings_qs = MagicMock()
self.recordings_qs.count.return_value = 1
self.recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")])
def tearDown(self) -> None:
runner = get_active_runner()
if runner is not None:
runner.cancel()
runner.join(timeout=2.0)
_reset_job_manager()
def _wait_for(self, predicate, timeout: float = 5.0) -> bool:
deadline = time.time() + timeout
while time.time() < deadline:
if predicate():
return True
time.sleep(0.02)
return False
def test_ffmpeg_failure_marks_job_failed_and_clears_session(self) -> None:
def failing_helper(cmd, **kwargs):
return 1, "ffmpeg exploded"
with (
patch(
"frigate.jobs.debug_replay.query_recordings",
return_value=self.recordings_qs,
),
patch(
"frigate.jobs.debug_replay.run_ffmpeg_with_progress",
side_effect=failing_helper,
),
patch("os.path.exists", return_value=True),
patch("os.makedirs"),
patch("os.remove"),
patch("builtins.open", unittest.mock.mock_open()),
):
start_debug_replay_job(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
frigate_config=self.frigate_config,
config_publisher=self.publisher,
replay_manager=self.manager,
)
self.assertTrue(
self._wait_for(lambda: get_active_runner() is None),
"runner did not finish",
)
from frigate.jobs.manager import get_current_job
job = get_current_job("debug_replay")
self.assertIsNotNone(job)
self.assertEqual(job.status, JobStatusTypesEnum.failed)
self.assertIsNotNone(job.error_message)
self.assertIn("ffmpeg", job.error_message.lower())
# Session cleared so a new /start is allowed
self.assertFalse(self.manager.active)
class TestRunnerCancellation(unittest.TestCase):
def setUp(self) -> None:
_reset_job_manager()
_patch_publisher(self)
self.manager = DebugReplayManager()
self.frigate_config = MagicMock()
self.frigate_config.cameras = {"front": MagicMock()}
self.frigate_config.ffmpeg.ffmpeg_path = "/bin/true"
self.publisher = MagicMock()
self.recordings_qs = MagicMock()
self.recordings_qs.count.return_value = 1
self.recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")])
def tearDown(self) -> None:
runner = get_active_runner()
if runner is not None:
runner.cancel()
runner.join(timeout=2.0)
_reset_job_manager()
def _wait_for(self, predicate, timeout: float = 5.0) -> bool:
deadline = time.time() + timeout
while time.time() < deadline:
if predicate():
return True
time.sleep(0.02)
return False
def test_cancel_terminates_ffmpeg_and_marks_cancelled(self) -> None:
terminated = threading.Event()
fake_proc = MagicMock()
fake_proc.terminate = MagicMock(side_effect=lambda: terminated.set())
def fake_helper(cmd, *, process_started=None, **kwargs):
if process_started is not None:
process_started(fake_proc)
terminated.wait(timeout=5)
return -15, "killed"
with (
patch(
"frigate.jobs.debug_replay.query_recordings",
return_value=self.recordings_qs,
),
patch(
"frigate.jobs.debug_replay.run_ffmpeg_with_progress",
side_effect=fake_helper,
),
patch("os.path.exists", return_value=True),
patch("os.makedirs"),
patch("os.remove"),
patch("builtins.open", unittest.mock.mock_open()),
):
start_debug_replay_job(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
frigate_config=self.frigate_config,
config_publisher=self.publisher,
replay_manager=self.manager,
)
# Wait for the runner to register the active process.
self.assertTrue(
self._wait_for(
lambda: (
get_active_runner() is not None
and get_active_runner()._active_process is fake_proc
)
)
)
cancelled = cancel_debug_replay_job()
self.assertTrue(cancelled)
self.assertTrue(fake_proc.terminate.called)
self.assertTrue(
self._wait_for(lambda: get_active_runner() is None),
"runner did not finish",
)
from frigate.jobs.manager import get_current_job
job = get_current_job("debug_replay")
self.assertEqual(job.status, JobStatusTypesEnum.cancelled)
if __name__ == "__main__":
unittest.main()