mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-05-07 05:55:27 +03:00
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
* use ReplayState enum * extract shared ffmpeg progress helper * make start call non-blocking with worker thread * expose replay state on status endpoint and return 202 from start * cancel in-flight ffmpeg when stop is called during preparation * add replay i18n strings for preparing and error states * show status in replay UI * navigate immediately on 202 from debug replay menus and dialog * remove unused * simplify to use Job infrastructure * tests * cleanup and tweaks * fetch schema * update api spec * formatting * fix e2e test * mypy * clean up * formatting * fix * fix test * don't try to show camera image until status reports ready * simplify loading logic * fix race in latest_frame on debug replay shutdown * remove toast when successfully stopping it gets hidden almost immediately
387 lines
12 KiB
Python
387 lines
12 KiB
Python
"""Debug replay startup job: ffmpeg concat + camera config publish.
|
|
|
|
The runner orchestrates the async portion of starting a debug replay
|
|
session. The DebugReplayManager (in frigate.debug_replay) owns session
|
|
presence so the status bar can keep reading a single `active` flag from
|
|
/debug_replay/status for the entire session window — which is broader
|
|
than this job's lifetime.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import subprocess as sp
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import TYPE_CHECKING, Any, Optional, cast
|
|
|
|
from peewee import ModelSelect
|
|
|
|
from frigate.config import FrigateConfig
|
|
from frigate.config.camera.updater import CameraConfigUpdatePublisher
|
|
from frigate.const import REPLAY_CAMERA_PREFIX, REPLAY_DIR
|
|
from frigate.jobs.export import JobStatePublisher
|
|
from frigate.jobs.job import Job
|
|
from frigate.jobs.manager import job_is_running, set_current_job
|
|
from frigate.models import Recordings
|
|
from frigate.types import JobStatusTypesEnum
|
|
from frigate.util.ffmpeg import run_ffmpeg_with_progress
|
|
|
|
if TYPE_CHECKING:
|
|
from frigate.debug_replay import DebugReplayManager
|
|
|
|
logger = logging.getLogger(__name__)

# Coalesce frequent ffmpeg progress callbacks so the WS isn't flooded.
PROGRESS_BROADCAST_MIN_INTERVAL = 1.0

# Job type identifier: used for job_is_running() duplicate checks and
# included in every job_state WS payload (DebugReplayJob.job_type).
JOB_TYPE = "debug_replay"

# current_step values broadcast to the frontend while the job runs:
# first the ffmpeg concat/copy, then publishing the replay camera config.
STEP_PREPARING_CLIP = "preparing_clip"
STEP_STARTING_CAMERA = "starting_camera"
|
|
|
|
|
|
# Module-level registry of the single in-flight runner; guarded by
# _runner_lock so cancel/status paths see a consistent reference.
_active_runner: Optional["DebugReplayJobRunner"] = None
_runner_lock = threading.Lock()


def _set_active_runner(runner: Optional["DebugReplayJobRunner"]) -> None:
    """Install (or clear, with None) the module-level active runner."""
    global _active_runner
    _runner_lock.acquire()
    try:
        _active_runner = runner
    finally:
        _runner_lock.release()


def get_active_runner() -> Optional["DebugReplayJobRunner"]:
    """Return the currently registered runner, or None when idle."""
    _runner_lock.acquire()
    try:
        return _active_runner
    finally:
        _runner_lock.release()
|
|
|
|
|
|
@dataclass
class DebugReplayJob(Job):
    """Job state for a debug replay startup."""

    job_type: str = JOB_TYPE
    source_camera: str = ""
    replay_camera_name: str = ""
    start_ts: float = 0.0
    end_ts: float = 0.0
    current_step: Optional[str] = None
    progress_percent: float = 0.0

    def to_dict(self) -> dict[str, Any]:
        """Serialise the whitelisted payload for the job_state WS topic.

        Replay-specific fields are nested under ``results`` so the
        frontend's generic Job<TResults> type can be parameterised
        cleanly.
        """
        replay_results: dict[str, Any] = {
            "current_step": self.current_step,
            "progress_percent": self.progress_percent,
            "source_camera": self.source_camera,
            "replay_camera_name": self.replay_camera_name,
            "start_ts": self.start_ts,
            "end_ts": self.end_ts,
        }
        payload: dict[str, Any] = {
            "id": self.id,
            "job_type": self.job_type,
            "status": self.status,
            "start_time": self.start_time,
            "end_time": self.end_time,
            "error_message": self.error_message,
        }
        payload["results"] = replay_results
        return payload
|
|
|
|
|
|
def query_recordings(source_camera: str, start_ts: float, end_ts: float) -> ModelSelect:
    """Return the Recordings query for the time range.

    Module-level so tests can patch it without instantiating a runner.
    """
    # A recording matches when it starts in range, ends in range, or
    # fully contains the requested window.
    overlaps_window = (
        Recordings.start_time.between(start_ts, end_ts)
        | Recordings.end_time.between(start_ts, end_ts)
        | ((start_ts > Recordings.start_time) & (end_ts < Recordings.end_time))
    )
    selection = (
        Recordings.select(
            Recordings.path,
            Recordings.start_time,
            Recordings.end_time,
        )
        .where(overlaps_window)
        .where(Recordings.camera == source_camera)
        .order_by(Recordings.start_time.asc())
    )
    return cast(ModelSelect, selection)
|
|
|
|
|
|
class DebugReplayJobRunner(threading.Thread):
    """Worker thread that drives the startup job to completion.

    Owns the live ffmpeg Popen reference for cancellation. Cancellation
    is two-step (threading.Event + proc.terminate()) so the runner
    both knows it should stop and is unblocked from its blocking subprocess
    wait.
    """

    def __init__(
        self,
        job: DebugReplayJob,
        frigate_config: FrigateConfig,
        config_publisher: CameraConfigUpdatePublisher,
        replay_manager: "DebugReplayManager",
        publisher: Optional[JobStatePublisher] = None,
    ) -> None:
        """Create the runner (does not start the thread).

        Args:
            job: Job state object; mutated in place and broadcast over WS.
            frigate_config: Source of the ffmpeg binary path and camera set.
            config_publisher: Forwarded to replay_manager.publish_camera.
            replay_manager: Session owner notified on ready/failure.
            publisher: job_state publisher; a fresh JobStatePublisher is
                created when not supplied (injection point for tests).
        """
        super().__init__(daemon=True, name=f"debug_replay_{job.id}")
        self.job = job
        self.frigate_config = frigate_config
        self.config_publisher = config_publisher
        self.replay_manager = replay_manager
        self.publisher = publisher if publisher is not None else JobStatePublisher()
        # Set once by cancel(); checked at checkpoints throughout run().
        self._cancel_event = threading.Event()
        # Live ffmpeg process while encoding, else None; guarded by _proc_lock.
        self._active_process: sp.Popen | None = None
        self._proc_lock = threading.Lock()
        # time.monotonic() of the last WS broadcast, for throttling.
        self._last_broadcast_monotonic: float = 0.0

    def cancel(self) -> None:
        """Request cancellation. Idempotent."""
        # Set the flag first so run() sees the request even if no
        # subprocess is currently alive to terminate.
        self._cancel_event.set()
        with self._proc_lock:
            proc = self._active_process
        if proc is not None:
            try:
                proc.terminate()
            except Exception as exc:
                logger.warning("Failed to terminate ffmpeg subprocess: %s", exc)

    def is_cancelled(self) -> bool:
        """Return True once cancel() has been requested."""
        return self._cancel_event.is_set()

    def _record_proc(self, proc: sp.Popen) -> None:
        """Store the just-spawned ffmpeg process for cancellation.

        Passed as the process_started callback to run_ffmpeg_with_progress.
        """
        with self._proc_lock:
            self._active_process = proc
            # Race: cancel arrived between Popen and _record_proc.
            if self._cancel_event.is_set():
                try:
                    proc.terminate()
                except Exception:
                    pass

    def _broadcast(self, force: bool = False) -> None:
        """Publish the job state on the WS topic.

        Non-forced calls are throttled to PROGRESS_BROADCAST_MIN_INTERVAL;
        terminal/step transitions pass force=True so they are never dropped.
        """
        now = time.monotonic()
        if (
            not force
            and now - self._last_broadcast_monotonic < PROGRESS_BROADCAST_MIN_INTERVAL
        ):
            return
        self._last_broadcast_monotonic = now

        try:
            self.publisher.publish(self.job.to_dict())
        except Exception as err:
            # Broadcasting is best-effort; never let a WS failure kill the job.
            logger.warning("Publisher raised during job state broadcast: %s", err)

    def run(self) -> None:
        """Generate the replay clip, then publish the replay camera.

        Thread body. Broadcasts progress and the terminal state
        (success / failed / cancelled) over the job_state topic.
        """
        replay_name = self.job.replay_camera_name
        os.makedirs(REPLAY_DIR, exist_ok=True)
        # Concat demuxer input list (temporary) and the final clip output.
        concat_file = os.path.join(REPLAY_DIR, f"{replay_name}_concat.txt")
        clip_path = os.path.join(REPLAY_DIR, f"{replay_name}.mp4")

        self.job.status = JobStatusTypesEnum.running
        self.job.start_time = time.time()
        self.job.current_step = STEP_PREPARING_CLIP
        self._broadcast(force=True)

        try:
            recordings = query_recordings(
                self.job.source_camera, self.job.start_ts, self.job.end_ts
            )
            # Write the file list consumed by `ffmpeg -f concat`.
            with open(concat_file, "w") as f:
                for recording in recordings:
                    f.write(f"file '{recording.path}'\n")

            # Stream copy (no re-encode) with +faststart so the clip
            # is playable/seekable as soon as it is written.
            ffmpeg_cmd = [
                self.frigate_config.ffmpeg.ffmpeg_path,
                "-hide_banner",
                "-y",
                "-f",
                "concat",
                "-safe",
                "0",
                "-i",
                concat_file,
                "-c",
                "copy",
                "-movflags",
                "+faststart",
                clip_path,
            ]

            logger.info(
                "Generating replay clip for %s (%.1f - %.1f)",
                self.job.source_camera,
                self.job.start_ts,
                self.job.end_ts,
            )

            def _on_progress(percent: float) -> None:
                # Throttled broadcast (see _broadcast) of encode progress.
                self.job.progress_percent = percent
                self._broadcast()

            try:
                returncode, stderr = run_ffmpeg_with_progress(
                    ffmpeg_cmd,
                    expected_duration_seconds=max(
                        0.0, self.job.end_ts - self.job.start_ts
                    ),
                    on_progress=_on_progress,
                    process_started=self._record_proc,
                    use_low_priority=True,
                )
            finally:
                # ffmpeg has exited (or raised); nothing left to terminate.
                with self._proc_lock:
                    self._active_process = None

            # Checkpoint: cancellation during the encode wins over any
            # ffmpeg exit status (terminate() makes returncode non-zero).
            if self._cancel_event.is_set():
                self._finalize_cancelled(clip_path)
                return

            if returncode != 0:
                # Keep only the tail of stderr so the error stays readable.
                raise RuntimeError(f"FFmpeg failed: {stderr[-500:]}")

            if not os.path.exists(clip_path):
                raise RuntimeError("Clip file was not created")

            self.job.current_step = STEP_STARTING_CAMERA
            self.job.progress_percent = 100.0
            self._broadcast(force=True)

            # Checkpoint: last chance to bail before the camera config
            # is published to the rest of the system.
            if self._cancel_event.is_set():
                self._finalize_cancelled(clip_path)
                return

            self.replay_manager.publish_camera(
                source_camera=self.job.source_camera,
                replay_name=replay_name,
                clip_path=clip_path,
                frigate_config=self.frigate_config,
                config_publisher=self.config_publisher,
            )
            self.replay_manager.mark_session_ready(clip_path)

            self.job.status = JobStatusTypesEnum.success
            self.job.end_time = time.time()
            self._broadcast(force=True)
            logger.info(
                "Debug replay started: %s -> %s",
                self.job.source_camera,
                replay_name,
            )
        except Exception as exc:
            logger.exception("Debug replay startup failed")
            self.job.status = JobStatusTypesEnum.failed
            self.job.error_message = str(exc)
            self.job.end_time = time.time()
            self._broadcast(force=True)
            # Failure aborts the whole session, so tear it down here —
            # unlike cancellation, where the stop caller owns cleanup.
            self.replay_manager.clear_session()
            _remove_silent(clip_path)
        finally:
            # The concat list is only needed while ffmpeg runs.
            _remove_silent(concat_file)
            _set_active_runner(None)

    def _finalize_cancelled(self, clip_path: str) -> None:
        """Mark the job cancelled and remove the partial clip output."""
        logger.info("Debug replay startup cancelled")
        self.job.status = JobStatusTypesEnum.cancelled
        self.job.end_time = time.time()
        self._broadcast(force=True)
        # The caller of cancel_debug_replay_job (DebugReplayManager.stop) owns
        # session cleanup — db rows, filesystem artifacts, clear_session. We
        # only clean up the partial concat output we created.
        _remove_silent(clip_path)
|
|
|
|
|
|
def _remove_silent(path: str) -> None:
    """Delete *path* if it exists, swallowing filesystem errors."""
    if not os.path.exists(path):
        return
    try:
        os.remove(path)
    except OSError:
        # Best-effort cleanup; a lingering temp file is not fatal.
        pass
|
|
|
|
|
|
def start_debug_replay_job(
    *,
    source_camera: str,
    start_ts: float,
    end_ts: float,
    frigate_config: FrigateConfig,
    config_publisher: CameraConfigUpdatePublisher,
    replay_manager: "DebugReplayManager",
) -> str:
    """Validate, create job, start runner. Returns the job id.

    Raises ValueError for bad params (camera missing, time range
    invalid, no recordings) and RuntimeError if a session is already
    active.
    """
    # Only one replay session may exist at a time, whether it is still
    # starting up (job) or already running (manager session).
    if job_is_running(JOB_TYPE) or replay_manager.active:
        raise RuntimeError("A replay session is already active")

    if source_camera not in frigate_config.cameras:
        raise ValueError(f"Camera '{source_camera}' not found")

    if end_ts <= start_ts:
        raise ValueError("End time must be after start time")

    matching = query_recordings(source_camera, start_ts, end_ts)
    if matching.count() == 0:
        raise ValueError(
            f"No recordings found for camera '{source_camera}' in the specified time range"
        )

    replay_name = f"{REPLAY_CAMERA_PREFIX}{source_camera}"
    # Mark the session before the worker starts so the status endpoint
    # reports it as active for the whole window.
    replay_manager.mark_starting(
        source_camera=source_camera,
        replay_camera_name=replay_name,
        start_ts=start_ts,
        end_ts=end_ts,
    )

    new_job = DebugReplayJob(
        source_camera=source_camera,
        replay_camera_name=replay_name,
        start_ts=start_ts,
        end_ts=end_ts,
    )
    set_current_job(new_job)

    worker = DebugReplayJobRunner(
        job=new_job,
        frigate_config=frigate_config,
        config_publisher=config_publisher,
        replay_manager=replay_manager,
    )
    # Register before start() so cancel can always reach the runner.
    _set_active_runner(worker)
    worker.start()

    return new_job.id
|
|
|
|
|
|
def cancel_debug_replay_job() -> bool:
    """Signal the active runner to cancel.

    Returns True if a runner was signalled, False if no job was active.
    """
    active = get_active_runner()
    if active is None:
        return False
    active.cancel()
    return True
|
|
|
|
|
|
def wait_for_runner(timeout: float = 2.0) -> bool:
    """Join the active runner. Returns True if the runner ended in time."""
    active = get_active_runner()
    if active is None:
        # Nothing running: trivially "ended".
        return True
    active.join(timeout=timeout)
    return not active.is_alive()
|