mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-05-07 05:55:27 +03:00
simplify to use Job infrastructure
This commit is contained in:
parent
cbefc3acc4
commit
5eee65984a
@ -10,6 +10,7 @@ from pydantic import BaseModel, Field
|
||||
|
||||
from frigate.api.auth import require_role
|
||||
from frigate.api.defs.tags import Tags
|
||||
from frigate.jobs.debug_replay import start_debug_replay_job
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -29,16 +30,19 @@ class DebugReplayStartResponse(BaseModel):
|
||||
|
||||
success: bool
|
||||
replay_camera: str
|
||||
state: str
|
||||
job_id: str
|
||||
|
||||
|
||||
class DebugReplayStatusResponse(BaseModel):
|
||||
"""Response for debug replay status."""
|
||||
"""Response for debug replay status.
|
||||
|
||||
Returns only session-presence fields. Startup progress and error
|
||||
details flow through the job_state WebSocket topic via the
|
||||
``debug_replay`` job (see :mod:`frigate.jobs.debug_replay`); the
|
||||
Replay page subscribes there with ``useJobStatus("debug_replay")``.
|
||||
"""
|
||||
|
||||
active: bool
|
||||
state: str
|
||||
progress_percent: float | None = None
|
||||
error_message: str | None = None
|
||||
replay_camera: str | None = None
|
||||
source_camera: str | None = None
|
||||
start_time: float | None = None
|
||||
@ -58,30 +62,30 @@ class DebugReplayStopResponse(BaseModel):
|
||||
dependencies=[Depends(require_role(["admin"]))],
|
||||
summary="Start debug replay",
|
||||
description="Start a debug replay session from camera recordings. Returns "
|
||||
"immediately while clip generation runs asynchronously; poll "
|
||||
"/debug_replay/status to track progress.",
|
||||
"immediately while clip generation runs as a background job; subscribe "
|
||||
"to the 'debug_replay' job_state WS topic to track progress.",
|
||||
)
|
||||
async def start_debug_replay(request: Request, body: DebugReplayStartBody):
|
||||
"""Start a debug replay session asynchronously."""
|
||||
replay_manager = request.app.replay_manager
|
||||
|
||||
if replay_manager.active:
|
||||
return JSONResponse(
|
||||
content={
|
||||
"success": False,
|
||||
"message": "A replay session is already active",
|
||||
},
|
||||
status_code=409,
|
||||
)
|
||||
|
||||
try:
|
||||
replay_camera = await asyncio.to_thread(
|
||||
replay_manager.start,
|
||||
job_id = await asyncio.to_thread(
|
||||
start_debug_replay_job,
|
||||
source_camera=body.camera,
|
||||
start_ts=body.start_time,
|
||||
end_ts=body.end_time,
|
||||
frigate_config=request.app.frigate_config,
|
||||
config_publisher=request.app.config_publisher,
|
||||
replay_manager=replay_manager,
|
||||
)
|
||||
except RuntimeError as exc:
|
||||
return JSONResponse(
|
||||
content={
|
||||
"success": False,
|
||||
"message": str(exc),
|
||||
},
|
||||
status_code=409,
|
||||
)
|
||||
except ValueError as exc:
|
||||
logger.info("Rejected debug replay start request: %s", exc)
|
||||
@ -96,8 +100,8 @@ async def start_debug_replay(request: Request, body: DebugReplayStartBody):
|
||||
return JSONResponse(
|
||||
content={
|
||||
"success": True,
|
||||
"replay_camera": replay_camera,
|
||||
"state": replay_manager.state.value,
|
||||
"replay_camera": replay_manager.replay_camera_name,
|
||||
"job_id": job_id,
|
||||
},
|
||||
status_code=202,
|
||||
)
|
||||
@ -119,7 +123,11 @@ def get_debug_replay_status(request: Request):
|
||||
|
||||
if replay_manager.active and replay_camera:
|
||||
frame_processor = request.app.detected_frames_processor
|
||||
frame = frame_processor.get_current_frame(replay_camera)
|
||||
frame = (
|
||||
frame_processor.get_current_frame(replay_camera)
|
||||
if frame_processor is not None
|
||||
else None
|
||||
)
|
||||
|
||||
if frame is not None:
|
||||
frame_time = frame_processor.get_current_frame_time(replay_camera)
|
||||
@ -133,9 +141,6 @@ def get_debug_replay_status(request: Request):
|
||||
|
||||
return DebugReplayStatusResponse(
|
||||
active=replay_manager.active,
|
||||
state=replay_manager.state.value,
|
||||
progress_percent=replay_manager.progress_percent,
|
||||
error_message=replay_manager.error_message,
|
||||
replay_camera=replay_camera,
|
||||
source_camera=replay_manager.source_camera,
|
||||
start_time=replay_manager.start_ts,
|
||||
|
||||
@ -1,11 +1,14 @@
|
||||
"""Debug replay camera management for replaying recordings with detection overlays."""
|
||||
"""Debug replay camera management for replaying recordings with detection overlays.
|
||||
|
||||
The startup work (ffmpeg concat + camera config publish) lives in
|
||||
``frigate.jobs.debug_replay``. This module owns only session presence
|
||||
(``active``), session metadata, and post-session cleanup.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import subprocess as sp
|
||||
import threading
|
||||
from enum import Enum
|
||||
|
||||
from ruamel.yaml import YAML
|
||||
|
||||
@ -22,280 +25,77 @@ from frigate.const import (
|
||||
REPLAY_DIR,
|
||||
THUMB_DIR,
|
||||
)
|
||||
from frigate.models import Recordings
|
||||
from frigate.jobs.debug_replay import cancel_debug_replay_job, wait_for_runner
|
||||
from frigate.util.camera_cleanup import cleanup_camera_db, cleanup_camera_files
|
||||
from frigate.util.config import find_config_file
|
||||
from frigate.util.ffmpeg import run_ffmpeg_with_progress
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ReplayState(str, Enum):
|
||||
"""State of the debug replay session lifecycle.
|
||||
|
||||
idle: no session
|
||||
preparing_clip: ffmpeg concat is running, no replay camera yet
|
||||
starting_camera: clip ready, publishing camera config update
|
||||
active: replay camera is published; first frame may not have arrived yet
|
||||
error: startup failed; error_message is set
|
||||
"""
|
||||
|
||||
idle = "idle"
|
||||
preparing_clip = "preparing_clip"
|
||||
starting_camera = "starting_camera"
|
||||
active = "active"
|
||||
error = "error"
|
||||
|
||||
|
||||
class DebugReplayManager:
|
||||
"""Manages a single debug replay session."""
|
||||
"""Owns the lifecycle pointers for a single debug replay session.
|
||||
|
||||
A session exists from the moment ``mark_starting`` is called (synchronously,
|
||||
inside the API handler) until ``clear_session`` runs (on success cleanup,
|
||||
failure, or stop). The ``active`` property is the source of truth that the
|
||||
status bar consumes — broader than the startup job, which only covers the
|
||||
preparing_clip / starting_camera window.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._lock = threading.Lock()
|
||||
self._state: ReplayState = ReplayState.idle
|
||||
self.error_message: str | None = None
|
||||
self.replay_camera_name: str | None = None
|
||||
self.source_camera: str | None = None
|
||||
self.clip_path: str | None = None
|
||||
self.start_ts: float | None = None
|
||||
self.end_ts: float | None = None
|
||||
self._active_process: sp.Popen | None = None
|
||||
self._worker_thread: threading.Thread | None = None
|
||||
self.progress_percent: float | None = None
|
||||
|
||||
@property
|
||||
def state(self) -> ReplayState:
|
||||
return self._state
|
||||
|
||||
@property
|
||||
def active(self) -> bool:
|
||||
"""Whether a replay session is in progress (preparing, starting, or active)."""
|
||||
return self._state in (
|
||||
ReplayState.preparing_clip,
|
||||
ReplayState.starting_camera,
|
||||
ReplayState.active,
|
||||
)
|
||||
"""True from ``mark_starting`` until ``clear_session``."""
|
||||
return self.replay_camera_name is not None
|
||||
|
||||
def _set_state(
|
||||
self, state: ReplayState, error_message: str | None = None
|
||||
) -> None:
|
||||
"""Internal state transition helper. Always pair `error` with an error_message."""
|
||||
self._state = state
|
||||
self.error_message = error_message if state == ReplayState.error else None
|
||||
if state in (ReplayState.idle, ReplayState.error):
|
||||
self.progress_percent = None
|
||||
|
||||
def start(
|
||||
def mark_starting(
|
||||
self,
|
||||
source_camera: str,
|
||||
replay_camera_name: str,
|
||||
start_ts: float,
|
||||
end_ts: float,
|
||||
frigate_config: FrigateConfig,
|
||||
config_publisher: CameraConfigUpdatePublisher,
|
||||
) -> str:
|
||||
"""Validate inputs and kick off async startup. Returns immediately.
|
||||
) -> None:
|
||||
"""Synchronously claim the session before the job runner starts.
|
||||
|
||||
The clip generation, config build, and camera publish run on a worker
|
||||
thread. Poll `state` / `error_message` to track progress.
|
||||
|
||||
Args:
|
||||
source_camera: Name of the source camera to replay
|
||||
start_ts: Start timestamp
|
||||
end_ts: End timestamp
|
||||
frigate_config: Current Frigate configuration
|
||||
config_publisher: Publisher for camera config updates
|
||||
|
||||
Returns:
|
||||
The replay camera name (deterministic from source_camera)
|
||||
|
||||
Raises:
|
||||
ValueError: If a session is already active or parameters are invalid
|
||||
Called inside the API handler so the status bar sees ``active=True``
|
||||
immediately, before the worker thread does any ffmpeg work.
|
||||
"""
|
||||
with self._lock:
|
||||
if self.active:
|
||||
raise ValueError("A replay session is already active")
|
||||
|
||||
if source_camera not in frigate_config.cameras:
|
||||
raise ValueError(f"Camera '{source_camera}' not found")
|
||||
|
||||
if end_ts <= start_ts:
|
||||
raise ValueError("End time must be after start time")
|
||||
|
||||
recordings = self._query_recordings(source_camera, start_ts, end_ts)
|
||||
if not recordings.count():
|
||||
raise ValueError(
|
||||
f"No recordings found for camera '{source_camera}' in the specified time range"
|
||||
)
|
||||
|
||||
replay_name = f"{REPLAY_CAMERA_PREFIX}{source_camera}"
|
||||
self.replay_camera_name = replay_name
|
||||
self.replay_camera_name = replay_camera_name
|
||||
self.source_camera = source_camera
|
||||
self.start_ts = start_ts
|
||||
self.end_ts = end_ts
|
||||
self.progress_percent = None
|
||||
self._set_state(ReplayState.preparing_clip)
|
||||
self.clip_path = None
|
||||
|
||||
worker = threading.Thread(
|
||||
target=self._run_start_worker,
|
||||
name=f"debug-replay-start-{replay_name}",
|
||||
args=(source_camera, start_ts, end_ts, frigate_config, config_publisher),
|
||||
daemon=True,
|
||||
)
|
||||
self._worker_thread = worker
|
||||
worker.start()
|
||||
def mark_session_ready(self, clip_path: str) -> None:
|
||||
"""Record the on-disk clip path after the camera has been published."""
|
||||
with self._lock:
|
||||
self.clip_path = clip_path
|
||||
|
||||
return replay_name
|
||||
def clear_session(self) -> None:
|
||||
"""Reset session pointers without publishing camera removal.
|
||||
|
||||
def _query_recordings(self, source_camera: str, start_ts: float, end_ts: float):
|
||||
"""Return the Recordings query for the time range. Extracted so tests can patch.
|
||||
|
||||
Args:
|
||||
source_camera: Name of the source camera
|
||||
start_ts: Start timestamp
|
||||
end_ts: End timestamp
|
||||
|
||||
Returns:
|
||||
Peewee query for recordings in the time range
|
||||
Used by the job runner on failure paths. ``stop()`` does the camera
|
||||
teardown plus this clear in one step.
|
||||
"""
|
||||
return (
|
||||
Recordings.select(
|
||||
Recordings.path,
|
||||
Recordings.start_time,
|
||||
Recordings.end_time,
|
||||
)
|
||||
.where(
|
||||
Recordings.start_time.between(start_ts, end_ts)
|
||||
| Recordings.end_time.between(start_ts, end_ts)
|
||||
| ((start_ts > Recordings.start_time) & (end_ts < Recordings.end_time))
|
||||
)
|
||||
.where(Recordings.camera == source_camera)
|
||||
.order_by(Recordings.start_time.asc())
|
||||
)
|
||||
with self._lock:
|
||||
self._clear_locked()
|
||||
|
||||
def _run_start_worker(
|
||||
self,
|
||||
source_camera: str,
|
||||
start_ts: float,
|
||||
end_ts: float,
|
||||
frigate_config: FrigateConfig,
|
||||
config_publisher: CameraConfigUpdatePublisher,
|
||||
) -> None:
|
||||
"""Worker thread body — runs ffmpeg and publishes the camera config.
|
||||
def _clear_locked(self) -> None:
|
||||
self.replay_camera_name = None
|
||||
self.source_camera = None
|
||||
self.clip_path = None
|
||||
self.start_ts = None
|
||||
self.end_ts = None
|
||||
|
||||
Args:
|
||||
source_camera: Name of the source camera to replay
|
||||
start_ts: Start timestamp
|
||||
end_ts: End timestamp
|
||||
frigate_config: Current Frigate configuration
|
||||
config_publisher: Publisher for camera config updates
|
||||
"""
|
||||
replay_name = self.replay_camera_name
|
||||
if replay_name is None:
|
||||
return
|
||||
|
||||
os.makedirs(REPLAY_DIR, exist_ok=True)
|
||||
concat_file = os.path.join(REPLAY_DIR, f"{replay_name}_concat.txt")
|
||||
clip_path = os.path.join(REPLAY_DIR, f"{replay_name}.mp4")
|
||||
|
||||
try:
|
||||
recordings = self._query_recordings(source_camera, start_ts, end_ts)
|
||||
with open(concat_file, "w") as f:
|
||||
for recording in recordings:
|
||||
f.write(f"file '{recording.path}'\n")
|
||||
|
||||
ffmpeg_cmd = [
|
||||
frigate_config.ffmpeg.ffmpeg_path,
|
||||
"-hide_banner",
|
||||
"-y",
|
||||
"-f",
|
||||
"concat",
|
||||
"-safe",
|
||||
"0",
|
||||
"-i",
|
||||
concat_file,
|
||||
"-c",
|
||||
"copy",
|
||||
"-movflags",
|
||||
"+faststart",
|
||||
clip_path,
|
||||
]
|
||||
|
||||
logger.info(
|
||||
"Generating replay clip for %s (%.1f - %.1f)",
|
||||
source_camera,
|
||||
start_ts,
|
||||
end_ts,
|
||||
)
|
||||
|
||||
def _record_proc(p: sp.Popen) -> None:
|
||||
self._active_process = p
|
||||
|
||||
def _on_progress(percent: float) -> None:
|
||||
self.progress_percent = percent
|
||||
|
||||
try:
|
||||
returncode, stderr = run_ffmpeg_with_progress(
|
||||
ffmpeg_cmd,
|
||||
expected_duration_seconds=max(0.0, end_ts - start_ts),
|
||||
on_progress=_on_progress,
|
||||
process_started=_record_proc,
|
||||
use_low_priority=True,
|
||||
)
|
||||
finally:
|
||||
self._active_process = None
|
||||
|
||||
if returncode != 0:
|
||||
raise RuntimeError(f"FFmpeg failed: {stderr[-500:]}")
|
||||
|
||||
if not os.path.exists(clip_path):
|
||||
raise RuntimeError("Clip file was not created")
|
||||
|
||||
with self._lock:
|
||||
# If stop() ran while we were preparing, bail out cleanly.
|
||||
if self._state != ReplayState.preparing_clip:
|
||||
logger.info(
|
||||
"Replay startup aborted (state=%s); discarding clip",
|
||||
self._state,
|
||||
)
|
||||
return
|
||||
self._set_state(ReplayState.starting_camera)
|
||||
|
||||
self._publish_replay_camera(
|
||||
source_camera, replay_name, clip_path, frigate_config, config_publisher
|
||||
)
|
||||
|
||||
with self._lock:
|
||||
self.clip_path = clip_path
|
||||
self._set_state(ReplayState.active)
|
||||
|
||||
logger.info("Debug replay started: %s -> %s", source_camera, replay_name)
|
||||
except Exception as exc:
|
||||
logger.exception("Debug replay startup failed")
|
||||
with self._lock:
|
||||
# If stop() already ran while we were preparing, don't overwrite idle state.
|
||||
if self._state == ReplayState.idle:
|
||||
return
|
||||
self._set_state(ReplayState.error, error_message=str(exc))
|
||||
# Drop session pointers so the next /start is allowed.
|
||||
self.replay_camera_name = None
|
||||
self.source_camera = None
|
||||
self.clip_path = None
|
||||
self.start_ts = None
|
||||
self.end_ts = None
|
||||
# Best-effort cleanup of any partial clip on disk.
|
||||
try:
|
||||
if os.path.exists(clip_path):
|
||||
os.remove(clip_path)
|
||||
except OSError:
|
||||
pass
|
||||
finally:
|
||||
try:
|
||||
if os.path.exists(concat_file):
|
||||
os.remove(concat_file)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def _publish_replay_camera(
|
||||
def publish_camera(
|
||||
self,
|
||||
source_camera: str,
|
||||
replay_name: str,
|
||||
@ -303,14 +103,9 @@ class DebugReplayManager:
|
||||
frigate_config: FrigateConfig,
|
||||
config_publisher: CameraConfigUpdatePublisher,
|
||||
) -> None:
|
||||
"""Build the in-memory camera config and publish the add event.
|
||||
"""Build the in-memory replay camera config and publish the add event.
|
||||
|
||||
Args:
|
||||
source_camera: Name of the source camera
|
||||
replay_name: Name for the replay camera
|
||||
clip_path: Path to the replay clip file
|
||||
frigate_config: Current Frigate configuration
|
||||
config_publisher: Publisher for camera config updates
|
||||
Called by the job runner during the ``starting_camera`` phase.
|
||||
"""
|
||||
source_config = frigate_config.cameras[source_camera]
|
||||
camera_dict = self._build_camera_config_dict(
|
||||
@ -339,64 +134,35 @@ class DebugReplayManager:
|
||||
frigate_config: FrigateConfig,
|
||||
config_publisher: CameraConfigUpdatePublisher,
|
||||
) -> None:
|
||||
"""Stop the active replay session and clean up all artifacts.
|
||||
"""Cancel any in-flight startup job and tear down the active session.
|
||||
|
||||
Args:
|
||||
frigate_config: Current Frigate configuration
|
||||
config_publisher: Publisher for camera config updates
|
||||
Safe to call when no session is active (no-op with a warning).
|
||||
"""
|
||||
cancel_debug_replay_job()
|
||||
wait_for_runner(timeout=2.0)
|
||||
|
||||
with self._lock:
|
||||
self._stop_locked(frigate_config, config_publisher)
|
||||
if not self.active:
|
||||
logger.warning("No active replay session to stop")
|
||||
return
|
||||
|
||||
def _stop_locked(
|
||||
self,
|
||||
frigate_config: FrigateConfig,
|
||||
config_publisher: CameraConfigUpdatePublisher,
|
||||
) -> None:
|
||||
if not self.active:
|
||||
logger.warning("No active replay session to stop")
|
||||
return
|
||||
replay_name = self.replay_camera_name
|
||||
|
||||
replay_name = self.replay_camera_name
|
||||
was_preparing = self._state == ReplayState.preparing_clip
|
||||
# Only publish remove if the camera was actually added to the live
|
||||
# config (i.e. the runner reached the starting_camera phase).
|
||||
if replay_name is not None and replay_name in frigate_config.cameras:
|
||||
config_publisher.publish_update(
|
||||
CameraConfigUpdateTopic(CameraConfigUpdateEnum.remove, replay_name),
|
||||
frigate_config.cameras[replay_name],
|
||||
)
|
||||
|
||||
if was_preparing and self._active_process is not None:
|
||||
logger.info("Cancelling in-flight replay clip generation")
|
||||
try:
|
||||
self._active_process.terminate()
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to terminate ffmpeg subprocess: %s", exc)
|
||||
if replay_name is not None:
|
||||
self._cleanup_db(replay_name)
|
||||
self._cleanup_files(replay_name)
|
||||
|
||||
# Keep a reference so we can join the worker after we've finished cleanup.
|
||||
worker = self._worker_thread
|
||||
self._clear_locked()
|
||||
|
||||
# Only publish the remove event if the camera was actually published.
|
||||
if (
|
||||
not was_preparing
|
||||
and replay_name is not None
|
||||
and replay_name in frigate_config.cameras
|
||||
):
|
||||
config_publisher.publish_update(
|
||||
CameraConfigUpdateTopic(CameraConfigUpdateEnum.remove, replay_name),
|
||||
frigate_config.cameras[replay_name],
|
||||
)
|
||||
|
||||
if replay_name is not None:
|
||||
self._cleanup_db(replay_name)
|
||||
self._cleanup_files(replay_name)
|
||||
|
||||
self.replay_camera_name = None
|
||||
self.source_camera = None
|
||||
self.clip_path = None
|
||||
self.start_ts = None
|
||||
self.end_ts = None
|
||||
self._set_state(ReplayState.idle)
|
||||
|
||||
logger.info("Debug replay stopped and cleaned up: %s", replay_name)
|
||||
|
||||
# Bounded worker join so the API never hangs.
|
||||
if worker is not None and worker.is_alive():
|
||||
worker.join(timeout=2.0)
|
||||
logger.info("Debug replay stopped and cleaned up: %s", replay_name)
|
||||
|
||||
def _build_camera_config_dict(
|
||||
self,
|
||||
@ -404,16 +170,7 @@ class DebugReplayManager:
|
||||
replay_name: str,
|
||||
clip_path: str,
|
||||
) -> dict:
|
||||
"""Build a camera config dictionary for the replay camera.
|
||||
|
||||
Args:
|
||||
source_config: Source camera's CameraConfig
|
||||
replay_name: Name for the replay camera
|
||||
clip_path: Path to the replay clip file
|
||||
|
||||
Returns:
|
||||
Camera config as a dictionary
|
||||
"""
|
||||
"""Build a camera config dictionary for the replay camera."""
|
||||
# Extract detect config (exclude computed fields)
|
||||
detect_dict = source_config.detect.model_dump(
|
||||
exclude={"min_initialized", "max_disappeared", "enabled_in_config"}
|
||||
@ -448,7 +205,6 @@ class DebugReplayManager:
|
||||
zone_dump = zone_config.model_dump(
|
||||
exclude={"contour", "color"}, exclude_defaults=True
|
||||
)
|
||||
# Always include required fields
|
||||
zone_dump.setdefault("coordinates", zone_config.coordinates)
|
||||
zones_dict[zone_name] = zone_dump
|
||||
|
||||
|
||||
384
frigate/jobs/debug_replay.py
Normal file
384
frigate/jobs/debug_replay.py
Normal file
@ -0,0 +1,384 @@
|
||||
"""Debug replay startup job: ffmpeg concat + camera config publish.
|
||||
|
||||
The runner orchestrates the async portion of starting a debug replay
|
||||
session. The :class:`DebugReplayManager` (in :mod:`frigate.debug_replay`)
|
||||
owns session presence so the status bar can keep reading a single
|
||||
``active`` flag from ``/debug_replay/status`` for the entire session
|
||||
window — which is broader than this job's lifetime.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import subprocess as sp
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING, Any, Optional
|
||||
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.config.camera.updater import CameraConfigUpdatePublisher
|
||||
from frigate.const import REPLAY_CAMERA_PREFIX, REPLAY_DIR
|
||||
from frigate.jobs.export import JobStatePublisher
|
||||
from frigate.jobs.job import Job
|
||||
from frigate.jobs.manager import job_is_running, set_current_job
|
||||
from frigate.models import Recordings
|
||||
from frigate.types import JobStatusTypesEnum
|
||||
from frigate.util.ffmpeg import run_ffmpeg_with_progress
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from frigate.debug_replay import DebugReplayManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Coalesce frequent ffmpeg progress callbacks so the WS isn't flooded.
|
||||
PROGRESS_BROADCAST_MIN_INTERVAL = 1.0
|
||||
|
||||
JOB_TYPE = "debug_replay"
|
||||
|
||||
STEP_PREPARING_CLIP = "preparing_clip"
|
||||
STEP_STARTING_CAMERA = "starting_camera"
|
||||
|
||||
|
||||
_active_runner: Optional["DebugReplayJobRunner"] = None
|
||||
_runner_lock = threading.Lock()
|
||||
|
||||
|
||||
def _set_active_runner(runner: Optional["DebugReplayJobRunner"]) -> None:
|
||||
global _active_runner
|
||||
with _runner_lock:
|
||||
_active_runner = runner
|
||||
|
||||
|
||||
def get_active_runner() -> Optional["DebugReplayJobRunner"]:
|
||||
with _runner_lock:
|
||||
return _active_runner
|
||||
|
||||
|
||||
@dataclass
|
||||
class DebugReplayJob(Job):
|
||||
"""Job state for a debug replay startup."""
|
||||
|
||||
job_type: str = JOB_TYPE
|
||||
source_camera: str = ""
|
||||
replay_camera_name: str = ""
|
||||
start_ts: float = 0.0
|
||||
end_ts: float = 0.0
|
||||
current_step: Optional[str] = None
|
||||
progress_percent: float = 0.0
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Whitelisted payload for the job_state WS topic.
|
||||
|
||||
Replay-specific fields land in ``results`` so the frontend's
|
||||
generic ``Job<TResults>`` type can be parameterised cleanly.
|
||||
"""
|
||||
return {
|
||||
"id": self.id,
|
||||
"job_type": self.job_type,
|
||||
"status": self.status,
|
||||
"start_time": self.start_time,
|
||||
"end_time": self.end_time,
|
||||
"error_message": self.error_message,
|
||||
"results": {
|
||||
"current_step": self.current_step,
|
||||
"progress_percent": self.progress_percent,
|
||||
"source_camera": self.source_camera,
|
||||
"replay_camera_name": self.replay_camera_name,
|
||||
"start_ts": self.start_ts,
|
||||
"end_ts": self.end_ts,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def query_recordings(source_camera: str, start_ts: float, end_ts: float):
|
||||
"""Return the Recordings query for the time range.
|
||||
|
||||
Module-level so tests can patch it without instantiating a runner.
|
||||
"""
|
||||
return (
|
||||
Recordings.select(
|
||||
Recordings.path,
|
||||
Recordings.start_time,
|
||||
Recordings.end_time,
|
||||
)
|
||||
.where(
|
||||
Recordings.start_time.between(start_ts, end_ts)
|
||||
| Recordings.end_time.between(start_ts, end_ts)
|
||||
| ((start_ts > Recordings.start_time) & (end_ts < Recordings.end_time))
|
||||
)
|
||||
.where(Recordings.camera == source_camera)
|
||||
.order_by(Recordings.start_time.asc())
|
||||
)
|
||||
|
||||
|
||||
class DebugReplayJobRunner(threading.Thread):
|
||||
"""Worker thread that drives the startup job to completion.
|
||||
|
||||
Owns the live ffmpeg ``Popen`` reference for cancellation. Cancellation
|
||||
is two-step (``threading.Event`` + ``proc.terminate()``) so the runner
|
||||
both knows it should stop and is unblocked from its blocking subprocess
|
||||
wait.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
job: DebugReplayJob,
|
||||
frigate_config: FrigateConfig,
|
||||
config_publisher: CameraConfigUpdatePublisher,
|
||||
replay_manager: "DebugReplayManager",
|
||||
publisher: Optional[JobStatePublisher] = None,
|
||||
) -> None:
|
||||
super().__init__(daemon=True, name=f"debug_replay_{job.id}")
|
||||
self.job = job
|
||||
self.frigate_config = frigate_config
|
||||
self.config_publisher = config_publisher
|
||||
self.replay_manager = replay_manager
|
||||
self.publisher = publisher if publisher is not None else JobStatePublisher()
|
||||
self._cancel_event = threading.Event()
|
||||
self._active_process: sp.Popen | None = None
|
||||
self._proc_lock = threading.Lock()
|
||||
self._last_broadcast_monotonic: float = 0.0
|
||||
|
||||
def cancel(self) -> None:
|
||||
"""Request cancellation. Idempotent."""
|
||||
self._cancel_event.set()
|
||||
with self._proc_lock:
|
||||
proc = self._active_process
|
||||
if proc is not None:
|
||||
try:
|
||||
proc.terminate()
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to terminate ffmpeg subprocess: %s", exc)
|
||||
|
||||
def is_cancelled(self) -> bool:
|
||||
return self._cancel_event.is_set()
|
||||
|
||||
def _record_proc(self, proc: sp.Popen) -> None:
|
||||
with self._proc_lock:
|
||||
self._active_process = proc
|
||||
# Race: cancel arrived between Popen and _record_proc.
|
||||
if self._cancel_event.is_set():
|
||||
try:
|
||||
proc.terminate()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _broadcast(self, force: bool = False) -> None:
|
||||
now = time.monotonic()
|
||||
if (
|
||||
not force
|
||||
and now - self._last_broadcast_monotonic < PROGRESS_BROADCAST_MIN_INTERVAL
|
||||
):
|
||||
return
|
||||
self._last_broadcast_monotonic = now
|
||||
|
||||
try:
|
||||
self.publisher.publish(self.job.to_dict())
|
||||
except Exception as err:
|
||||
logger.warning("Publisher raised during job state broadcast: %s", err)
|
||||
|
||||
def run(self) -> None:
|
||||
replay_name = self.job.replay_camera_name
|
||||
os.makedirs(REPLAY_DIR, exist_ok=True)
|
||||
concat_file = os.path.join(REPLAY_DIR, f"{replay_name}_concat.txt")
|
||||
clip_path = os.path.join(REPLAY_DIR, f"{replay_name}.mp4")
|
||||
|
||||
self.job.status = JobStatusTypesEnum.running
|
||||
self.job.start_time = time.time()
|
||||
self.job.current_step = STEP_PREPARING_CLIP
|
||||
self._broadcast(force=True)
|
||||
|
||||
try:
|
||||
recordings = query_recordings(
|
||||
self.job.source_camera, self.job.start_ts, self.job.end_ts
|
||||
)
|
||||
with open(concat_file, "w") as f:
|
||||
for recording in recordings:
|
||||
f.write(f"file '{recording.path}'\n")
|
||||
|
||||
ffmpeg_cmd = [
|
||||
self.frigate_config.ffmpeg.ffmpeg_path,
|
||||
"-hide_banner",
|
||||
"-y",
|
||||
"-f",
|
||||
"concat",
|
||||
"-safe",
|
||||
"0",
|
||||
"-i",
|
||||
concat_file,
|
||||
"-c",
|
||||
"copy",
|
||||
"-movflags",
|
||||
"+faststart",
|
||||
clip_path,
|
||||
]
|
||||
|
||||
logger.info(
|
||||
"Generating replay clip for %s (%.1f - %.1f)",
|
||||
self.job.source_camera,
|
||||
self.job.start_ts,
|
||||
self.job.end_ts,
|
||||
)
|
||||
|
||||
def _on_progress(percent: float) -> None:
|
||||
self.job.progress_percent = percent
|
||||
self._broadcast()
|
||||
|
||||
try:
|
||||
returncode, stderr = run_ffmpeg_with_progress(
|
||||
ffmpeg_cmd,
|
||||
expected_duration_seconds=max(
|
||||
0.0, self.job.end_ts - self.job.start_ts
|
||||
),
|
||||
on_progress=_on_progress,
|
||||
process_started=self._record_proc,
|
||||
use_low_priority=True,
|
||||
)
|
||||
finally:
|
||||
with self._proc_lock:
|
||||
self._active_process = None
|
||||
|
||||
if self._cancel_event.is_set():
|
||||
self._finalize_cancelled(clip_path)
|
||||
return
|
||||
|
||||
if returncode != 0:
|
||||
raise RuntimeError(f"FFmpeg failed: {stderr[-500:]}")
|
||||
|
||||
if not os.path.exists(clip_path):
|
||||
raise RuntimeError("Clip file was not created")
|
||||
|
||||
self.job.current_step = STEP_STARTING_CAMERA
|
||||
self.job.progress_percent = 100.0
|
||||
self._broadcast(force=True)
|
||||
|
||||
if self._cancel_event.is_set():
|
||||
self._finalize_cancelled(clip_path)
|
||||
return
|
||||
|
||||
self.replay_manager.publish_camera(
|
||||
source_camera=self.job.source_camera,
|
||||
replay_name=replay_name,
|
||||
clip_path=clip_path,
|
||||
frigate_config=self.frigate_config,
|
||||
config_publisher=self.config_publisher,
|
||||
)
|
||||
self.replay_manager.mark_session_ready(clip_path)
|
||||
|
||||
self.job.status = JobStatusTypesEnum.success
|
||||
self.job.end_time = time.time()
|
||||
self._broadcast(force=True)
|
||||
logger.info(
|
||||
"Debug replay started: %s -> %s",
|
||||
self.job.source_camera,
|
||||
replay_name,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.exception("Debug replay startup failed")
|
||||
self.job.status = JobStatusTypesEnum.failed
|
||||
self.job.error_message = str(exc)
|
||||
self.job.end_time = time.time()
|
||||
self._broadcast(force=True)
|
||||
self.replay_manager.clear_session()
|
||||
_remove_silent(clip_path)
|
||||
finally:
|
||||
_remove_silent(concat_file)
|
||||
_set_active_runner(None)
|
||||
|
||||
def _finalize_cancelled(self, clip_path: str) -> None:
|
||||
logger.info("Debug replay startup cancelled")
|
||||
self.job.status = JobStatusTypesEnum.cancelled
|
||||
self.job.end_time = time.time()
|
||||
self._broadcast(force=True)
|
||||
# Manager session pointers are cleared by stop() on the API side
|
||||
# (it already holds the cleanup contract). On any other cancellation
|
||||
# path, also clear so /start can run again.
|
||||
self.replay_manager.clear_session()
|
||||
_remove_silent(clip_path)
|
||||
|
||||
|
||||
def _remove_silent(path: str) -> None:
|
||||
try:
|
||||
if os.path.exists(path):
|
||||
os.remove(path)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def start_debug_replay_job(
|
||||
*,
|
||||
source_camera: str,
|
||||
start_ts: float,
|
||||
end_ts: float,
|
||||
frigate_config: FrigateConfig,
|
||||
config_publisher: CameraConfigUpdatePublisher,
|
||||
replay_manager: "DebugReplayManager",
|
||||
) -> str:
|
||||
"""Validate, create job, start runner. Returns the job id.
|
||||
|
||||
Raises ``ValueError`` for bad params (camera missing, time range
|
||||
invalid, no recordings) and ``RuntimeError`` if a session is already
|
||||
active.
|
||||
"""
|
||||
if job_is_running(JOB_TYPE) or replay_manager.active:
|
||||
raise RuntimeError("A replay session is already active")
|
||||
|
||||
if source_camera not in frigate_config.cameras:
|
||||
raise ValueError(f"Camera '{source_camera}' not found")
|
||||
|
||||
if end_ts <= start_ts:
|
||||
raise ValueError("End time must be after start time")
|
||||
|
||||
recordings = query_recordings(source_camera, start_ts, end_ts)
|
||||
if not recordings.count():
|
||||
raise ValueError(
|
||||
f"No recordings found for camera '{source_camera}' in the specified time range"
|
||||
)
|
||||
|
||||
replay_name = f"{REPLAY_CAMERA_PREFIX}{source_camera}"
|
||||
replay_manager.mark_starting(
|
||||
source_camera=source_camera,
|
||||
replay_camera_name=replay_name,
|
||||
start_ts=start_ts,
|
||||
end_ts=end_ts,
|
||||
)
|
||||
|
||||
job = DebugReplayJob(
|
||||
source_camera=source_camera,
|
||||
replay_camera_name=replay_name,
|
||||
start_ts=start_ts,
|
||||
end_ts=end_ts,
|
||||
)
|
||||
set_current_job(job)
|
||||
|
||||
runner = DebugReplayJobRunner(
|
||||
job=job,
|
||||
frigate_config=frigate_config,
|
||||
config_publisher=config_publisher,
|
||||
replay_manager=replay_manager,
|
||||
)
|
||||
_set_active_runner(runner)
|
||||
runner.start()
|
||||
|
||||
return job.id
|
||||
|
||||
|
||||
def cancel_debug_replay_job() -> bool:
|
||||
"""Signal the active runner to cancel.
|
||||
|
||||
Returns True if a runner was signalled, False if no job was active.
|
||||
"""
|
||||
runner = get_active_runner()
|
||||
if runner is None:
|
||||
return False
|
||||
runner.cancel()
|
||||
return True
|
||||
|
||||
|
||||
def wait_for_runner(timeout: float = 2.0) -> bool:
|
||||
"""Join the active runner. Returns True if the runner ended in time."""
|
||||
runner = get_active_runner()
|
||||
if runner is None:
|
||||
return True
|
||||
runner.join(timeout=timeout)
|
||||
return not runner.is_alive()
|
||||
@ -44,6 +44,7 @@ import { getTranslatedLabel } from "@/utils/i18n";
|
||||
import { Card } from "@/components/ui/card";
|
||||
import { Progress } from "@/components/ui/progress";
|
||||
import { ObjectType } from "@/types/ws";
|
||||
import { useJobStatus } from "@/api/ws";
|
||||
import WsMessageFeed from "@/components/ws/WsMessageFeed";
|
||||
import { ConfigSectionTemplate } from "@/components/config-form/sections/ConfigSectionTemplate";
|
||||
|
||||
@ -57,18 +58,8 @@ import { useDocDomain } from "@/hooks/use-doc-domain";
|
||||
import DebugDrawingLayer from "@/components/overlay/DebugDrawingLayer";
|
||||
import { IoMdArrowRoundBack } from "react-icons/io";
|
||||
|
||||
type ReplayState =
|
||||
| "idle"
|
||||
| "preparing_clip"
|
||||
| "starting_camera"
|
||||
| "active"
|
||||
| "error";
|
||||
|
||||
type DebugReplayStatus = {
|
||||
active: boolean;
|
||||
state: ReplayState;
|
||||
progress_percent: number | null;
|
||||
error_message: string | null;
|
||||
replay_camera: string | null;
|
||||
source_camera: string | null;
|
||||
start_time: number | null;
|
||||
@ -76,6 +67,15 @@ type DebugReplayStatus = {
|
||||
live_ready: boolean;
|
||||
};
|
||||
|
||||
type DebugReplayJobResults = {
|
||||
current_step: "preparing_clip" | "starting_camera" | null;
|
||||
progress_percent: number | null;
|
||||
source_camera: string | null;
|
||||
replay_camera_name: string | null;
|
||||
start_ts: number | null;
|
||||
end_ts: number | null;
|
||||
};
|
||||
|
||||
type DebugOptions = {
|
||||
bbox: boolean;
|
||||
timestamp: boolean;
|
||||
@ -130,6 +130,8 @@ export default function Replay() {
|
||||
} = useSWR<DebugReplayStatus>("debug_replay/status", {
|
||||
refreshInterval: 1000,
|
||||
});
|
||||
const { payload: replayJob } =
|
||||
useJobStatus<DebugReplayJobResults>("debug_replay");
|
||||
const [isInitializing, setIsInitializing] = useState(true);
|
||||
|
||||
// Refresh status immediately on mount to avoid showing "no session" briefly
|
||||
@ -248,34 +250,17 @@ export default function Replay() {
|
||||
);
|
||||
}
|
||||
|
||||
// No active session
|
||||
if (!status?.active && status?.state !== "error") {
|
||||
return (
|
||||
<div className="flex size-full flex-col items-center justify-center gap-4 p-8">
|
||||
<MdReplay className="size-12" />
|
||||
<Heading as="h2" className="text-center">
|
||||
{t("page.noSession")}
|
||||
</Heading>
|
||||
<p className="max-w-md text-center text-muted-foreground">
|
||||
{t("page.noSessionDesc")}
|
||||
</p>
|
||||
<Button variant="default" onClick={() => navigate("/review")}>
|
||||
{t("page.goToRecordings")}
|
||||
</Button>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
// Startup error
|
||||
if (status?.state === "error") {
|
||||
// Startup error (job failed). Only show when status.active is also true so
|
||||
// we don't surface stale failed jobs after a session ended cleanly.
|
||||
if (replayJob?.status === "failed" && status?.active) {
|
||||
return (
|
||||
<div className="flex size-full flex-col items-center justify-center gap-4 p-8">
|
||||
<Heading as="h2" className="text-center">
|
||||
{t("page.startError.title")}
|
||||
</Heading>
|
||||
{status.error_message && (
|
||||
{replayJob.error_message && (
|
||||
<p className="max-w-xl text-center text-sm text-muted-foreground">
|
||||
{status.error_message}
|
||||
{replayJob.error_message}
|
||||
</p>
|
||||
)}
|
||||
<Button
|
||||
@ -293,24 +278,45 @@ export default function Replay() {
|
||||
);
|
||||
}
|
||||
|
||||
// Preparing or starting
|
||||
if (
|
||||
status?.state === "preparing_clip" ||
|
||||
status?.state === "starting_camera"
|
||||
) {
|
||||
// No active session.
|
||||
if (!status?.active) {
|
||||
return (
|
||||
<div className="flex size-full flex-col items-center justify-center gap-4 p-8">
|
||||
<MdReplay className="size-12" />
|
||||
<Heading as="h2" className="text-center">
|
||||
{t("page.noSession")}
|
||||
</Heading>
|
||||
<p className="max-w-md text-center text-muted-foreground">
|
||||
{t("page.noSessionDesc")}
|
||||
</p>
|
||||
<Button variant="default" onClick={() => navigate("/review")}>
|
||||
{t("page.goToRecordings")}
|
||||
</Button>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
// Startup in progress (job is running). The session is active but the
|
||||
// replay camera isn't ready yet; show progress / phase from the job.
|
||||
const startupStep =
|
||||
replayJob?.status === "running"
|
||||
? (replayJob.results?.current_step ?? null)
|
||||
: null;
|
||||
if (startupStep === "preparing_clip" || startupStep === "starting_camera") {
|
||||
const phaseTitle =
|
||||
status.state === "preparing_clip"
|
||||
startupStep === "preparing_clip"
|
||||
? t("page.preparingClip")
|
||||
: t("page.startingCamera");
|
||||
const progressPercent = replayJob?.results?.progress_percent ?? null;
|
||||
const showProgressBar =
|
||||
status.state === "preparing_clip" && status.progress_percent != null;
|
||||
startupStep === "preparing_clip" && progressPercent != null;
|
||||
return (
|
||||
<div className="flex size-full flex-col items-center justify-center gap-4 p-8">
|
||||
{showProgressBar ? (
|
||||
<div className="flex w-64 flex-col items-center gap-2">
|
||||
<Progress value={status.progress_percent ?? 0} />
|
||||
<Progress value={progressPercent ?? 0} />
|
||||
<div className="text-xs text-muted-foreground">
|
||||
{Math.round(status.progress_percent ?? 0)}%
|
||||
{Math.round(progressPercent ?? 0)}%
|
||||
</div>
|
||||
</div>
|
||||
) : (
|
||||
@ -319,7 +325,7 @@ export default function Replay() {
|
||||
<Heading as="h3" className="text-center">
|
||||
{phaseTitle}
|
||||
</Heading>
|
||||
{status.state === "preparing_clip" && (
|
||||
{startupStep === "preparing_clip" && (
|
||||
<p className="max-w-md text-center text-sm text-muted-foreground">
|
||||
{t("page.preparingClipDesc")}
|
||||
</p>
|
||||
|
||||
Loading…
Reference in New Issue
Block a user