Refactor debug replay to use sources for dynamic playback

This commit is contained in:
Nicolas Mowen 2026-05-18 15:44:41 -06:00
parent 77eaf9891e
commit 3127952472
4 changed files with 182 additions and 77 deletions

View File

@ -11,7 +11,11 @@ from pydantic import BaseModel, Field
from frigate.api.auth import require_role
from frigate.api.defs.tags import Tags
from frigate.jobs.debug_replay import start_debug_replay_job
from frigate.jobs.debug_replay import (
ExportDebugReplaySource,
RecordingDebugReplaySource,
start_debug_replay_job,
)
from frigate.models import Export
from frigate.util.services import get_video_properties
@ -82,13 +86,16 @@ class DebugReplayStopResponse(BaseModel):
async def start_debug_replay(request: Request, body: DebugReplayStartBody):
"""Start a debug replay session asynchronously."""
replay_manager = request.app.replay_manager
source = RecordingDebugReplaySource(
source_camera=body.camera,
start_ts=body.start_time,
end_ts=body.end_time,
)
try:
job_id = await asyncio.to_thread(
start_debug_replay_job,
source_camera=body.camera,
start_ts=body.start_time,
end_ts=body.end_time,
source=source,
frigate_config=request.app.frigate_config,
config_publisher=request.app.config_publisher,
replay_manager=replay_manager,
@ -147,7 +154,6 @@ async def start_debug_replay_from_export(
status_code=404,
)
start_ts = datetime.timestamp(export.date)
properties = await get_video_properties(
request.app.frigate_config.ffmpeg, export.video_path, get_duration=True
)
@ -162,15 +168,13 @@ async def start_debug_replay_from_export(
status_code=400,
)
end_ts = start_ts + duration
replay_manager = request.app.replay_manager
source = ExportDebugReplaySource(export=export, duration=float(duration))
try:
job_id = await asyncio.to_thread(
start_debug_replay_job,
source_camera=export.camera,
start_ts=start_ts,
end_ts=end_ts,
source=source,
frigate_config=request.app.frigate_config,
config_publisher=request.app.config_publisher,
replay_manager=replay_manager,

View File

@ -12,7 +12,9 @@ import os
import subprocess as sp
import threading
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import TYPE_CHECKING, Any, Optional, cast
from peewee import ModelSelect
@ -23,7 +25,7 @@ from frigate.const import REPLAY_CAMERA_PREFIX, REPLAY_DIR
from frigate.jobs.export import JobStatePublisher
from frigate.jobs.job import Job
from frigate.jobs.manager import job_is_running, set_current_job
from frigate.models import Recordings
from frigate.models import Export, Recordings
from frigate.types import JobStatusTypesEnum
from frigate.util.ffmpeg import run_ffmpeg_with_progress
@ -114,6 +116,121 @@ def query_recordings(source_camera: str, start_ts: float, end_ts: float) -> Mode
return cast(ModelSelect, query)
class DebugReplaySource(ABC):
"""Abstract source for a debug replay session.
Provides the camera identity and time range the replay represents,
validates that usable content exists, and supplies the ffmpeg input
args used to build the replay clip.
"""
@property
@abstractmethod
def source_camera(self) -> str:
"""Camera name the replay is derived from."""
@property
@abstractmethod
def start_ts(self) -> float:
"""Unix timestamp marking the start of the replay range."""
@property
@abstractmethod
def end_ts(self) -> float:
"""Unix timestamp marking the end of the replay range."""
@abstractmethod
def validate(self) -> None:
"""Raise ValueError if the source has no usable content."""
@abstractmethod
def ffmpeg_input_args(self, working_dir: str) -> list[str]:
"""Return ffmpeg input args (including -i). May write temp files in working_dir."""
def cleanup(self, working_dir: str) -> None:
"""Remove any temp files the source created in working_dir. Default no-op."""
class RecordingDebugReplaySource(DebugReplaySource):
"""Replay source backed by the Recordings table.
Builds a concat playlist of recording files covering the time range
and feeds it to ffmpeg's concat demuxer.
"""
def __init__(self, source_camera: str, start_ts: float, end_ts: float) -> None:
self._camera = source_camera
self._start_ts = start_ts
self._end_ts = end_ts
self._concat_file: Optional[str] = None
@property
def source_camera(self) -> str:
return self._camera
@property
def start_ts(self) -> float:
return self._start_ts
@property
def end_ts(self) -> float:
return self._end_ts
def validate(self) -> None:
if self._end_ts <= self._start_ts:
raise ValueError("End time must be after start time")
if not query_recordings(self._camera, self._start_ts, self._end_ts).count():
raise ValueError(
f"No recordings found for camera '{self._camera}' in the specified time range"
)
def ffmpeg_input_args(self, working_dir: str) -> list[str]:
replay_name = f"{REPLAY_CAMERA_PREFIX}{self._camera}"
concat_file = os.path.join(working_dir, f"{replay_name}_concat.txt")
recordings = query_recordings(self._camera, self._start_ts, self._end_ts)
with open(concat_file, "w") as f:
for recording in recordings:
f.write(f"file '{recording.path}'\n")
self._concat_file = concat_file
return ["-f", "concat", "-safe", "0", "-i", concat_file]
def cleanup(self, working_dir: str) -> None:
if self._concat_file:
_remove_silent(self._concat_file)
class ExportDebugReplaySource(DebugReplaySource):
"""Replay source backed by an existing Export.
Uses the export's video file directly as the ffmpeg input — does not
require recordings to still exist for the time range.
"""
def __init__(self, export: Export, duration: float) -> None:
self._export = export
self._duration = duration
@property
def source_camera(self) -> str:
return self._export.camera
@property
def start_ts(self) -> float:
return datetime.timestamp(self._export.date)
@property
def end_ts(self) -> float:
return self.start_ts + self._duration
def validate(self) -> None:
if not os.path.exists(self._export.video_path):
raise ValueError(f"Export video file not found: {self._export.video_path}")
def ffmpeg_input_args(self, working_dir: str) -> list[str]:
return ["-i", self._export.video_path]
class DebugReplayJobRunner(threading.Thread):
"""Worker thread that drives the startup job to completion.
@ -126,6 +243,7 @@ class DebugReplayJobRunner(threading.Thread):
def __init__(
self,
job: DebugReplayJob,
source: DebugReplaySource,
frigate_config: FrigateConfig,
config_publisher: CameraConfigUpdatePublisher,
replay_manager: "DebugReplayManager",
@ -133,6 +251,7 @@ class DebugReplayJobRunner(threading.Thread):
) -> None:
super().__init__(daemon=True, name=f"debug_replay_{job.id}")
self.job = job
self.source = source
self.frigate_config = frigate_config
self.config_publisher = config_publisher
self.replay_manager = replay_manager
@ -183,7 +302,6 @@ class DebugReplayJobRunner(threading.Thread):
def run(self) -> None:
replay_name = self.job.replay_camera_name
os.makedirs(REPLAY_DIR, exist_ok=True)
concat_file = os.path.join(REPLAY_DIR, f"{replay_name}_concat.txt")
clip_path = os.path.join(REPLAY_DIR, f"{replay_name}.mp4")
self.job.status = JobStatusTypesEnum.running
@ -192,23 +310,13 @@ class DebugReplayJobRunner(threading.Thread):
self._broadcast(force=True)
try:
recordings = query_recordings(
self.job.source_camera, self.job.start_ts, self.job.end_ts
)
with open(concat_file, "w") as f:
for recording in recordings:
f.write(f"file '{recording.path}'\n")
input_args = self.source.ffmpeg_input_args(REPLAY_DIR)
ffmpeg_cmd = [
self.frigate_config.ffmpeg.ffmpeg_path,
"-hide_banner",
"-y",
"-f",
"concat",
"-safe",
"0",
"-i",
concat_file,
*input_args,
"-c",
"copy",
"-movflags",
@ -285,7 +393,7 @@ class DebugReplayJobRunner(threading.Thread):
self.replay_manager.clear_session()
_remove_silent(clip_path)
finally:
_remove_silent(concat_file)
self.source.cleanup(REPLAY_DIR)
_set_active_runner(None)
def _finalize_cancelled(self, clip_path: str) -> None:
@ -309,52 +417,43 @@ def _remove_silent(path: str) -> None:
def start_debug_replay_job(
*,
source_camera: str,
start_ts: float,
end_ts: float,
source: DebugReplaySource,
frigate_config: FrigateConfig,
config_publisher: CameraConfigUpdatePublisher,
replay_manager: "DebugReplayManager",
) -> str:
"""Validate, create job, start runner. Returns the job id.
Raises ValueError for bad params (camera missing, time range
invalid, no recordings) and RuntimeError if a session is already
active.
Raises ValueError for an invalid source (camera missing, source has
no usable content) and RuntimeError if a session is already active.
"""
if job_is_running(JOB_TYPE) or replay_manager.active:
raise RuntimeError("A replay session is already active")
if source_camera not in frigate_config.cameras:
raise ValueError(f"Camera '{source_camera}' not found")
if source.source_camera not in frigate_config.cameras:
raise ValueError(f"Camera '{source.source_camera}' not found")
if end_ts <= start_ts:
raise ValueError("End time must be after start time")
source.validate()
recordings = query_recordings(source_camera, start_ts, end_ts)
if not recordings.count():
raise ValueError(
f"No recordings found for camera '{source_camera}' in the specified time range"
)
replay_name = f"{REPLAY_CAMERA_PREFIX}{source_camera}"
replay_name = f"{REPLAY_CAMERA_PREFIX}{source.source_camera}"
replay_manager.mark_starting(
source_camera=source_camera,
source_camera=source.source_camera,
replay_camera_name=replay_name,
start_ts=start_ts,
end_ts=end_ts,
start_ts=source.start_ts,
end_ts=source.end_ts,
)
job = DebugReplayJob(
source_camera=source_camera,
source_camera=source.source_camera,
replay_camera_name=replay_name,
start_ts=start_ts,
end_ts=end_ts,
start_ts=source.start_ts,
end_ts=source.end_ts,
)
set_current_job(job)
runner = DebugReplayJobRunner(
job=job,
source=source,
frigate_config=frigate_config,
config_publisher=config_publisher,
replay_manager=replay_manager,

View File

@ -15,11 +15,12 @@ class TestDebugReplayAPI(BaseTestHttp):
# Stub the factory to skip validation/threading and just record the
# name on the manager the way the real factory's mark_starting would.
def fake_start(**kwargs):
source = kwargs["source"]
kwargs["replay_manager"].mark_starting(
source_camera=kwargs["source_camera"],
source_camera=source.source_camera,
replay_camera_name="_replay_front",
start_ts=kwargs["start_ts"],
end_ts=kwargs["end_ts"],
start_ts=source.start_ts,
end_ts=source.end_ts,
)
return "job-1234"

View File

@ -9,6 +9,7 @@ from unittest.mock import MagicMock, patch
from frigate.debug_replay import DebugReplayManager
from frigate.jobs.debug_replay import (
DebugReplayJob,
RecordingDebugReplaySource,
cancel_debug_replay_job,
get_active_runner,
start_debug_replay_job,
@ -99,9 +100,9 @@ class TestStartDebugReplayJob(unittest.TestCase):
def test_rejects_unknown_camera(self) -> None:
with self.assertRaises(ValueError):
start_debug_replay_job(
source_camera="missing",
start_ts=100.0,
end_ts=200.0,
source=RecordingDebugReplaySource(
source_camera="missing", start_ts=100.0, end_ts=200.0
),
frigate_config=self.frigate_config,
config_publisher=self.publisher,
replay_manager=self.manager,
@ -110,9 +111,9 @@ class TestStartDebugReplayJob(unittest.TestCase):
def test_rejects_invalid_time_range(self) -> None:
with self.assertRaises(ValueError):
start_debug_replay_job(
source_camera="front",
start_ts=200.0,
end_ts=100.0,
source=RecordingDebugReplaySource(
source_camera="front", start_ts=200.0, end_ts=100.0
),
frigate_config=self.frigate_config,
config_publisher=self.publisher,
replay_manager=self.manager,
@ -124,9 +125,9 @@ class TestStartDebugReplayJob(unittest.TestCase):
with patch("frigate.jobs.debug_replay.query_recordings", return_value=empty_qs):
with self.assertRaises(ValueError):
start_debug_replay_job(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
source=RecordingDebugReplaySource(
source_camera="front", start_ts=100.0, end_ts=200.0
),
frigate_config=self.frigate_config,
config_publisher=self.publisher,
replay_manager=self.manager,
@ -154,9 +155,9 @@ class TestStartDebugReplayJob(unittest.TestCase):
patch("builtins.open", unittest.mock.mock_open()),
):
job_id = start_debug_replay_job(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
source=RecordingDebugReplaySource(
source_camera="front", start_ts=100.0, end_ts=200.0
),
frigate_config=self.frigate_config,
config_publisher=self.publisher,
replay_manager=self.manager,
@ -191,9 +192,9 @@ class TestStartDebugReplayJob(unittest.TestCase):
patch("builtins.open", unittest.mock.mock_open()),
):
start_debug_replay_job(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
source=RecordingDebugReplaySource(
source_camera="front", start_ts=100.0, end_ts=200.0
),
frigate_config=self.frigate_config,
config_publisher=self.publisher,
replay_manager=self.manager,
@ -201,9 +202,9 @@ class TestStartDebugReplayJob(unittest.TestCase):
with self.assertRaises(RuntimeError):
start_debug_replay_job(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
source=RecordingDebugReplaySource(
source_camera="front", start_ts=100.0, end_ts=200.0
),
frigate_config=self.frigate_config,
config_publisher=self.publisher,
replay_manager=self.manager,
@ -269,9 +270,9 @@ class TestRunnerHappyPath(unittest.TestCase):
patch("builtins.open", unittest.mock.mock_open()),
):
start_debug_replay_job(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
source=RecordingDebugReplaySource(
source_camera="front", start_ts=100.0, end_ts=200.0
),
frigate_config=self.frigate_config,
config_publisher=self.publisher,
replay_manager=self.manager,
@ -340,9 +341,9 @@ class TestRunnerFailurePath(unittest.TestCase):
patch("builtins.open", unittest.mock.mock_open()),
):
start_debug_replay_job(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
source=RecordingDebugReplaySource(
source_camera="front", start_ts=100.0, end_ts=200.0
),
frigate_config=self.frigate_config,
config_publisher=self.publisher,
replay_manager=self.manager,
@ -418,9 +419,9 @@ class TestRunnerCancellation(unittest.TestCase):
patch("builtins.open", unittest.mock.mock_open()),
):
start_debug_replay_job(
source_camera="front",
start_ts=100.0,
end_ts=200.0,
source=RecordingDebugReplaySource(
source_camera="front", start_ts=100.0, end_ts=200.0
),
frigate_config=self.frigate_config,
config_publisher=self.publisher,
replay_manager=self.manager,