make start call non-blocking with worker thread

This commit is contained in:
Josh Hawkins 2026-05-02 22:59:10 -05:00
parent 5fe58d9aa6
commit e187446d04
2 changed files with 344 additions and 97 deletions

View File

@ -25,6 +25,7 @@ from frigate.const import (
from frigate.models import Recordings
from frigate.util.camera_cleanup import cleanup_camera_db, cleanup_camera_files
from frigate.util.config import find_config_file
from frigate.util.ffmpeg import run_ffmpeg_with_progress
logger = logging.getLogger(__name__)
@ -58,6 +59,9 @@ class DebugReplayManager:
self.clip_path: str | None = None
self.start_ts: float | None = None
self.end_ts: float | None = None
self._active_process: sp.Popen | None = None
self._worker_thread: threading.Thread | None = None
self.progress_percent: float | None = None
@property
def state(self) -> ReplayState:
@ -78,6 +82,8 @@ class DebugReplayManager:
"""Internal state transition helper. Always pair `error` with an error_message."""
self._state = state
self.error_message = error_message if state == ReplayState.error else None
if state in (ReplayState.idle, ReplayState.error):
self.progress_percent = None
def start(
self,
@ -87,7 +93,10 @@ class DebugReplayManager:
frigate_config: FrigateConfig,
config_publisher: CameraConfigUpdatePublisher,
) -> str:
"""Start a debug replay session.
"""Validate inputs and kick off async startup. Returns immediately.
The clip generation, config build, and camera publish run on a worker
thread. Poll `state` / `error_message` to track progress.
Args:
source_camera: Name of the source camera to replay
@ -97,36 +106,58 @@ class DebugReplayManager:
config_publisher: Publisher for camera config updates
Returns:
The replay camera name
The replay camera name (deterministic from source_camera)
Raises:
ValueError: If a session is already active or parameters are invalid
RuntimeError: If clip generation fails
"""
with self._lock:
return self._start_locked(
source_camera, start_ts, end_ts, frigate_config, config_publisher
)
if self.active:
raise ValueError("A replay session is already active")
def _start_locked(
self,
source_camera: str,
start_ts: float,
end_ts: float,
frigate_config: FrigateConfig,
config_publisher: CameraConfigUpdatePublisher,
) -> str:
if self.active:
raise ValueError("A replay session is already active")
if source_camera not in frigate_config.cameras:
raise ValueError(f"Camera '{source_camera}' not found")
if source_camera not in frigate_config.cameras:
raise ValueError(f"Camera '{source_camera}' not found")
if end_ts <= start_ts:
raise ValueError("End time must be after start time")
if end_ts <= start_ts:
raise ValueError("End time must be after start time")
recordings = self._query_recordings(source_camera, start_ts, end_ts)
if not recordings.count():
raise ValueError(
f"No recordings found for camera '{source_camera}' in the specified time range"
)
# Query recordings for the source camera in the time range
recordings = (
replay_name = f"{REPLAY_CAMERA_PREFIX}{source_camera}"
self.replay_camera_name = replay_name
self.source_camera = source_camera
self.start_ts = start_ts
self.end_ts = end_ts
self.progress_percent = None
self._set_state(ReplayState.preparing_clip)
worker = threading.Thread(
target=self._run_start_worker,
name=f"debug-replay-start-{replay_name}",
args=(source_camera, start_ts, end_ts, frigate_config, config_publisher),
daemon=True,
)
self._worker_thread = worker
worker.start()
return replay_name
def _query_recordings(self, source_camera: str, start_ts: float, end_ts: float):
"""Return the Recordings query for the time range. Extracted so tests can patch.
Args:
source_camera: Name of the source camera
start_ts: Start timestamp
end_ts: End timestamp
Returns:
Peewee query for recordings in the time range
"""
return (
Recordings.select(
Recordings.path,
Recordings.start_time,
@ -141,79 +172,148 @@ class DebugReplayManager:
.order_by(Recordings.start_time.asc())
)
if not recordings.count():
raise ValueError(
f"No recordings found for camera '{source_camera}' in the specified time range"
)
def _run_start_worker(
self,
source_camera: str,
start_ts: float,
end_ts: float,
frigate_config: FrigateConfig,
config_publisher: CameraConfigUpdatePublisher,
) -> None:
"""Worker thread body — runs ffmpeg and publishes the camera config.
Args:
source_camera: Name of the source camera to replay
start_ts: Start timestamp
end_ts: End timestamp
frigate_config: Current Frigate configuration
config_publisher: Publisher for camera config updates
"""
replay_name = self.replay_camera_name
if replay_name is None:
return
# Create replay directory
os.makedirs(REPLAY_DIR, exist_ok=True)
# Generate replay camera name
replay_name = f"{REPLAY_CAMERA_PREFIX}{source_camera}"
# Build concat file for ffmpeg
concat_file = os.path.join(REPLAY_DIR, f"{replay_name}_concat.txt")
clip_path = os.path.join(REPLAY_DIR, f"{replay_name}.mp4")
with open(concat_file, "w") as f:
for recording in recordings:
f.write(f"file '{recording.path}'\n")
# Concatenate recordings into a single clip with -c copy (fast)
ffmpeg_cmd = [
frigate_config.ffmpeg.ffmpeg_path,
"-hide_banner",
"-y",
"-f",
"concat",
"-safe",
"0",
"-i",
concat_file,
"-c",
"copy",
"-movflags",
"+faststart",
clip_path,
]
logger.info(
"Generating replay clip for %s (%.1f - %.1f)",
source_camera,
start_ts,
end_ts,
)
try:
result = sp.run(
ffmpeg_cmd,
capture_output=True,
text=True,
timeout=120,
recordings = self._query_recordings(source_camera, start_ts, end_ts)
with open(concat_file, "w") as f:
for recording in recordings:
f.write(f"file '{recording.path}'\n")
ffmpeg_cmd = [
frigate_config.ffmpeg.ffmpeg_path,
"-hide_banner",
"-y",
"-f",
"concat",
"-safe",
"0",
"-i",
concat_file,
"-c",
"copy",
"-movflags",
"+faststart",
clip_path,
]
logger.info(
"Generating replay clip for %s (%.1f - %.1f)",
source_camera,
start_ts,
end_ts,
)
if result.returncode != 0:
logger.error("FFmpeg error: %s", result.stderr)
raise RuntimeError(
f"Failed to generate replay clip: {result.stderr[-500:]}"
def _record_proc(p: sp.Popen) -> None:
self._active_process = p
def _on_progress(percent: float) -> None:
self.progress_percent = percent
try:
returncode, stderr = run_ffmpeg_with_progress(
ffmpeg_cmd,
expected_duration_seconds=max(0.0, end_ts - start_ts),
on_progress=_on_progress,
process_started=_record_proc,
use_low_priority=True,
)
except sp.TimeoutExpired:
raise RuntimeError("Clip generation timed out")
finally:
self._active_process = None
if returncode != 0:
raise RuntimeError(f"FFmpeg failed: {stderr[-500:]}")
if not os.path.exists(clip_path):
raise RuntimeError("Clip file was not created")
with self._lock:
# If stop() ran while we were preparing, bail out cleanly.
if self._state != ReplayState.preparing_clip:
logger.info(
"Replay startup aborted (state=%s); discarding clip",
self._state,
)
return
self._set_state(ReplayState.starting_camera)
self._publish_replay_camera(
source_camera, replay_name, clip_path, frigate_config, config_publisher
)
with self._lock:
self.clip_path = clip_path
self._set_state(ReplayState.active)
logger.info("Debug replay started: %s -> %s", source_camera, replay_name)
except Exception as exc:
logger.exception("Debug replay startup failed")
with self._lock:
self._set_state(ReplayState.error, error_message=str(exc))
# Drop session pointers so the next /start is allowed.
self.replay_camera_name = None
self.source_camera = None
self.clip_path = None
self.start_ts = None
self.end_ts = None
# Best-effort cleanup of any partial clip on disk.
try:
if os.path.exists(clip_path):
os.remove(clip_path)
except OSError:
pass
finally:
# Clean up concat file
if os.path.exists(concat_file):
os.remove(concat_file)
try:
if os.path.exists(concat_file):
os.remove(concat_file)
except OSError:
pass
if not os.path.exists(clip_path):
raise RuntimeError("Clip file was not created")
def _publish_replay_camera(
self,
source_camera: str,
replay_name: str,
clip_path: str,
frigate_config: FrigateConfig,
config_publisher: CameraConfigUpdatePublisher,
) -> None:
"""Build the in-memory camera config and publish the add event.
# Build camera config dict for the replay camera
Args:
source_camera: Name of the source camera
replay_name: Name for the replay camera
clip_path: Path to the replay clip file
frigate_config: Current Frigate configuration
config_publisher: Publisher for camera config updates
"""
source_config = frigate_config.cameras[source_camera]
camera_dict = self._build_camera_config_dict(
source_config, replay_name, clip_path
)
# Build an in-memory config with the replay camera added
config_file = find_config_file()
yaml_parser = YAML()
with open(config_file, "r") as f:
@ -223,31 +323,14 @@ class DebugReplayManager:
config_data["cameras"] = {}
config_data["cameras"][replay_name] = camera_dict
try:
new_config = FrigateConfig.parse_object(config_data)
except Exception as e:
raise RuntimeError(f"Failed to validate replay camera config: {e}")
# Update the running config
new_config = FrigateConfig.parse_object(config_data)
frigate_config.cameras[replay_name] = new_config.cameras[replay_name]
# Publish the add event
config_publisher.publish_update(
CameraConfigUpdateTopic(CameraConfigUpdateEnum.add, replay_name),
new_config.cameras[replay_name],
)
# Store session state
self.replay_camera_name = replay_name
self.source_camera = source_camera
self.clip_path = clip_path
self.start_ts = start_ts
self.end_ts = end_ts
self._set_state(ReplayState.active)
logger.info("Debug replay started: %s -> %s", source_camera, replay_name)
return replay_name
def stop(
self,
frigate_config: FrigateConfig,

View File

@ -1,6 +1,10 @@
"""Tests for DebugReplayManager state machine and async startup."""
import threading
import time
import unittest
import unittest.mock
from unittest.mock import MagicMock, patch
from frigate.debug_replay import DebugReplayManager, ReplayState
@ -34,3 +38,163 @@ class TestDebugReplayManagerState(unittest.TestCase):
manager._set_state(ReplayState.error, error_message="boom")
self.assertFalse(manager.active)
self.assertEqual(manager.error_message, "boom")
class TestDebugReplayManagerAsyncStart(unittest.TestCase):
def setUp(self):
self.manager = DebugReplayManager()
self.frigate_config = MagicMock()
self.frigate_config.cameras = {"front": MagicMock()}
self.frigate_config.ffmpeg.ffmpeg_path = "/bin/true"
self.publisher = MagicMock()
def test_progress_percent_tracks_helper_callbacks(self):
recordings_qs = MagicMock()
recordings_qs.count.return_value = 1
recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")])
def fake_helper(cmd, *, expected_duration_seconds, on_progress, **kwargs):
on_progress(0.0)
on_progress(42.5)
on_progress(100.0)
return 0, ""
with (
patch.object(self.manager, "_query_recordings", return_value=recordings_qs),
patch("frigate.debug_replay.run_ffmpeg_with_progress", side_effect=fake_helper),
patch.object(self.manager, "_publish_replay_camera"),
patch("os.path.exists", return_value=True),
patch("os.makedirs"),
patch("builtins.open", unittest.mock.mock_open()),
):
self.manager.start(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
frigate_config=self.frigate_config,
config_publisher=self.publisher,
)
for _ in range(50):
if self.manager.state == ReplayState.active:
break
time.sleep(0.05)
# Progress should have advanced through the callback values.
self.assertEqual(self.manager.state, ReplayState.active)
self.assertEqual(self.manager.progress_percent, 100.0)
def test_start_returns_immediately_with_preparing_state(self):
recordings_qs = MagicMock()
recordings_qs.count.return_value = 1
recordings_qs.__iter__.return_value = iter(
[MagicMock(path="/tmp/r1.mp4")]
)
# Block the worker thread before it transitions out of preparing_clip.
worker_can_proceed = threading.Event()
def fake_helper(cmd, *, expected_duration_seconds, on_progress, **kwargs):
worker_can_proceed.wait(timeout=5)
return 0, ""
with (
patch.object(
self.manager,
"_query_recordings",
return_value=recordings_qs,
),
patch("frigate.debug_replay.run_ffmpeg_with_progress", side_effect=fake_helper),
patch.object(self.manager, "_publish_replay_camera"),
patch("os.path.exists", return_value=True),
patch("os.makedirs"),
patch("builtins.open", unittest.mock.mock_open()),
):
replay_name = self.manager.start(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
frigate_config=self.frigate_config,
config_publisher=self.publisher,
)
# Returned synchronously
self.assertTrue(replay_name.startswith("_replay_"))
self.assertEqual(self.manager.state, ReplayState.preparing_clip)
worker_can_proceed.set()
# Wait for worker to finish
for _ in range(50):
if self.manager.state == ReplayState.active:
break
time.sleep(0.05)
self.assertEqual(self.manager.state, ReplayState.active)
def test_start_rejects_concurrent_calls_with_value_error(self):
recordings_qs = MagicMock()
recordings_qs.count.return_value = 1
recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")])
block = threading.Event()
def slow_helper(cmd, *, expected_duration_seconds, on_progress, **kwargs):
block.wait(timeout=5)
return 0, ""
with (
patch.object(self.manager, "_query_recordings", return_value=recordings_qs),
patch("frigate.debug_replay.run_ffmpeg_with_progress", side_effect=slow_helper),
patch.object(self.manager, "_publish_replay_camera"),
patch("os.path.exists", return_value=True),
patch("os.makedirs"),
patch("builtins.open", unittest.mock.mock_open()),
):
self.manager.start(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
frigate_config=self.frigate_config,
config_publisher=self.publisher,
)
with self.assertRaises(ValueError):
self.manager.start(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
frigate_config=self.frigate_config,
config_publisher=self.publisher,
)
block.set()
def test_start_transitions_to_error_state_when_ffmpeg_fails(self):
recordings_qs = MagicMock()
recordings_qs.count.return_value = 1
recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")])
def failing_helper(cmd, *, expected_duration_seconds, on_progress, **kwargs):
return 1, "ffmpeg exploded"
with (
patch.object(self.manager, "_query_recordings", return_value=recordings_qs),
patch("frigate.debug_replay.run_ffmpeg_with_progress", side_effect=failing_helper),
patch("os.path.exists", return_value=True),
patch("os.makedirs"),
patch("builtins.open", unittest.mock.mock_open()),
):
self.manager.start(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
frigate_config=self.frigate_config,
config_publisher=self.publisher,
)
for _ in range(50):
if self.manager.state == ReplayState.error:
break
time.sleep(0.05)
self.assertEqual(self.manager.state, ReplayState.error)
self.assertIsNotNone(self.manager.error_message)
self.assertIn("ffmpeg", self.manager.error_message.lower())