mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-05-04 04:27:42 +03:00
Use Job infrastructure for Debug Replay (#23099)
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
* use ReplayState enum * extract shared ffmpeg progress helper * make start call non-blocking with worker thread * expose replay state on status endpoint and return 202 from start * cancel in-flight ffmpeg when stop is called during preparation * add replay i18n strings for preparing and error states * show status in replay UI * navigate immediately on 202 from debug replay menus and dialog * remove unused * simplify to use Job infrastructure * tests * cleanup and tweaks * fetch schema * update api spec * formatting * fix e2e test * mypy * clean up * formatting * fix * fix test * don't try to show camera image until status reports ready * simplify loading logic * fix race in latest_frame on debug replay shutdown * remove toast when successfully stopping it gets hidden almost immediately
This commit is contained in:
parent
5bc15d4aa9
commit
814c497bef
15
docs/static/frigate-api.yaml
vendored
15
docs/static/frigate-api.yaml
vendored
@ -5997,7 +5997,10 @@ paths:
|
||||
tags:
|
||||
- App
|
||||
summary: Start debug replay
|
||||
description: Start a debug replay session from camera recordings.
|
||||
description:
|
||||
Start a debug replay session from camera recordings. Returns
|
||||
immediately while clip generation runs as a background job; subscribe
|
||||
to the 'debug_replay' job_state WS topic to track progress.
|
||||
operationId: start_debug_replay_debug_replay_start_post
|
||||
requestBody:
|
||||
required: true
|
||||
@ -6006,12 +6009,16 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/DebugReplayStartBody"
|
||||
responses:
|
||||
"200":
|
||||
"202":
|
||||
description: Successful Response
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/DebugReplayStartResponse"
|
||||
"400":
|
||||
description: Invalid camera, time range, or no recordings
|
||||
"409":
|
||||
description: A replay session is already active
|
||||
"422":
|
||||
description: Validation Error
|
||||
content:
|
||||
@ -6272,10 +6279,14 @@ components:
|
||||
replay_camera:
|
||||
type: string
|
||||
title: Replay Camera
|
||||
job_id:
|
||||
type: string
|
||||
title: Job Id
|
||||
type: object
|
||||
required:
|
||||
- success
|
||||
- replay_camera
|
||||
- job_id
|
||||
title: DebugReplayStartResponse
|
||||
description: Response for starting a debug replay session.
|
||||
DebugReplayStatusResponse:
|
||||
|
||||
@ -10,6 +10,7 @@ from pydantic import BaseModel, Field
|
||||
|
||||
from frigate.api.auth import require_role
|
||||
from frigate.api.defs.tags import Tags
|
||||
from frigate.jobs.debug_replay import start_debug_replay_job
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -29,10 +30,17 @@ class DebugReplayStartResponse(BaseModel):
|
||||
|
||||
success: bool
|
||||
replay_camera: str
|
||||
job_id: str
|
||||
|
||||
|
||||
class DebugReplayStatusResponse(BaseModel):
|
||||
"""Response for debug replay status."""
|
||||
"""Response for debug replay status.
|
||||
|
||||
Returns only session-presence fields. Startup progress and error
|
||||
details flow through the job_state WebSocket topic via the
|
||||
debug_replay job (see frigate.jobs.debug_replay); the
|
||||
Replay page subscribes there with useJobStatus("debug_replay").
|
||||
"""
|
||||
|
||||
active: bool
|
||||
replay_camera: str | None = None
|
||||
@ -51,15 +59,32 @@ class DebugReplayStopResponse(BaseModel):
|
||||
@router.post(
|
||||
"/debug_replay/start",
|
||||
response_model=DebugReplayStartResponse,
|
||||
status_code=202,
|
||||
responses={
|
||||
400: {"description": "Invalid camera, time range, or no recordings"},
|
||||
409: {"description": "A replay session is already active"},
|
||||
},
|
||||
dependencies=[Depends(require_role(["admin"]))],
|
||||
summary="Start debug replay",
|
||||
description="Start a debug replay session from camera recordings.",
|
||||
description="Start a debug replay session from camera recordings. Returns "
|
||||
"immediately while clip generation runs as a background job; subscribe "
|
||||
"to the 'debug_replay' job_state WS topic to track progress.",
|
||||
)
|
||||
async def start_debug_replay(request: Request, body: DebugReplayStartBody):
|
||||
"""Start a debug replay session."""
|
||||
"""Start a debug replay session asynchronously."""
|
||||
replay_manager = request.app.replay_manager
|
||||
|
||||
if replay_manager.active:
|
||||
try:
|
||||
job_id = await asyncio.to_thread(
|
||||
start_debug_replay_job,
|
||||
source_camera=body.camera,
|
||||
start_ts=body.start_time,
|
||||
end_ts=body.end_time,
|
||||
frigate_config=request.app.frigate_config,
|
||||
config_publisher=request.app.config_publisher,
|
||||
replay_manager=replay_manager,
|
||||
)
|
||||
except RuntimeError:
|
||||
return JSONResponse(
|
||||
content={
|
||||
"success": False,
|
||||
@ -67,38 +92,23 @@ async def start_debug_replay(request: Request, body: DebugReplayStartBody):
|
||||
},
|
||||
status_code=409,
|
||||
)
|
||||
|
||||
try:
|
||||
replay_camera = await asyncio.to_thread(
|
||||
replay_manager.start,
|
||||
source_camera=body.camera,
|
||||
start_ts=body.start_time,
|
||||
end_ts=body.end_time,
|
||||
frigate_config=request.app.frigate_config,
|
||||
config_publisher=request.app.config_publisher,
|
||||
)
|
||||
except ValueError:
|
||||
logger.exception("Invalid parameters for debug replay start request")
|
||||
logger.exception("Rejected debug replay start request")
|
||||
return JSONResponse(
|
||||
content={
|
||||
"success": False,
|
||||
"message": "Invalid debug replay request parameters",
|
||||
"message": "Invalid debug replay parameters",
|
||||
},
|
||||
status_code=400,
|
||||
)
|
||||
except RuntimeError:
|
||||
logger.exception("Error while starting debug replay session")
|
||||
return JSONResponse(
|
||||
content={
|
||||
"success": False,
|
||||
"message": "An internal error occurred while starting debug replay",
|
||||
},
|
||||
status_code=500,
|
||||
)
|
||||
|
||||
return DebugReplayStartResponse(
|
||||
success=True,
|
||||
replay_camera=replay_camera,
|
||||
return JSONResponse(
|
||||
content={
|
||||
"success": True,
|
||||
"replay_camera": replay_manager.replay_camera_name,
|
||||
"job_id": job_id,
|
||||
},
|
||||
status_code=202,
|
||||
)
|
||||
|
||||
|
||||
@ -118,12 +128,16 @@ def get_debug_replay_status(request: Request):
|
||||
|
||||
if replay_manager.active and replay_camera:
|
||||
frame_processor = request.app.detected_frames_processor
|
||||
frame = frame_processor.get_current_frame(replay_camera)
|
||||
frame = (
|
||||
frame_processor.get_current_frame(replay_camera)
|
||||
if frame_processor is not None
|
||||
else None
|
||||
)
|
||||
|
||||
if frame is not None:
|
||||
frame_time = frame_processor.get_current_frame_time(replay_camera)
|
||||
camera_config = request.app.frigate_config.cameras.get(replay_camera)
|
||||
retry_interval = 10
|
||||
retry_interval = 10.0
|
||||
|
||||
if camera_config is not None:
|
||||
retry_interval = float(camera_config.ffmpeg.retry_interval or 10)
|
||||
|
||||
@ -174,12 +174,10 @@ async def latest_frame(
|
||||
}
|
||||
quality_params = get_image_quality_params(extension.value, params.quality)
|
||||
|
||||
if camera_name in request.app.frigate_config.cameras:
|
||||
camera_config = request.app.frigate_config.cameras.get(camera_name)
|
||||
if camera_config is not None:
|
||||
frame = frame_processor.get_current_frame(camera_name, draw_options)
|
||||
retry_interval = float(
|
||||
request.app.frigate_config.cameras.get(camera_name).ffmpeg.retry_interval
|
||||
or 10
|
||||
)
|
||||
retry_interval = float(camera_config.ffmpeg.retry_interval or 10)
|
||||
|
||||
is_offline = False
|
||||
if frame is None or datetime.now().timestamp() > (
|
||||
|
||||
@ -1,9 +1,13 @@
|
||||
"""Debug replay camera management for replaying recordings with detection overlays."""
|
||||
"""Debug replay camera management for replaying recordings with detection overlays.
|
||||
|
||||
The startup work (ffmpeg concat + camera config publish) lives in
|
||||
frigate.jobs.debug_replay. This module owns only session presence
|
||||
(active), session metadata, and post-session cleanup.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import subprocess as sp
|
||||
import threading
|
||||
|
||||
from ruamel.yaml import YAML
|
||||
@ -21,7 +25,7 @@ from frigate.const import (
|
||||
REPLAY_DIR,
|
||||
THUMB_DIR,
|
||||
)
|
||||
from frigate.models import Recordings
|
||||
from frigate.jobs.debug_replay import cancel_debug_replay_job, wait_for_runner
|
||||
from frigate.util.camera_cleanup import cleanup_camera_db, cleanup_camera_files
|
||||
from frigate.util.config import find_config_file
|
||||
|
||||
@ -29,7 +33,14 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DebugReplayManager:
|
||||
"""Manages a single debug replay session."""
|
||||
"""Owns the lifecycle pointers for a single debug replay session.
|
||||
|
||||
A session exists from the moment mark_starting is called (synchronously,
|
||||
inside the API handler) until clear_session runs (on success cleanup,
|
||||
failure, or stop). The active property is the source of truth that the
|
||||
status bar consumes — broader than the startup job, which only covers the
|
||||
preparing_clip / starting_camera window.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._lock = threading.Lock()
|
||||
@ -41,144 +52,66 @@ class DebugReplayManager:
|
||||
|
||||
@property
|
||||
def active(self) -> bool:
|
||||
"""Whether a replay session is currently active."""
|
||||
"""True from mark_starting until clear_session."""
|
||||
return self.replay_camera_name is not None
|
||||
|
||||
def start(
|
||||
def mark_starting(
|
||||
self,
|
||||
source_camera: str,
|
||||
replay_camera_name: str,
|
||||
start_ts: float,
|
||||
end_ts: float,
|
||||
frigate_config: FrigateConfig,
|
||||
config_publisher: CameraConfigUpdatePublisher,
|
||||
) -> str:
|
||||
"""Start a debug replay session.
|
||||
) -> None:
|
||||
"""Synchronously claim the session before the job runner starts.
|
||||
|
||||
Args:
|
||||
source_camera: Name of the source camera to replay
|
||||
start_ts: Start timestamp
|
||||
end_ts: End timestamp
|
||||
frigate_config: Current Frigate configuration
|
||||
config_publisher: Publisher for camera config updates
|
||||
|
||||
Returns:
|
||||
The replay camera name
|
||||
|
||||
Raises:
|
||||
ValueError: If a session is already active or parameters are invalid
|
||||
RuntimeError: If clip generation fails
|
||||
Called inside the API handler so the status bar sees active=True
|
||||
immediately, before the worker thread does any ffmpeg work.
|
||||
"""
|
||||
with self._lock:
|
||||
return self._start_locked(
|
||||
source_camera, start_ts, end_ts, frigate_config, config_publisher
|
||||
)
|
||||
self.replay_camera_name = replay_camera_name
|
||||
self.source_camera = source_camera
|
||||
self.start_ts = start_ts
|
||||
self.end_ts = end_ts
|
||||
self.clip_path = None
|
||||
|
||||
def _start_locked(
|
||||
def mark_session_ready(self, clip_path: str) -> None:
|
||||
"""Record the on-disk clip path after the camera has been published."""
|
||||
with self._lock:
|
||||
self.clip_path = clip_path
|
||||
|
||||
def clear_session(self) -> None:
|
||||
"""Reset session pointers without publishing camera removal.
|
||||
|
||||
Used by the job runner on failure paths. stop() does the camera
|
||||
teardown plus this clear in one step.
|
||||
"""
|
||||
with self._lock:
|
||||
self._clear_locked()
|
||||
|
||||
def _clear_locked(self) -> None:
|
||||
self.replay_camera_name = None
|
||||
self.source_camera = None
|
||||
self.clip_path = None
|
||||
self.start_ts = None
|
||||
self.end_ts = None
|
||||
|
||||
def publish_camera(
|
||||
self,
|
||||
source_camera: str,
|
||||
start_ts: float,
|
||||
end_ts: float,
|
||||
replay_name: str,
|
||||
clip_path: str,
|
||||
frigate_config: FrigateConfig,
|
||||
config_publisher: CameraConfigUpdatePublisher,
|
||||
) -> str:
|
||||
if self.active:
|
||||
raise ValueError("A replay session is already active")
|
||||
) -> None:
|
||||
"""Build the in-memory replay camera config and publish the add event.
|
||||
|
||||
if source_camera not in frigate_config.cameras:
|
||||
raise ValueError(f"Camera '{source_camera}' not found")
|
||||
|
||||
if end_ts <= start_ts:
|
||||
raise ValueError("End time must be after start time")
|
||||
|
||||
# Query recordings for the source camera in the time range
|
||||
recordings = (
|
||||
Recordings.select(
|
||||
Recordings.path,
|
||||
Recordings.start_time,
|
||||
Recordings.end_time,
|
||||
)
|
||||
.where(
|
||||
Recordings.start_time.between(start_ts, end_ts)
|
||||
| Recordings.end_time.between(start_ts, end_ts)
|
||||
| ((start_ts > Recordings.start_time) & (end_ts < Recordings.end_time))
|
||||
)
|
||||
.where(Recordings.camera == source_camera)
|
||||
.order_by(Recordings.start_time.asc())
|
||||
)
|
||||
|
||||
if not recordings.count():
|
||||
raise ValueError(
|
||||
f"No recordings found for camera '{source_camera}' in the specified time range"
|
||||
)
|
||||
|
||||
# Create replay directory
|
||||
os.makedirs(REPLAY_DIR, exist_ok=True)
|
||||
|
||||
# Generate replay camera name
|
||||
replay_name = f"{REPLAY_CAMERA_PREFIX}{source_camera}"
|
||||
|
||||
# Build concat file for ffmpeg
|
||||
concat_file = os.path.join(REPLAY_DIR, f"{replay_name}_concat.txt")
|
||||
clip_path = os.path.join(REPLAY_DIR, f"{replay_name}.mp4")
|
||||
|
||||
with open(concat_file, "w") as f:
|
||||
for recording in recordings:
|
||||
f.write(f"file '{recording.path}'\n")
|
||||
|
||||
# Concatenate recordings into a single clip with -c copy (fast)
|
||||
ffmpeg_cmd = [
|
||||
frigate_config.ffmpeg.ffmpeg_path,
|
||||
"-hide_banner",
|
||||
"-y",
|
||||
"-f",
|
||||
"concat",
|
||||
"-safe",
|
||||
"0",
|
||||
"-i",
|
||||
concat_file,
|
||||
"-c",
|
||||
"copy",
|
||||
"-movflags",
|
||||
"+faststart",
|
||||
clip_path,
|
||||
]
|
||||
|
||||
logger.info(
|
||||
"Generating replay clip for %s (%.1f - %.1f)",
|
||||
source_camera,
|
||||
start_ts,
|
||||
end_ts,
|
||||
)
|
||||
|
||||
try:
|
||||
result = sp.run(
|
||||
ffmpeg_cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=120,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
logger.error("FFmpeg error: %s", result.stderr)
|
||||
raise RuntimeError(
|
||||
f"Failed to generate replay clip: {result.stderr[-500:]}"
|
||||
)
|
||||
except sp.TimeoutExpired:
|
||||
raise RuntimeError("Clip generation timed out")
|
||||
finally:
|
||||
# Clean up concat file
|
||||
if os.path.exists(concat_file):
|
||||
os.remove(concat_file)
|
||||
|
||||
if not os.path.exists(clip_path):
|
||||
raise RuntimeError("Clip file was not created")
|
||||
|
||||
# Build camera config dict for the replay camera
|
||||
Called by the job runner during the starting_camera phase.
|
||||
"""
|
||||
source_config = frigate_config.cameras[source_camera]
|
||||
camera_dict = self._build_camera_config_dict(
|
||||
source_config, replay_name, clip_path
|
||||
)
|
||||
|
||||
# Build an in-memory config with the replay camera added
|
||||
config_file = find_config_file()
|
||||
yaml_parser = YAML()
|
||||
with open(config_file, "r") as f:
|
||||
@ -191,75 +124,48 @@ class DebugReplayManager:
|
||||
try:
|
||||
new_config = FrigateConfig.parse_object(config_data)
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to validate replay camera config: {e}")
|
||||
|
||||
# Update the running config
|
||||
raise RuntimeError(f"Failed to validate replay camera config: {e}") from e
|
||||
frigate_config.cameras[replay_name] = new_config.cameras[replay_name]
|
||||
|
||||
# Publish the add event
|
||||
config_publisher.publish_update(
|
||||
CameraConfigUpdateTopic(CameraConfigUpdateEnum.add, replay_name),
|
||||
new_config.cameras[replay_name],
|
||||
)
|
||||
|
||||
# Store session state
|
||||
self.replay_camera_name = replay_name
|
||||
self.source_camera = source_camera
|
||||
self.clip_path = clip_path
|
||||
self.start_ts = start_ts
|
||||
self.end_ts = end_ts
|
||||
|
||||
logger.info("Debug replay started: %s -> %s", source_camera, replay_name)
|
||||
return replay_name
|
||||
|
||||
def stop(
|
||||
self,
|
||||
frigate_config: FrigateConfig,
|
||||
config_publisher: CameraConfigUpdatePublisher,
|
||||
) -> None:
|
||||
"""Stop the active replay session and clean up all artifacts.
|
||||
"""Cancel any in-flight startup job and tear down the active session.
|
||||
|
||||
Args:
|
||||
frigate_config: Current Frigate configuration
|
||||
config_publisher: Publisher for camera config updates
|
||||
Safe to call when no session is active (no-op with a warning).
|
||||
"""
|
||||
cancel_debug_replay_job()
|
||||
wait_for_runner(timeout=2.0)
|
||||
|
||||
with self._lock:
|
||||
self._stop_locked(frigate_config, config_publisher)
|
||||
if not self.active:
|
||||
logger.warning("No active replay session to stop")
|
||||
return
|
||||
|
||||
def _stop_locked(
|
||||
self,
|
||||
frigate_config: FrigateConfig,
|
||||
config_publisher: CameraConfigUpdatePublisher,
|
||||
) -> None:
|
||||
if not self.active:
|
||||
logger.warning("No active replay session to stop")
|
||||
return
|
||||
replay_name = self.replay_camera_name
|
||||
|
||||
replay_name = self.replay_camera_name
|
||||
# Only publish remove if the camera was actually added to the live
|
||||
# config (i.e. the runner reached the starting_camera phase).
|
||||
if replay_name is not None and replay_name in frigate_config.cameras:
|
||||
config_publisher.publish_update(
|
||||
CameraConfigUpdateTopic(CameraConfigUpdateEnum.remove, replay_name),
|
||||
frigate_config.cameras[replay_name],
|
||||
)
|
||||
|
||||
# Publish remove event so subscribers stop and remove from their config
|
||||
if replay_name in frigate_config.cameras:
|
||||
config_publisher.publish_update(
|
||||
CameraConfigUpdateTopic(CameraConfigUpdateEnum.remove, replay_name),
|
||||
frigate_config.cameras[replay_name],
|
||||
)
|
||||
# Do NOT pop here — let subscribers handle removal from the shared
|
||||
# config dict when they process the ZMQ message to avoid race conditions
|
||||
if replay_name is not None:
|
||||
self._cleanup_db(replay_name)
|
||||
self._cleanup_files(replay_name)
|
||||
|
||||
# Defensive DB cleanup
|
||||
self._cleanup_db(replay_name)
|
||||
self._clear_locked()
|
||||
|
||||
# Remove filesystem artifacts
|
||||
self._cleanup_files(replay_name)
|
||||
|
||||
# Reset state
|
||||
self.replay_camera_name = None
|
||||
self.source_camera = None
|
||||
self.clip_path = None
|
||||
self.start_ts = None
|
||||
self.end_ts = None
|
||||
|
||||
logger.info("Debug replay stopped and cleaned up: %s", replay_name)
|
||||
logger.info("Debug replay stopped and cleaned up: %s", replay_name)
|
||||
|
||||
def _build_camera_config_dict(
|
||||
self,
|
||||
@ -267,16 +173,7 @@ class DebugReplayManager:
|
||||
replay_name: str,
|
||||
clip_path: str,
|
||||
) -> dict:
|
||||
"""Build a camera config dictionary for the replay camera.
|
||||
|
||||
Args:
|
||||
source_config: Source camera's CameraConfig
|
||||
replay_name: Name for the replay camera
|
||||
clip_path: Path to the replay clip file
|
||||
|
||||
Returns:
|
||||
Camera config as a dictionary
|
||||
"""
|
||||
"""Build a camera config dictionary for the replay camera."""
|
||||
# Extract detect config (exclude computed fields)
|
||||
detect_dict = source_config.detect.model_dump(
|
||||
exclude={"min_initialized", "max_disappeared", "enabled_in_config"}
|
||||
@ -311,7 +208,6 @@ class DebugReplayManager:
|
||||
zone_dump = zone_config.model_dump(
|
||||
exclude={"contour", "color"}, exclude_defaults=True
|
||||
)
|
||||
# Always include required fields
|
||||
zone_dump.setdefault("coordinates", zone_config.coordinates)
|
||||
zones_dict[zone_name] = zone_dump
|
||||
|
||||
|
||||
386
frigate/jobs/debug_replay.py
Normal file
386
frigate/jobs/debug_replay.py
Normal file
@ -0,0 +1,386 @@
|
||||
"""Debug replay startup job: ffmpeg concat + camera config publish.
|
||||
|
||||
The runner orchestrates the async portion of starting a debug replay
|
||||
session. The DebugReplayManager (in frigate.debug_replay) owns session
|
||||
presence so the status bar can keep reading a single `active` flag from
|
||||
/debug_replay/status for the entire session window — which is broader
|
||||
than this job's lifetime.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import subprocess as sp
|
||||
import threading
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING, Any, Optional, cast
|
||||
|
||||
from peewee import ModelSelect
|
||||
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.config.camera.updater import CameraConfigUpdatePublisher
|
||||
from frigate.const import REPLAY_CAMERA_PREFIX, REPLAY_DIR
|
||||
from frigate.jobs.export import JobStatePublisher
|
||||
from frigate.jobs.job import Job
|
||||
from frigate.jobs.manager import job_is_running, set_current_job
|
||||
from frigate.models import Recordings
|
||||
from frigate.types import JobStatusTypesEnum
|
||||
from frigate.util.ffmpeg import run_ffmpeg_with_progress
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from frigate.debug_replay import DebugReplayManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Coalesce frequent ffmpeg progress callbacks so the WS isn't flooded.
|
||||
PROGRESS_BROADCAST_MIN_INTERVAL = 1.0
|
||||
|
||||
JOB_TYPE = "debug_replay"
|
||||
|
||||
STEP_PREPARING_CLIP = "preparing_clip"
|
||||
STEP_STARTING_CAMERA = "starting_camera"
|
||||
|
||||
|
||||
_active_runner: Optional["DebugReplayJobRunner"] = None
|
||||
_runner_lock = threading.Lock()
|
||||
|
||||
|
||||
def _set_active_runner(runner: Optional["DebugReplayJobRunner"]) -> None:
|
||||
global _active_runner
|
||||
with _runner_lock:
|
||||
_active_runner = runner
|
||||
|
||||
|
||||
def get_active_runner() -> Optional["DebugReplayJobRunner"]:
|
||||
with _runner_lock:
|
||||
return _active_runner
|
||||
|
||||
|
||||
@dataclass
|
||||
class DebugReplayJob(Job):
|
||||
"""Job state for a debug replay startup."""
|
||||
|
||||
job_type: str = JOB_TYPE
|
||||
source_camera: str = ""
|
||||
replay_camera_name: str = ""
|
||||
start_ts: float = 0.0
|
||||
end_ts: float = 0.0
|
||||
current_step: Optional[str] = None
|
||||
progress_percent: float = 0.0
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
"""Whitelisted payload for the job_state WS topic.
|
||||
|
||||
Replay-specific fields land in results so the frontend's
|
||||
generic Job<TResults> type can be parameterised cleanly.
|
||||
"""
|
||||
return {
|
||||
"id": self.id,
|
||||
"job_type": self.job_type,
|
||||
"status": self.status,
|
||||
"start_time": self.start_time,
|
||||
"end_time": self.end_time,
|
||||
"error_message": self.error_message,
|
||||
"results": {
|
||||
"current_step": self.current_step,
|
||||
"progress_percent": self.progress_percent,
|
||||
"source_camera": self.source_camera,
|
||||
"replay_camera_name": self.replay_camera_name,
|
||||
"start_ts": self.start_ts,
|
||||
"end_ts": self.end_ts,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def query_recordings(source_camera: str, start_ts: float, end_ts: float) -> ModelSelect:
|
||||
"""Return the Recordings query for the time range.
|
||||
|
||||
Module-level so tests can patch it without instantiating a runner.
|
||||
"""
|
||||
query = (
|
||||
Recordings.select(
|
||||
Recordings.path,
|
||||
Recordings.start_time,
|
||||
Recordings.end_time,
|
||||
)
|
||||
.where(
|
||||
Recordings.start_time.between(start_ts, end_ts)
|
||||
| Recordings.end_time.between(start_ts, end_ts)
|
||||
| ((start_ts > Recordings.start_time) & (end_ts < Recordings.end_time))
|
||||
)
|
||||
.where(Recordings.camera == source_camera)
|
||||
.order_by(Recordings.start_time.asc())
|
||||
)
|
||||
return cast(ModelSelect, query)
|
||||
|
||||
|
||||
class DebugReplayJobRunner(threading.Thread):
|
||||
"""Worker thread that drives the startup job to completion.
|
||||
|
||||
Owns the live ffmpeg Popen reference for cancellation. Cancellation
|
||||
is two-step (threading.Event + proc.terminate()) so the runner
|
||||
both knows it should stop and is unblocked from its blocking subprocess
|
||||
wait.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
job: DebugReplayJob,
|
||||
frigate_config: FrigateConfig,
|
||||
config_publisher: CameraConfigUpdatePublisher,
|
||||
replay_manager: "DebugReplayManager",
|
||||
publisher: Optional[JobStatePublisher] = None,
|
||||
) -> None:
|
||||
super().__init__(daemon=True, name=f"debug_replay_{job.id}")
|
||||
self.job = job
|
||||
self.frigate_config = frigate_config
|
||||
self.config_publisher = config_publisher
|
||||
self.replay_manager = replay_manager
|
||||
self.publisher = publisher if publisher is not None else JobStatePublisher()
|
||||
self._cancel_event = threading.Event()
|
||||
self._active_process: sp.Popen | None = None
|
||||
self._proc_lock = threading.Lock()
|
||||
self._last_broadcast_monotonic: float = 0.0
|
||||
|
||||
def cancel(self) -> None:
|
||||
"""Request cancellation. Idempotent."""
|
||||
self._cancel_event.set()
|
||||
with self._proc_lock:
|
||||
proc = self._active_process
|
||||
if proc is not None:
|
||||
try:
|
||||
proc.terminate()
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to terminate ffmpeg subprocess: %s", exc)
|
||||
|
||||
def is_cancelled(self) -> bool:
|
||||
return self._cancel_event.is_set()
|
||||
|
||||
def _record_proc(self, proc: sp.Popen) -> None:
|
||||
with self._proc_lock:
|
||||
self._active_process = proc
|
||||
# Race: cancel arrived between Popen and _record_proc.
|
||||
if self._cancel_event.is_set():
|
||||
try:
|
||||
proc.terminate()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _broadcast(self, force: bool = False) -> None:
|
||||
now = time.monotonic()
|
||||
if (
|
||||
not force
|
||||
and now - self._last_broadcast_monotonic < PROGRESS_BROADCAST_MIN_INTERVAL
|
||||
):
|
||||
return
|
||||
self._last_broadcast_monotonic = now
|
||||
|
||||
try:
|
||||
self.publisher.publish(self.job.to_dict())
|
||||
except Exception as err:
|
||||
logger.warning("Publisher raised during job state broadcast: %s", err)
|
||||
|
||||
def run(self) -> None:
|
||||
replay_name = self.job.replay_camera_name
|
||||
os.makedirs(REPLAY_DIR, exist_ok=True)
|
||||
concat_file = os.path.join(REPLAY_DIR, f"{replay_name}_concat.txt")
|
||||
clip_path = os.path.join(REPLAY_DIR, f"{replay_name}.mp4")
|
||||
|
||||
self.job.status = JobStatusTypesEnum.running
|
||||
self.job.start_time = time.time()
|
||||
self.job.current_step = STEP_PREPARING_CLIP
|
||||
self._broadcast(force=True)
|
||||
|
||||
try:
|
||||
recordings = query_recordings(
|
||||
self.job.source_camera, self.job.start_ts, self.job.end_ts
|
||||
)
|
||||
with open(concat_file, "w") as f:
|
||||
for recording in recordings:
|
||||
f.write(f"file '{recording.path}'\n")
|
||||
|
||||
ffmpeg_cmd = [
|
||||
self.frigate_config.ffmpeg.ffmpeg_path,
|
||||
"-hide_banner",
|
||||
"-y",
|
||||
"-f",
|
||||
"concat",
|
||||
"-safe",
|
||||
"0",
|
||||
"-i",
|
||||
concat_file,
|
||||
"-c",
|
||||
"copy",
|
||||
"-movflags",
|
||||
"+faststart",
|
||||
clip_path,
|
||||
]
|
||||
|
||||
logger.info(
|
||||
"Generating replay clip for %s (%.1f - %.1f)",
|
||||
self.job.source_camera,
|
||||
self.job.start_ts,
|
||||
self.job.end_ts,
|
||||
)
|
||||
|
||||
def _on_progress(percent: float) -> None:
|
||||
self.job.progress_percent = percent
|
||||
self._broadcast()
|
||||
|
||||
try:
|
||||
returncode, stderr = run_ffmpeg_with_progress(
|
||||
ffmpeg_cmd,
|
||||
expected_duration_seconds=max(
|
||||
0.0, self.job.end_ts - self.job.start_ts
|
||||
),
|
||||
on_progress=_on_progress,
|
||||
process_started=self._record_proc,
|
||||
use_low_priority=True,
|
||||
)
|
||||
finally:
|
||||
with self._proc_lock:
|
||||
self._active_process = None
|
||||
|
||||
if self._cancel_event.is_set():
|
||||
self._finalize_cancelled(clip_path)
|
||||
return
|
||||
|
||||
if returncode != 0:
|
||||
raise RuntimeError(f"FFmpeg failed: {stderr[-500:]}")
|
||||
|
||||
if not os.path.exists(clip_path):
|
||||
raise RuntimeError("Clip file was not created")
|
||||
|
||||
self.job.current_step = STEP_STARTING_CAMERA
|
||||
self.job.progress_percent = 100.0
|
||||
self._broadcast(force=True)
|
||||
|
||||
if self._cancel_event.is_set():
|
||||
self._finalize_cancelled(clip_path)
|
||||
return
|
||||
|
||||
self.replay_manager.publish_camera(
|
||||
source_camera=self.job.source_camera,
|
||||
replay_name=replay_name,
|
||||
clip_path=clip_path,
|
||||
frigate_config=self.frigate_config,
|
||||
config_publisher=self.config_publisher,
|
||||
)
|
||||
self.replay_manager.mark_session_ready(clip_path)
|
||||
|
||||
self.job.status = JobStatusTypesEnum.success
|
||||
self.job.end_time = time.time()
|
||||
self._broadcast(force=True)
|
||||
logger.info(
|
||||
"Debug replay started: %s -> %s",
|
||||
self.job.source_camera,
|
||||
replay_name,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.exception("Debug replay startup failed")
|
||||
self.job.status = JobStatusTypesEnum.failed
|
||||
self.job.error_message = str(exc)
|
||||
self.job.end_time = time.time()
|
||||
self._broadcast(force=True)
|
||||
self.replay_manager.clear_session()
|
||||
_remove_silent(clip_path)
|
||||
finally:
|
||||
_remove_silent(concat_file)
|
||||
_set_active_runner(None)
|
||||
|
||||
def _finalize_cancelled(self, clip_path: str) -> None:
|
||||
logger.info("Debug replay startup cancelled")
|
||||
self.job.status = JobStatusTypesEnum.cancelled
|
||||
self.job.end_time = time.time()
|
||||
self._broadcast(force=True)
|
||||
# The caller of cancel_debug_replay_job (DebugReplayManager.stop) owns
|
||||
# session cleanup — db rows, filesystem artifacts, clear_session. We
|
||||
# only clean up the partial concat output we created.
|
||||
_remove_silent(clip_path)
|
||||
|
||||
|
||||
def _remove_silent(path: str) -> None:
|
||||
try:
|
||||
if os.path.exists(path):
|
||||
os.remove(path)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def start_debug_replay_job(
|
||||
*,
|
||||
source_camera: str,
|
||||
start_ts: float,
|
||||
end_ts: float,
|
||||
frigate_config: FrigateConfig,
|
||||
config_publisher: CameraConfigUpdatePublisher,
|
||||
replay_manager: "DebugReplayManager",
|
||||
) -> str:
|
||||
"""Validate, create job, start runner. Returns the job id.
|
||||
|
||||
Raises ValueError for bad params (camera missing, time range
|
||||
invalid, no recordings) and RuntimeError if a session is already
|
||||
active.
|
||||
"""
|
||||
if job_is_running(JOB_TYPE) or replay_manager.active:
|
||||
raise RuntimeError("A replay session is already active")
|
||||
|
||||
if source_camera not in frigate_config.cameras:
|
||||
raise ValueError(f"Camera '{source_camera}' not found")
|
||||
|
||||
if end_ts <= start_ts:
|
||||
raise ValueError("End time must be after start time")
|
||||
|
||||
recordings = query_recordings(source_camera, start_ts, end_ts)
|
||||
if not recordings.count():
|
||||
raise ValueError(
|
||||
f"No recordings found for camera '{source_camera}' in the specified time range"
|
||||
)
|
||||
|
||||
replay_name = f"{REPLAY_CAMERA_PREFIX}{source_camera}"
|
||||
replay_manager.mark_starting(
|
||||
source_camera=source_camera,
|
||||
replay_camera_name=replay_name,
|
||||
start_ts=start_ts,
|
||||
end_ts=end_ts,
|
||||
)
|
||||
|
||||
job = DebugReplayJob(
|
||||
source_camera=source_camera,
|
||||
replay_camera_name=replay_name,
|
||||
start_ts=start_ts,
|
||||
end_ts=end_ts,
|
||||
)
|
||||
set_current_job(job)
|
||||
|
||||
runner = DebugReplayJobRunner(
|
||||
job=job,
|
||||
frigate_config=frigate_config,
|
||||
config_publisher=config_publisher,
|
||||
replay_manager=replay_manager,
|
||||
)
|
||||
_set_active_runner(runner)
|
||||
runner.start()
|
||||
|
||||
return job.id
|
||||
|
||||
|
||||
def cancel_debug_replay_job() -> bool:
|
||||
"""Signal the active runner to cancel.
|
||||
|
||||
Returns True if a runner was signalled, False if no job was active.
|
||||
"""
|
||||
runner = get_active_runner()
|
||||
if runner is None:
|
||||
return False
|
||||
runner.cancel()
|
||||
return True
|
||||
|
||||
|
||||
def wait_for_runner(timeout: float = 2.0) -> bool:
|
||||
"""Join the active runner. Returns True if the runner ended in time."""
|
||||
runner = get_active_runner()
|
||||
if runner is None:
|
||||
return True
|
||||
runner.join(timeout=timeout)
|
||||
return not runner.is_alive()
|
||||
@ -23,13 +23,13 @@ from frigate.const import (
|
||||
EXPORT_DIR,
|
||||
MAX_PLAYLIST_SECONDS,
|
||||
PREVIEW_FRAME_TYPE,
|
||||
PROCESS_PRIORITY_LOW,
|
||||
)
|
||||
from frigate.ffmpeg_presets import (
|
||||
EncodeTypeEnum,
|
||||
parse_preset_hardware_acceleration_encode,
|
||||
)
|
||||
from frigate.models import Export, Previews, Recordings, ReviewSegment
|
||||
from frigate.util.ffmpeg import run_ffmpeg_with_progress
|
||||
from frigate.util.time import is_current_hour
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -243,107 +243,29 @@ class RecordingExporter(threading.Thread):
|
||||
|
||||
return total
|
||||
|
||||
def _inject_progress_flags(self, ffmpeg_cmd: list[str]) -> list[str]:
|
||||
"""Insert FFmpeg progress reporting flags before the output path.
|
||||
|
||||
``-progress pipe:2`` writes structured key=value lines to stderr,
|
||||
``-nostats`` suppresses the noisy default stats output.
|
||||
"""
|
||||
if not ffmpeg_cmd:
|
||||
return ffmpeg_cmd
|
||||
return ffmpeg_cmd[:-1] + ["-progress", "pipe:2", "-nostats", ffmpeg_cmd[-1]]
|
||||
|
||||
def _run_ffmpeg_with_progress(
|
||||
self,
|
||||
ffmpeg_cmd: list[str],
|
||||
playlist_lines: str | list[str],
|
||||
step: str = "encoding",
|
||||
) -> tuple[int, str]:
|
||||
"""Run an FFmpeg export command, parsing progress events from stderr.
|
||||
"""Delegate to the shared helper, mapping percent → (step, percent).
|
||||
|
||||
Returns ``(returncode, captured_stderr)``. Stdout is left attached to
|
||||
the parent process so we don't have to drain it (and risk a deadlock
|
||||
if the buffer fills). Progress percent is computed against the
|
||||
expected output duration; values are clamped to [0, 100] inside
|
||||
:py:meth:`_emit_progress`.
|
||||
Returns ``(returncode, captured_stderr)``.
|
||||
"""
|
||||
cmd = ["nice", "-n", str(PROCESS_PRIORITY_LOW)] + self._inject_progress_flags(
|
||||
ffmpeg_cmd
|
||||
)
|
||||
|
||||
if isinstance(playlist_lines, list):
|
||||
stdin_payload = "\n".join(playlist_lines)
|
||||
else:
|
||||
stdin_payload = playlist_lines
|
||||
|
||||
expected_duration = self._expected_output_duration_seconds()
|
||||
|
||||
self._emit_progress(step, 0.0)
|
||||
|
||||
proc = sp.Popen(
|
||||
cmd,
|
||||
stdin=sp.PIPE,
|
||||
stderr=sp.PIPE,
|
||||
text=True,
|
||||
encoding="ascii",
|
||||
errors="replace",
|
||||
return run_ffmpeg_with_progress(
|
||||
ffmpeg_cmd,
|
||||
expected_duration_seconds=self._expected_output_duration_seconds(),
|
||||
on_progress=lambda percent: self._emit_progress(step, percent),
|
||||
stdin_payload=stdin_payload,
|
||||
use_low_priority=True,
|
||||
)
|
||||
|
||||
assert proc.stdin is not None
|
||||
assert proc.stderr is not None
|
||||
|
||||
try:
|
||||
proc.stdin.write(stdin_payload)
|
||||
except (BrokenPipeError, OSError):
|
||||
# FFmpeg may have rejected the input early; still wait for it
|
||||
# to terminate so the returncode is meaningful.
|
||||
pass
|
||||
finally:
|
||||
try:
|
||||
proc.stdin.close()
|
||||
except (BrokenPipeError, OSError):
|
||||
pass
|
||||
|
||||
captured: list[str] = []
|
||||
|
||||
try:
|
||||
for raw_line in proc.stderr:
|
||||
captured.append(raw_line)
|
||||
line = raw_line.strip()
|
||||
|
||||
if not line:
|
||||
continue
|
||||
|
||||
if line.startswith("out_time_us="):
|
||||
if expected_duration <= 0:
|
||||
continue
|
||||
try:
|
||||
out_time_us = int(line.split("=", 1)[1])
|
||||
except (ValueError, IndexError):
|
||||
continue
|
||||
if out_time_us < 0:
|
||||
continue
|
||||
out_seconds = out_time_us / 1_000_000.0
|
||||
percent = (out_seconds / expected_duration) * 100.0
|
||||
self._emit_progress(step, percent)
|
||||
elif line == "progress=end":
|
||||
self._emit_progress(step, 100.0)
|
||||
break
|
||||
except Exception:
|
||||
logger.exception("Failed reading FFmpeg progress for %s", self.export_id)
|
||||
|
||||
proc.wait()
|
||||
|
||||
# Drain any remaining stderr so callers can log it on failure.
|
||||
try:
|
||||
remaining = proc.stderr.read()
|
||||
if remaining:
|
||||
captured.append(remaining)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return proc.returncode, "".join(captured)
|
||||
|
||||
def get_datetime_from_timestamp(self, timestamp: int) -> str:
|
||||
# return in iso format using the configured ui.timezone when set,
|
||||
# so the auto-generated export name reflects local time rather
|
||||
|
||||
123
frigate/test/http_api/test_debug_replay_api.py
Normal file
123
frigate/test/http_api/test_debug_replay_api.py
Normal file
@ -0,0 +1,123 @@
|
||||
"""Tests for /debug_replay API endpoints."""
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
from frigate.models import Event, Recordings, ReviewSegment
|
||||
from frigate.test.http_api.base_http_test import AuthTestClient, BaseTestHttp
|
||||
|
||||
|
||||
class TestDebugReplayAPI(BaseTestHttp):
|
||||
def setUp(self):
|
||||
super().setUp([Event, Recordings, ReviewSegment])
|
||||
self.app = self.create_app()
|
||||
|
||||
def test_start_returns_202_with_job_id(self):
|
||||
# Stub the factory to skip validation/threading and just record the
|
||||
# name on the manager the way the real factory's mark_starting would.
|
||||
def fake_start(**kwargs):
|
||||
kwargs["replay_manager"].mark_starting(
|
||||
source_camera=kwargs["source_camera"],
|
||||
replay_camera_name="_replay_front",
|
||||
start_ts=kwargs["start_ts"],
|
||||
end_ts=kwargs["end_ts"],
|
||||
)
|
||||
return "job-1234"
|
||||
|
||||
with patch(
|
||||
"frigate.api.debug_replay.start_debug_replay_job",
|
||||
side_effect=fake_start,
|
||||
):
|
||||
with AuthTestClient(self.app) as client:
|
||||
resp = client.post(
|
||||
"/debug_replay/start",
|
||||
json={
|
||||
"camera": "front",
|
||||
"start_time": 100,
|
||||
"end_time": 200,
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_code, 202)
|
||||
body = resp.json()
|
||||
self.assertTrue(body["success"])
|
||||
self.assertEqual(body["job_id"], "job-1234")
|
||||
self.assertEqual(body["replay_camera"], "_replay_front")
|
||||
|
||||
def test_start_returns_400_on_validation_error(self):
|
||||
with patch(
|
||||
"frigate.api.debug_replay.start_debug_replay_job",
|
||||
side_effect=ValueError("Camera 'missing' not found"),
|
||||
):
|
||||
with AuthTestClient(self.app) as client:
|
||||
resp = client.post(
|
||||
"/debug_replay/start",
|
||||
json={
|
||||
"camera": "missing",
|
||||
"start_time": 100,
|
||||
"end_time": 200,
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_code, 400)
|
||||
body = resp.json()
|
||||
self.assertFalse(body["success"])
|
||||
# Message is hard-coded so we don't echo exception text back to clients
|
||||
# (CodeQL: information exposure through an exception).
|
||||
self.assertEqual(body["message"], "Invalid debug replay parameters")
|
||||
|
||||
def test_start_returns_409_when_session_already_active(self):
|
||||
with patch(
|
||||
"frigate.api.debug_replay.start_debug_replay_job",
|
||||
side_effect=RuntimeError("A replay session is already active"),
|
||||
):
|
||||
with AuthTestClient(self.app) as client:
|
||||
resp = client.post(
|
||||
"/debug_replay/start",
|
||||
json={
|
||||
"camera": "front",
|
||||
"start_time": 100,
|
||||
"end_time": 200,
|
||||
},
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_code, 409)
|
||||
body = resp.json()
|
||||
self.assertFalse(body["success"])
|
||||
|
||||
def test_status_inactive_when_no_session(self):
|
||||
with AuthTestClient(self.app) as client:
|
||||
resp = client.get("/debug_replay/status")
|
||||
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
body = resp.json()
|
||||
self.assertFalse(body["active"])
|
||||
self.assertIsNone(body["replay_camera"])
|
||||
self.assertIsNone(body["source_camera"])
|
||||
self.assertIsNone(body["start_time"])
|
||||
self.assertIsNone(body["end_time"])
|
||||
self.assertFalse(body["live_ready"])
|
||||
# Make sure deprecated fields are gone
|
||||
self.assertNotIn("state", body)
|
||||
self.assertNotIn("progress_percent", body)
|
||||
self.assertNotIn("error_message", body)
|
||||
|
||||
def test_status_active_after_mark_starting(self):
|
||||
manager = self.app.replay_manager
|
||||
manager.mark_starting(
|
||||
source_camera="front",
|
||||
replay_camera_name="_replay_front",
|
||||
start_ts=100.0,
|
||||
end_ts=200.0,
|
||||
)
|
||||
|
||||
with AuthTestClient(self.app) as client:
|
||||
resp = client.get("/debug_replay/status")
|
||||
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
body = resp.json()
|
||||
self.assertTrue(body["active"])
|
||||
self.assertEqual(body["replay_camera"], "_replay_front")
|
||||
self.assertEqual(body["source_camera"], "front")
|
||||
self.assertEqual(body["start_time"], 100.0)
|
||||
self.assertEqual(body["end_time"], 200.0)
|
||||
self.assertFalse(body["live_ready"])
|
||||
242
frigate/test/test_debug_replay.py
Normal file
242
frigate/test/test_debug_replay.py
Normal file
@ -0,0 +1,242 @@
|
||||
"""Tests for the simplified DebugReplayManager.
|
||||
|
||||
Startup orchestration lives in ``frigate.jobs.debug_replay`` (covered by
|
||||
``test_debug_replay_job``). The manager owns only session presence and
|
||||
cleanup.
|
||||
"""
|
||||
|
||||
import unittest
|
||||
import unittest.mock
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
|
||||
class TestDebugReplayManagerSession(unittest.TestCase):
|
||||
def test_inactive_by_default(self) -> None:
|
||||
from frigate.debug_replay import DebugReplayManager
|
||||
|
||||
manager = DebugReplayManager()
|
||||
|
||||
self.assertFalse(manager.active)
|
||||
self.assertIsNone(manager.replay_camera_name)
|
||||
self.assertIsNone(manager.source_camera)
|
||||
self.assertIsNone(manager.clip_path)
|
||||
self.assertIsNone(manager.start_ts)
|
||||
self.assertIsNone(manager.end_ts)
|
||||
|
||||
def test_mark_starting_sets_session_pointers_and_active(self) -> None:
|
||||
from frigate.debug_replay import DebugReplayManager
|
||||
|
||||
manager = DebugReplayManager()
|
||||
|
||||
manager.mark_starting(
|
||||
source_camera="front",
|
||||
replay_camera_name="_replay_front",
|
||||
start_ts=100.0,
|
||||
end_ts=200.0,
|
||||
)
|
||||
|
||||
self.assertTrue(manager.active)
|
||||
self.assertEqual(manager.replay_camera_name, "_replay_front")
|
||||
self.assertEqual(manager.source_camera, "front")
|
||||
self.assertEqual(manager.start_ts, 100.0)
|
||||
self.assertEqual(manager.end_ts, 200.0)
|
||||
self.assertIsNone(manager.clip_path)
|
||||
|
||||
def test_mark_session_ready_sets_clip_path(self) -> None:
|
||||
from frigate.debug_replay import DebugReplayManager
|
||||
|
||||
manager = DebugReplayManager()
|
||||
manager.mark_starting("front", "_replay_front", 100.0, 200.0)
|
||||
|
||||
manager.mark_session_ready(clip_path="/tmp/replay/_replay_front.mp4")
|
||||
|
||||
self.assertEqual(manager.clip_path, "/tmp/replay/_replay_front.mp4")
|
||||
self.assertTrue(manager.active)
|
||||
|
||||
def test_clear_session_resets_all_pointers(self) -> None:
|
||||
from frigate.debug_replay import DebugReplayManager
|
||||
|
||||
manager = DebugReplayManager()
|
||||
manager.mark_starting("front", "_replay_front", 100.0, 200.0)
|
||||
manager.mark_session_ready("/tmp/replay/clip.mp4")
|
||||
|
||||
manager.clear_session()
|
||||
|
||||
self.assertFalse(manager.active)
|
||||
self.assertIsNone(manager.replay_camera_name)
|
||||
self.assertIsNone(manager.source_camera)
|
||||
self.assertIsNone(manager.clip_path)
|
||||
self.assertIsNone(manager.start_ts)
|
||||
self.assertIsNone(manager.end_ts)
|
||||
|
||||
|
||||
class TestDebugReplayManagerStop(unittest.TestCase):
|
||||
def test_stop_when_inactive_is_a_noop(self) -> None:
|
||||
from frigate.debug_replay import DebugReplayManager
|
||||
|
||||
manager = DebugReplayManager()
|
||||
frigate_config = MagicMock()
|
||||
frigate_config.cameras = {}
|
||||
publisher = MagicMock()
|
||||
|
||||
# Should not raise; should not publish any events.
|
||||
manager.stop(frigate_config=frigate_config, config_publisher=publisher)
|
||||
|
||||
publisher.publish_update.assert_not_called()
|
||||
|
||||
def test_stop_publishes_remove_when_camera_was_published(self) -> None:
|
||||
from frigate.config.camera.updater import CameraConfigUpdateEnum
|
||||
from frigate.debug_replay import DebugReplayManager
|
||||
|
||||
manager = DebugReplayManager()
|
||||
manager.mark_starting("front", "_replay_front", 100.0, 200.0)
|
||||
manager.mark_session_ready("/tmp/replay/_replay_front.mp4")
|
||||
|
||||
camera_config = MagicMock()
|
||||
frigate_config = MagicMock()
|
||||
frigate_config.cameras = {"_replay_front": camera_config}
|
||||
publisher = MagicMock()
|
||||
|
||||
with (
|
||||
patch.object(manager, "_cleanup_db"),
|
||||
patch.object(manager, "_cleanup_files"),
|
||||
patch("frigate.debug_replay.cancel_debug_replay_job", return_value=False),
|
||||
):
|
||||
manager.stop(frigate_config=frigate_config, config_publisher=publisher)
|
||||
|
||||
# One publish_update call with a remove topic.
|
||||
self.assertEqual(publisher.publish_update.call_count, 1)
|
||||
topic_arg = publisher.publish_update.call_args.args[0]
|
||||
self.assertEqual(topic_arg.update_type, CameraConfigUpdateEnum.remove)
|
||||
self.assertFalse(manager.active)
|
||||
|
||||
def test_stop_skips_remove_publish_when_camera_not_in_config(self) -> None:
|
||||
"""Cancellation during preparing_clip: no camera was published yet."""
|
||||
from frigate.debug_replay import DebugReplayManager
|
||||
|
||||
manager = DebugReplayManager()
|
||||
manager.mark_starting("front", "_replay_front", 100.0, 200.0)
|
||||
# clip_path stays None because we cancelled before camera publish.
|
||||
|
||||
frigate_config = MagicMock()
|
||||
frigate_config.cameras = {} # _replay_front not present
|
||||
publisher = MagicMock()
|
||||
|
||||
with (
|
||||
patch.object(manager, "_cleanup_db"),
|
||||
patch.object(manager, "_cleanup_files"),
|
||||
patch("frigate.debug_replay.cancel_debug_replay_job", return_value=True),
|
||||
):
|
||||
manager.stop(frigate_config=frigate_config, config_publisher=publisher)
|
||||
|
||||
publisher.publish_update.assert_not_called()
|
||||
self.assertFalse(manager.active)
|
||||
|
||||
def test_stop_calls_cancel_debug_replay_job(self) -> None:
|
||||
from frigate.debug_replay import DebugReplayManager
|
||||
|
||||
manager = DebugReplayManager()
|
||||
manager.mark_starting("front", "_replay_front", 100.0, 200.0)
|
||||
|
||||
frigate_config = MagicMock()
|
||||
frigate_config.cameras = {}
|
||||
publisher = MagicMock()
|
||||
|
||||
with (
|
||||
patch.object(manager, "_cleanup_db"),
|
||||
patch.object(manager, "_cleanup_files"),
|
||||
patch(
|
||||
"frigate.debug_replay.cancel_debug_replay_job",
|
||||
return_value=True,
|
||||
) as mock_cancel,
|
||||
):
|
||||
manager.stop(frigate_config=frigate_config, config_publisher=publisher)
|
||||
|
||||
mock_cancel.assert_called_once()
|
||||
|
||||
|
||||
class TestDebugReplayManagerPublishCamera(unittest.TestCase):
|
||||
def test_publish_camera_invokes_publisher_with_add_topic(self) -> None:
|
||||
from frigate.config.camera.updater import CameraConfigUpdateEnum
|
||||
from frigate.debug_replay import DebugReplayManager
|
||||
|
||||
manager = DebugReplayManager()
|
||||
|
||||
source_config = MagicMock()
|
||||
new_camera_config = MagicMock()
|
||||
frigate_config = MagicMock()
|
||||
frigate_config.cameras = {"front": source_config}
|
||||
publisher = MagicMock()
|
||||
|
||||
with (
|
||||
patch.object(
|
||||
manager,
|
||||
"_build_camera_config_dict",
|
||||
return_value={"enabled": True},
|
||||
),
|
||||
patch("frigate.debug_replay.find_config_file", return_value="/cfg.yml"),
|
||||
patch("frigate.debug_replay.YAML") as yaml_cls,
|
||||
patch("frigate.debug_replay.FrigateConfig.parse_object") as parse_object,
|
||||
patch("builtins.open", unittest.mock.mock_open(read_data="cameras:\n")),
|
||||
):
|
||||
yaml_instance = yaml_cls.return_value
|
||||
yaml_instance.load.return_value = {"cameras": {}}
|
||||
parsed = MagicMock()
|
||||
parsed.cameras = {"_replay_front": new_camera_config}
|
||||
parse_object.return_value = parsed
|
||||
|
||||
manager.publish_camera(
|
||||
source_camera="front",
|
||||
replay_name="_replay_front",
|
||||
clip_path="/tmp/clip.mp4",
|
||||
frigate_config=frigate_config,
|
||||
config_publisher=publisher,
|
||||
)
|
||||
|
||||
# Camera registered into the live config dict
|
||||
self.assertIn("_replay_front", frigate_config.cameras)
|
||||
# Publisher invoked with an add topic
|
||||
self.assertEqual(publisher.publish_update.call_count, 1)
|
||||
topic_arg = publisher.publish_update.call_args.args[0]
|
||||
self.assertEqual(topic_arg.update_type, CameraConfigUpdateEnum.add)
|
||||
|
||||
def test_publish_camera_wraps_parse_failure_in_runtime_error(self) -> None:
|
||||
from frigate.debug_replay import DebugReplayManager
|
||||
|
||||
manager = DebugReplayManager()
|
||||
frigate_config = MagicMock()
|
||||
frigate_config.cameras = {"front": MagicMock()}
|
||||
publisher = MagicMock()
|
||||
|
||||
with (
|
||||
patch.object(
|
||||
manager,
|
||||
"_build_camera_config_dict",
|
||||
return_value={"enabled": True},
|
||||
),
|
||||
patch("frigate.debug_replay.find_config_file", return_value="/cfg.yml"),
|
||||
patch("frigate.debug_replay.YAML") as yaml_cls,
|
||||
patch(
|
||||
"frigate.debug_replay.FrigateConfig.parse_object",
|
||||
side_effect=ValueError("zone foo has invalid coordinates"),
|
||||
),
|
||||
patch("builtins.open", unittest.mock.mock_open(read_data="cameras:\n")),
|
||||
):
|
||||
yaml_cls.return_value.load.return_value = {"cameras": {}}
|
||||
|
||||
with self.assertRaises(RuntimeError) as ctx:
|
||||
manager.publish_camera(
|
||||
source_camera="front",
|
||||
replay_name="_replay_front",
|
||||
clip_path="/tmp/clip.mp4",
|
||||
frigate_config=frigate_config,
|
||||
config_publisher=publisher,
|
||||
)
|
||||
|
||||
self.assertIn("replay camera config", str(ctx.exception))
|
||||
self.assertIn("invalid coordinates", str(ctx.exception))
|
||||
publisher.publish_update.assert_not_called()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
460
frigate/test/test_debug_replay_job.py
Normal file
460
frigate/test/test_debug_replay_job.py
Normal file
@ -0,0 +1,460 @@
|
||||
"""Tests for the debug replay job runner and factory."""
|
||||
|
||||
import threading
|
||||
import time
|
||||
import unittest
|
||||
import unittest.mock
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from frigate.debug_replay import DebugReplayManager
|
||||
from frigate.jobs.debug_replay import (
|
||||
DebugReplayJob,
|
||||
cancel_debug_replay_job,
|
||||
get_active_runner,
|
||||
start_debug_replay_job,
|
||||
)
|
||||
from frigate.jobs.export import JobStatePublisher
|
||||
from frigate.jobs.manager import _completed_jobs, _current_jobs
|
||||
from frigate.types import JobStatusTypesEnum
|
||||
|
||||
|
||||
def _reset_job_manager() -> None:
|
||||
"""Clear the global job manager state between tests."""
|
||||
_current_jobs.clear()
|
||||
_completed_jobs.clear()
|
||||
|
||||
|
||||
def _patch_publisher(test_case: unittest.TestCase) -> None:
|
||||
"""Replace JobStatePublisher.publish with a no-op to avoid hanging on IPC."""
|
||||
publisher_patch = patch.object(
|
||||
JobStatePublisher, "publish", lambda self, payload: None
|
||||
)
|
||||
publisher_patch.start()
|
||||
test_case.addCleanup(publisher_patch.stop)
|
||||
|
||||
|
||||
class TestDebugReplayJob(unittest.TestCase):
|
||||
def test_default_fields(self) -> None:
|
||||
job = DebugReplayJob()
|
||||
|
||||
self.assertEqual(job.job_type, "debug_replay")
|
||||
self.assertEqual(job.status, JobStatusTypesEnum.queued)
|
||||
self.assertIsNone(job.current_step)
|
||||
self.assertEqual(job.progress_percent, 0.0)
|
||||
|
||||
def test_to_dict_whitelist(self) -> None:
|
||||
job = DebugReplayJob(
|
||||
source_camera="front",
|
||||
replay_camera_name="_replay_front",
|
||||
start_ts=100.0,
|
||||
end_ts=200.0,
|
||||
)
|
||||
job.current_step = "preparing_clip"
|
||||
job.progress_percent = 42.5
|
||||
|
||||
payload = job.to_dict()
|
||||
|
||||
# Top-level matches the standard Job<TResults> shape.
|
||||
for key in (
|
||||
"id",
|
||||
"job_type",
|
||||
"status",
|
||||
"start_time",
|
||||
"end_time",
|
||||
"error_message",
|
||||
"results",
|
||||
):
|
||||
self.assertIn(key, payload, f"missing top-level field: {key}")
|
||||
|
||||
results = payload["results"]
|
||||
self.assertEqual(results["source_camera"], "front")
|
||||
self.assertEqual(results["replay_camera_name"], "_replay_front")
|
||||
self.assertEqual(results["current_step"], "preparing_clip")
|
||||
self.assertEqual(results["progress_percent"], 42.5)
|
||||
self.assertEqual(results["start_ts"], 100.0)
|
||||
self.assertEqual(results["end_ts"], 200.0)
|
||||
|
||||
|
||||
class TestStartDebugReplayJob(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
_reset_job_manager()
|
||||
_patch_publisher(self)
|
||||
self.manager = DebugReplayManager()
|
||||
self.frigate_config = MagicMock()
|
||||
self.frigate_config.cameras = {"front": MagicMock()}
|
||||
self.frigate_config.ffmpeg.ffmpeg_path = "/bin/true"
|
||||
self.publisher = MagicMock()
|
||||
|
||||
self.recordings_qs = MagicMock()
|
||||
self.recordings_qs.count.return_value = 1
|
||||
self.recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")])
|
||||
|
||||
def tearDown(self) -> None:
|
||||
runner = get_active_runner()
|
||||
if runner is not None:
|
||||
runner.cancel()
|
||||
runner.join(timeout=2.0)
|
||||
_reset_job_manager()
|
||||
|
||||
def test_rejects_unknown_camera(self) -> None:
|
||||
with self.assertRaises(ValueError):
|
||||
start_debug_replay_job(
|
||||
source_camera="missing",
|
||||
start_ts=100.0,
|
||||
end_ts=200.0,
|
||||
frigate_config=self.frigate_config,
|
||||
config_publisher=self.publisher,
|
||||
replay_manager=self.manager,
|
||||
)
|
||||
|
||||
def test_rejects_invalid_time_range(self) -> None:
|
||||
with self.assertRaises(ValueError):
|
||||
start_debug_replay_job(
|
||||
source_camera="front",
|
||||
start_ts=200.0,
|
||||
end_ts=100.0,
|
||||
frigate_config=self.frigate_config,
|
||||
config_publisher=self.publisher,
|
||||
replay_manager=self.manager,
|
||||
)
|
||||
|
||||
def test_rejects_when_no_recordings(self) -> None:
|
||||
empty_qs = MagicMock()
|
||||
empty_qs.count.return_value = 0
|
||||
with patch("frigate.jobs.debug_replay.query_recordings", return_value=empty_qs):
|
||||
with self.assertRaises(ValueError):
|
||||
start_debug_replay_job(
|
||||
source_camera="front",
|
||||
start_ts=100.0,
|
||||
end_ts=200.0,
|
||||
frigate_config=self.frigate_config,
|
||||
config_publisher=self.publisher,
|
||||
replay_manager=self.manager,
|
||||
)
|
||||
|
||||
def test_returns_job_id_and_marks_session_starting(self) -> None:
|
||||
block = threading.Event()
|
||||
|
||||
def slow_helper(cmd, **kwargs):
|
||||
block.wait(timeout=5)
|
||||
return 0, ""
|
||||
|
||||
with (
|
||||
patch(
|
||||
"frigate.jobs.debug_replay.query_recordings",
|
||||
return_value=self.recordings_qs,
|
||||
),
|
||||
patch(
|
||||
"frigate.jobs.debug_replay.run_ffmpeg_with_progress",
|
||||
side_effect=slow_helper,
|
||||
),
|
||||
patch.object(self.manager, "publish_camera"),
|
||||
patch("os.path.exists", return_value=True),
|
||||
patch("os.makedirs"),
|
||||
patch("builtins.open", unittest.mock.mock_open()),
|
||||
):
|
||||
job_id = start_debug_replay_job(
|
||||
source_camera="front",
|
||||
start_ts=100.0,
|
||||
end_ts=200.0,
|
||||
frigate_config=self.frigate_config,
|
||||
config_publisher=self.publisher,
|
||||
replay_manager=self.manager,
|
||||
)
|
||||
|
||||
self.assertIsInstance(job_id, str)
|
||||
self.assertTrue(self.manager.active)
|
||||
self.assertEqual(self.manager.replay_camera_name, "_replay_front")
|
||||
self.assertEqual(self.manager.source_camera, "front")
|
||||
|
||||
block.set()
|
||||
|
||||
def test_rejects_concurrent_calls(self) -> None:
|
||||
block = threading.Event()
|
||||
|
||||
def slow_helper(cmd, **kwargs):
|
||||
block.wait(timeout=5)
|
||||
return 0, ""
|
||||
|
||||
with (
|
||||
patch(
|
||||
"frigate.jobs.debug_replay.query_recordings",
|
||||
return_value=self.recordings_qs,
|
||||
),
|
||||
patch(
|
||||
"frigate.jobs.debug_replay.run_ffmpeg_with_progress",
|
||||
side_effect=slow_helper,
|
||||
),
|
||||
patch.object(self.manager, "publish_camera"),
|
||||
patch("os.path.exists", return_value=True),
|
||||
patch("os.makedirs"),
|
||||
patch("builtins.open", unittest.mock.mock_open()),
|
||||
):
|
||||
start_debug_replay_job(
|
||||
source_camera="front",
|
||||
start_ts=100.0,
|
||||
end_ts=200.0,
|
||||
frigate_config=self.frigate_config,
|
||||
config_publisher=self.publisher,
|
||||
replay_manager=self.manager,
|
||||
)
|
||||
|
||||
with self.assertRaises(RuntimeError):
|
||||
start_debug_replay_job(
|
||||
source_camera="front",
|
||||
start_ts=100.0,
|
||||
end_ts=200.0,
|
||||
frigate_config=self.frigate_config,
|
||||
config_publisher=self.publisher,
|
||||
replay_manager=self.manager,
|
||||
)
|
||||
|
||||
block.set()
|
||||
|
||||
|
||||
class TestRunnerHappyPath(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
_reset_job_manager()
|
||||
_patch_publisher(self)
|
||||
self.manager = DebugReplayManager()
|
||||
self.frigate_config = MagicMock()
|
||||
self.frigate_config.cameras = {"front": MagicMock()}
|
||||
self.frigate_config.ffmpeg.ffmpeg_path = "/bin/true"
|
||||
self.publisher = MagicMock()
|
||||
|
||||
self.recordings_qs = MagicMock()
|
||||
self.recordings_qs.count.return_value = 1
|
||||
self.recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")])
|
||||
|
||||
def tearDown(self) -> None:
|
||||
runner = get_active_runner()
|
||||
if runner is not None:
|
||||
runner.cancel()
|
||||
runner.join(timeout=2.0)
|
||||
_reset_job_manager()
|
||||
|
||||
def _wait_for(self, predicate, timeout: float = 5.0) -> bool:
|
||||
deadline = time.time() + timeout
|
||||
while time.time() < deadline:
|
||||
if predicate():
|
||||
return True
|
||||
time.sleep(0.02)
|
||||
return False
|
||||
|
||||
def test_progress_callback_updates_job_percent(self) -> None:
|
||||
captured: list[float] = []
|
||||
|
||||
def fake_helper(cmd, *, on_progress=None, **kwargs):
|
||||
on_progress(0.0)
|
||||
on_progress(50.0)
|
||||
on_progress(100.0)
|
||||
return 0, ""
|
||||
|
||||
with (
|
||||
patch(
|
||||
"frigate.jobs.debug_replay.query_recordings",
|
||||
return_value=self.recordings_qs,
|
||||
),
|
||||
patch(
|
||||
"frigate.jobs.debug_replay.run_ffmpeg_with_progress",
|
||||
side_effect=fake_helper,
|
||||
),
|
||||
patch.object(
|
||||
self.manager,
|
||||
"publish_camera",
|
||||
side_effect=lambda *a, **kw: captured.append("published"),
|
||||
),
|
||||
patch("os.path.exists", return_value=True),
|
||||
patch("os.makedirs"),
|
||||
patch("builtins.open", unittest.mock.mock_open()),
|
||||
):
|
||||
start_debug_replay_job(
|
||||
source_camera="front",
|
||||
start_ts=100.0,
|
||||
end_ts=200.0,
|
||||
frigate_config=self.frigate_config,
|
||||
config_publisher=self.publisher,
|
||||
replay_manager=self.manager,
|
||||
)
|
||||
|
||||
self.assertTrue(
|
||||
self._wait_for(lambda: get_active_runner() is None),
|
||||
"runner did not finish",
|
||||
)
|
||||
|
||||
from frigate.jobs.manager import get_current_job
|
||||
|
||||
job = get_current_job("debug_replay")
|
||||
self.assertIsNotNone(job)
|
||||
self.assertEqual(job.status, JobStatusTypesEnum.success)
|
||||
self.assertEqual(job.progress_percent, 100.0)
|
||||
self.assertEqual(captured, ["published"])
|
||||
# Manager should have been told the session is ready with the clip path.
|
||||
self.assertIsNotNone(self.manager.clip_path)
|
||||
|
||||
|
||||
class TestRunnerFailurePath(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
_reset_job_manager()
|
||||
_patch_publisher(self)
|
||||
self.manager = DebugReplayManager()
|
||||
self.frigate_config = MagicMock()
|
||||
self.frigate_config.cameras = {"front": MagicMock()}
|
||||
self.frigate_config.ffmpeg.ffmpeg_path = "/bin/true"
|
||||
self.publisher = MagicMock()
|
||||
self.recordings_qs = MagicMock()
|
||||
self.recordings_qs.count.return_value = 1
|
||||
self.recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")])
|
||||
|
||||
def tearDown(self) -> None:
|
||||
runner = get_active_runner()
|
||||
if runner is not None:
|
||||
runner.cancel()
|
||||
runner.join(timeout=2.0)
|
||||
_reset_job_manager()
|
||||
|
||||
def _wait_for(self, predicate, timeout: float = 5.0) -> bool:
|
||||
deadline = time.time() + timeout
|
||||
while time.time() < deadline:
|
||||
if predicate():
|
||||
return True
|
||||
time.sleep(0.02)
|
||||
return False
|
||||
|
||||
def test_ffmpeg_failure_marks_job_failed_and_clears_session(self) -> None:
|
||||
def failing_helper(cmd, **kwargs):
|
||||
return 1, "ffmpeg exploded"
|
||||
|
||||
with (
|
||||
patch(
|
||||
"frigate.jobs.debug_replay.query_recordings",
|
||||
return_value=self.recordings_qs,
|
||||
),
|
||||
patch(
|
||||
"frigate.jobs.debug_replay.run_ffmpeg_with_progress",
|
||||
side_effect=failing_helper,
|
||||
),
|
||||
patch("os.path.exists", return_value=True),
|
||||
patch("os.makedirs"),
|
||||
patch("os.remove"),
|
||||
patch("builtins.open", unittest.mock.mock_open()),
|
||||
):
|
||||
start_debug_replay_job(
|
||||
source_camera="front",
|
||||
start_ts=100.0,
|
||||
end_ts=200.0,
|
||||
frigate_config=self.frigate_config,
|
||||
config_publisher=self.publisher,
|
||||
replay_manager=self.manager,
|
||||
)
|
||||
|
||||
self.assertTrue(
|
||||
self._wait_for(lambda: get_active_runner() is None),
|
||||
"runner did not finish",
|
||||
)
|
||||
|
||||
from frigate.jobs.manager import get_current_job
|
||||
|
||||
job = get_current_job("debug_replay")
|
||||
self.assertIsNotNone(job)
|
||||
self.assertEqual(job.status, JobStatusTypesEnum.failed)
|
||||
self.assertIsNotNone(job.error_message)
|
||||
self.assertIn("ffmpeg", job.error_message.lower())
|
||||
# Session cleared so a new /start is allowed
|
||||
self.assertFalse(self.manager.active)
|
||||
|
||||
|
||||
class TestRunnerCancellation(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
_reset_job_manager()
|
||||
_patch_publisher(self)
|
||||
self.manager = DebugReplayManager()
|
||||
self.frigate_config = MagicMock()
|
||||
self.frigate_config.cameras = {"front": MagicMock()}
|
||||
self.frigate_config.ffmpeg.ffmpeg_path = "/bin/true"
|
||||
self.publisher = MagicMock()
|
||||
self.recordings_qs = MagicMock()
|
||||
self.recordings_qs.count.return_value = 1
|
||||
self.recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")])
|
||||
|
||||
def tearDown(self) -> None:
|
||||
runner = get_active_runner()
|
||||
if runner is not None:
|
||||
runner.cancel()
|
||||
runner.join(timeout=2.0)
|
||||
_reset_job_manager()
|
||||
|
||||
def _wait_for(self, predicate, timeout: float = 5.0) -> bool:
|
||||
deadline = time.time() + timeout
|
||||
while time.time() < deadline:
|
||||
if predicate():
|
||||
return True
|
||||
time.sleep(0.02)
|
||||
return False
|
||||
|
||||
def test_cancel_terminates_ffmpeg_and_marks_cancelled(self) -> None:
|
||||
terminated = threading.Event()
|
||||
fake_proc = MagicMock()
|
||||
fake_proc.terminate = MagicMock(side_effect=lambda: terminated.set())
|
||||
|
||||
def fake_helper(cmd, *, process_started=None, **kwargs):
|
||||
if process_started is not None:
|
||||
process_started(fake_proc)
|
||||
terminated.wait(timeout=5)
|
||||
return -15, "killed"
|
||||
|
||||
with (
|
||||
patch(
|
||||
"frigate.jobs.debug_replay.query_recordings",
|
||||
return_value=self.recordings_qs,
|
||||
),
|
||||
patch(
|
||||
"frigate.jobs.debug_replay.run_ffmpeg_with_progress",
|
||||
side_effect=fake_helper,
|
||||
),
|
||||
patch("os.path.exists", return_value=True),
|
||||
patch("os.makedirs"),
|
||||
patch("os.remove"),
|
||||
patch("builtins.open", unittest.mock.mock_open()),
|
||||
):
|
||||
start_debug_replay_job(
|
||||
source_camera="front",
|
||||
start_ts=100.0,
|
||||
end_ts=200.0,
|
||||
frigate_config=self.frigate_config,
|
||||
config_publisher=self.publisher,
|
||||
replay_manager=self.manager,
|
||||
)
|
||||
|
||||
# Wait for the runner to register the active process.
|
||||
self.assertTrue(
|
||||
self._wait_for(
|
||||
lambda: (
|
||||
get_active_runner() is not None
|
||||
and get_active_runner()._active_process is fake_proc
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
cancelled = cancel_debug_replay_job()
|
||||
self.assertTrue(cancelled)
|
||||
self.assertTrue(fake_proc.terminate.called)
|
||||
|
||||
self.assertTrue(
|
||||
self._wait_for(lambda: get_active_runner() is None),
|
||||
"runner did not finish",
|
||||
)
|
||||
|
||||
from frigate.jobs.manager import get_current_job
|
||||
|
||||
job = get_current_job("debug_replay")
|
||||
self.assertEqual(job.status, JobStatusTypesEnum.cancelled)
|
||||
# Runner must not clear the manager session on cancellation —
|
||||
# that belongs to the caller of cancel_debug_replay_job (stop()).
|
||||
# If the runner cleared it, stop() would log "no active session"
|
||||
# and skip its cleanup_db / cleanup_files calls.
|
||||
self.assertTrue(self.manager.active)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@ -14,6 +14,7 @@ from frigate.jobs.export import (
|
||||
)
|
||||
from frigate.record.export import PlaybackSourceEnum, RecordingExporter
|
||||
from frigate.types import JobStatusTypesEnum
|
||||
from frigate.util.ffmpeg import inject_progress_flags
|
||||
|
||||
|
||||
def _make_exporter(
|
||||
@ -118,10 +119,9 @@ class TestExpectedOutputDuration(unittest.TestCase):
|
||||
|
||||
class TestProgressFlagInjection(unittest.TestCase):
|
||||
def test_inserts_before_output_path(self) -> None:
|
||||
exporter = _make_exporter()
|
||||
cmd = ["ffmpeg", "-i", "input.m3u8", "-c", "copy", "/tmp/output.mp4"]
|
||||
|
||||
result = exporter._inject_progress_flags(cmd)
|
||||
result = inject_progress_flags(cmd)
|
||||
|
||||
assert result == [
|
||||
"ffmpeg",
|
||||
@ -136,8 +136,7 @@ class TestProgressFlagInjection(unittest.TestCase):
|
||||
]
|
||||
|
||||
def test_handles_empty_cmd(self) -> None:
|
||||
exporter = _make_exporter()
|
||||
assert exporter._inject_progress_flags([]) == []
|
||||
assert inject_progress_flags([]) == []
|
||||
|
||||
|
||||
class TestFfmpegProgressParsing(unittest.TestCase):
|
||||
@ -167,7 +166,7 @@ class TestFfmpegProgressParsing(unittest.TestCase):
|
||||
fake_proc.returncode = 0
|
||||
fake_proc.wait = MagicMock(return_value=0)
|
||||
|
||||
with patch("frigate.record.export.sp.Popen", return_value=fake_proc):
|
||||
with patch("frigate.util.ffmpeg.sp.Popen", return_value=fake_proc):
|
||||
returncode, _stderr = exporter._run_ffmpeg_with_progress(
|
||||
["ffmpeg", "-i", "x.m3u8", "/tmp/out.mp4"], "playlist", step="encoding"
|
||||
)
|
||||
|
||||
111
frigate/test/test_ffmpeg_progress.py
Normal file
111
frigate/test/test_ffmpeg_progress.py
Normal file
@ -0,0 +1,111 @@
|
||||
"""Tests for the shared ffmpeg progress helper."""
|
||||
|
||||
import unittest
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from frigate.util.ffmpeg import inject_progress_flags, run_ffmpeg_with_progress
|
||||
|
||||
|
||||
class TestInjectProgressFlags(unittest.TestCase):
|
||||
def test_inserts_flags_before_output_path(self):
|
||||
cmd = ["ffmpeg", "-i", "in.mp4", "-c", "copy", "out.mp4"]
|
||||
result = inject_progress_flags(cmd)
|
||||
self.assertEqual(
|
||||
result,
|
||||
[
|
||||
"ffmpeg",
|
||||
"-i",
|
||||
"in.mp4",
|
||||
"-c",
|
||||
"copy",
|
||||
"-progress",
|
||||
"pipe:2",
|
||||
"-nostats",
|
||||
"out.mp4",
|
||||
],
|
||||
)
|
||||
|
||||
def test_empty_cmd_returns_empty(self):
|
||||
self.assertEqual(inject_progress_flags([]), [])
|
||||
|
||||
|
||||
class TestRunFfmpegWithProgress(unittest.TestCase):
|
||||
def _make_fake_proc(self, stderr_lines, returncode=0):
|
||||
proc = MagicMock()
|
||||
proc.stderr = iter(stderr_lines)
|
||||
proc.stdin = MagicMock()
|
||||
proc.returncode = returncode
|
||||
proc.wait = MagicMock()
|
||||
return proc
|
||||
|
||||
def test_emits_percent_from_out_time_us_lines(self):
|
||||
captured: list[float] = []
|
||||
|
||||
def on_progress(percent: float) -> None:
|
||||
captured.append(percent)
|
||||
|
||||
stderr_lines = [
|
||||
"out_time_us=1000000\n",
|
||||
"out_time_us=5000000\n",
|
||||
"progress=end\n",
|
||||
]
|
||||
proc = self._make_fake_proc(stderr_lines)
|
||||
proc.stderr = MagicMock()
|
||||
proc.stderr.__iter__ = lambda self: iter(stderr_lines)
|
||||
proc.stderr.read = MagicMock(return_value="")
|
||||
|
||||
with patch("subprocess.Popen", return_value=proc):
|
||||
returncode, _stderr = run_ffmpeg_with_progress(
|
||||
["ffmpeg", "-i", "in", "out"],
|
||||
expected_duration_seconds=10.0,
|
||||
on_progress=on_progress,
|
||||
use_low_priority=False,
|
||||
)
|
||||
|
||||
self.assertEqual(returncode, 0)
|
||||
self.assertEqual(len(captured), 4) # initial 0.0 + two parsed + final 100.0
|
||||
self.assertAlmostEqual(captured[0], 0.0)
|
||||
self.assertAlmostEqual(captured[1], 10.0)
|
||||
self.assertAlmostEqual(captured[2], 50.0)
|
||||
self.assertAlmostEqual(captured[3], 100.0)
|
||||
|
||||
def test_passes_started_process_to_callback(self):
|
||||
proc = self._make_fake_proc([])
|
||||
proc.stderr = MagicMock()
|
||||
proc.stderr.__iter__ = lambda self: iter([])
|
||||
proc.stderr.read = MagicMock(return_value="")
|
||||
|
||||
seen: list = []
|
||||
|
||||
with patch("subprocess.Popen", return_value=proc):
|
||||
run_ffmpeg_with_progress(
|
||||
["ffmpeg", "out"],
|
||||
expected_duration_seconds=1.0,
|
||||
process_started=lambda p: seen.append(p),
|
||||
use_low_priority=False,
|
||||
)
|
||||
|
||||
self.assertEqual(seen, [proc])
|
||||
|
||||
def test_clamps_percent_to_0_100(self):
|
||||
captured: list[float] = []
|
||||
|
||||
def on_progress(percent: float) -> None:
|
||||
captured.append(percent)
|
||||
|
||||
stderr_lines = ["out_time_us=999999999999\n"]
|
||||
proc = self._make_fake_proc(stderr_lines)
|
||||
proc.stderr = MagicMock()
|
||||
proc.stderr.__iter__ = lambda self: iter(stderr_lines)
|
||||
proc.stderr.read = MagicMock(return_value="")
|
||||
|
||||
with patch("subprocess.Popen", return_value=proc):
|
||||
run_ffmpeg_with_progress(
|
||||
["ffmpeg", "out"],
|
||||
expected_duration_seconds=10.0,
|
||||
on_progress=on_progress,
|
||||
use_low_priority=False,
|
||||
)
|
||||
|
||||
# initial 0.0 then a clamped reading
|
||||
self.assertEqual(captured[-1], 100.0)
|
||||
@ -2,8 +2,9 @@
|
||||
|
||||
import logging
|
||||
import subprocess as sp
|
||||
from typing import Any
|
||||
from typing import Any, Callable, Optional
|
||||
|
||||
from frigate.const import PROCESS_PRIORITY_LOW
|
||||
from frigate.log import LogPipe
|
||||
|
||||
|
||||
@ -46,3 +47,124 @@ def start_or_restart_ffmpeg(
|
||||
start_new_session=True,
|
||||
)
|
||||
return process
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def inject_progress_flags(cmd: list[str]) -> list[str]:
|
||||
"""Insert `-progress pipe:2 -nostats` immediately before the output path.
|
||||
|
||||
`-progress pipe:2` writes structured key=value lines to stderr;
|
||||
`-nostats` suppresses the noisy default stats output. The output path
|
||||
is conventionally the last token in an FFmpeg argv.
|
||||
"""
|
||||
if not cmd:
|
||||
return cmd
|
||||
return cmd[:-1] + ["-progress", "pipe:2", "-nostats", cmd[-1]]
|
||||
|
||||
|
||||
def run_ffmpeg_with_progress(
|
||||
cmd: list[str],
|
||||
*,
|
||||
expected_duration_seconds: float,
|
||||
on_progress: Optional[Callable[[float], None]] = None,
|
||||
stdin_payload: Optional[str] = None,
|
||||
process_started: Optional[Callable[[sp.Popen], None]] = None,
|
||||
use_low_priority: bool = True,
|
||||
) -> tuple[int, str]:
|
||||
"""Run an ffmpeg command, streaming progress via `-progress pipe:2`.
|
||||
|
||||
Args:
|
||||
cmd: ffmpeg argv. Output path must be the last token.
|
||||
expected_duration_seconds: Duration of the expected output clip in
|
||||
seconds. Used to convert ffmpeg's `out_time_us` into a percent.
|
||||
on_progress: Optional callback invoked with a percent in [0, 100].
|
||||
Called once with 0.0 at start, again on each `out_time_us=`
|
||||
stderr line, and once with 100.0 on `progress=end`.
|
||||
stdin_payload: Optional string written to ffmpeg stdin (used by
|
||||
export for concat playlists).
|
||||
process_started: Optional callback invoked with the live `Popen`
|
||||
once spawned — lets callers store the ref for cancellation.
|
||||
use_low_priority: When True, prepend `nice -n PROCESS_PRIORITY_LOW`
|
||||
so concat doesn't starve detection.
|
||||
|
||||
Returns:
|
||||
Tuple of `(returncode, captured_stderr)`. Stdout is left attached
|
||||
to the parent process to avoid buffer-full deadlocks.
|
||||
"""
|
||||
full_cmd = inject_progress_flags(cmd)
|
||||
if use_low_priority:
|
||||
full_cmd = ["nice", "-n", str(PROCESS_PRIORITY_LOW)] + full_cmd
|
||||
|
||||
def emit(percent: float) -> None:
|
||||
if on_progress is None:
|
||||
return
|
||||
try:
|
||||
on_progress(max(0.0, min(100.0, percent)))
|
||||
except Exception:
|
||||
logger.exception("FFmpeg progress callback failed")
|
||||
|
||||
emit(0.0)
|
||||
|
||||
proc = sp.Popen(
|
||||
full_cmd,
|
||||
stdin=sp.PIPE if stdin_payload is not None else None,
|
||||
stderr=sp.PIPE,
|
||||
text=True,
|
||||
encoding="ascii",
|
||||
errors="replace",
|
||||
)
|
||||
if process_started is not None:
|
||||
try:
|
||||
process_started(proc)
|
||||
except Exception:
|
||||
logger.exception("FFmpeg process_started callback failed")
|
||||
|
||||
if stdin_payload is not None and proc.stdin is not None:
|
||||
try:
|
||||
proc.stdin.write(stdin_payload)
|
||||
except (BrokenPipeError, OSError):
|
||||
pass
|
||||
finally:
|
||||
try:
|
||||
proc.stdin.close()
|
||||
except (BrokenPipeError, OSError):
|
||||
pass
|
||||
|
||||
captured: list[str] = []
|
||||
if proc.stderr is not None:
|
||||
try:
|
||||
for raw_line in proc.stderr:
|
||||
captured.append(raw_line)
|
||||
line = raw_line.strip()
|
||||
if not line:
|
||||
continue
|
||||
if line.startswith("out_time_us="):
|
||||
if expected_duration_seconds <= 0:
|
||||
continue
|
||||
try:
|
||||
out_time_us = int(line.split("=", 1)[1])
|
||||
except (ValueError, IndexError):
|
||||
continue
|
||||
if out_time_us < 0:
|
||||
continue
|
||||
out_seconds = out_time_us / 1_000_000.0
|
||||
emit((out_seconds / expected_duration_seconds) * 100.0)
|
||||
elif line == "progress=end":
|
||||
emit(100.0)
|
||||
break
|
||||
except Exception:
|
||||
logger.exception("Failed reading FFmpeg progress stream")
|
||||
|
||||
proc.wait()
|
||||
|
||||
if proc.stderr is not None:
|
||||
try:
|
||||
remaining = proc.stderr.read()
|
||||
if remaining:
|
||||
captured.append(remaining)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return proc.returncode or 0, "".join(captured)
|
||||
|
||||
@ -31,7 +31,7 @@ test.describe("Replay — no active session @medium", () => {
|
||||
await expect(
|
||||
frigateApp.page.getByRole("heading", {
|
||||
level: 2,
|
||||
name: /No Active Replay Session/i,
|
||||
name: /No Active Debug Replay Session/i,
|
||||
}),
|
||||
).toBeVisible({ timeout: 10_000 });
|
||||
const goButton = frigateApp.page.getByRole("button", {
|
||||
@ -48,7 +48,7 @@ test.describe("Replay — no active session @medium", () => {
|
||||
await expect(
|
||||
frigateApp.page.getByRole("heading", {
|
||||
level: 2,
|
||||
name: /No Active Replay Session/i,
|
||||
name: /No Active Debug Replay Session/i,
|
||||
}),
|
||||
).toBeVisible({ timeout: 10_000 });
|
||||
await frigateApp.page
|
||||
@ -297,7 +297,7 @@ test.describe("Replay — mobile @medium @mobile", () => {
|
||||
await expect(
|
||||
frigateApp.page.getByRole("heading", {
|
||||
level: 2,
|
||||
name: /No Active Replay Session/i,
|
||||
name: /No Active Debug Replay Session/i,
|
||||
}),
|
||||
).toBeVisible({ timeout: 10_000 });
|
||||
});
|
||||
|
||||
@ -19,26 +19,31 @@
|
||||
"startLabel": "Start",
|
||||
"endLabel": "End",
|
||||
"toast": {
|
||||
"success": "Debug replay started successfully",
|
||||
"error": "Failed to start debug replay: {{error}}",
|
||||
"alreadyActive": "A replay session is already active",
|
||||
"stopped": "Debug replay stopped",
|
||||
"stopError": "Failed to stop debug replay: {{error}}",
|
||||
"goToReplay": "Go to Replay"
|
||||
}
|
||||
},
|
||||
"page": {
|
||||
"noSession": "No Active Replay Session",
|
||||
"noSessionDesc": "Start a debug replay from the History view by clicking the Debug Replay button in the toolbar.",
|
||||
"noSession": "No Active Debug Replay Session",
|
||||
"noSessionDesc": "Start a Debug Replay from History view by clicking the Actions button in the toolbar and choosing Debug Replay.",
|
||||
"goToRecordings": "Go to History",
|
||||
"preparingClip": "Preparing clip…",
|
||||
"preparingClipDesc": "Frigate is stitching together recordings for the selected time range. This can take a minute for longer ranges.",
|
||||
"startingCamera": "Starting Debug Replay…",
|
||||
"startError": {
|
||||
"title": "Failed to start Debug Replay",
|
||||
"back": "Back to History"
|
||||
},
|
||||
"sourceCamera": "Source Camera",
|
||||
"replayCamera": "Replay Camera",
|
||||
"initializingReplay": "Initializing replay...",
|
||||
"stoppingReplay": "Stopping replay...",
|
||||
"initializingReplay": "Initializing Debug Replay...",
|
||||
"stoppingReplay": "Stopping Debug Replay...",
|
||||
"stopReplay": "Stop Replay",
|
||||
"confirmStop": {
|
||||
"title": "Stop Debug Replay?",
|
||||
"description": "This will stop the replay session and clean up all temporary data. Are you sure?",
|
||||
"description": "This will stop the session and clean up all temporary data. Are you sure?",
|
||||
"confirm": "Stop Replay",
|
||||
"cancel": "Cancel"
|
||||
},
|
||||
@ -49,6 +54,6 @@
|
||||
"activeTracking": "Active tracking",
|
||||
"noActiveTracking": "No active tracking",
|
||||
"configuration": "Configuration",
|
||||
"configurationDesc": "Fine tune motion detection and object tracking settings for the debug replay camera. No changes are saved to your Frigate configuration file."
|
||||
"configurationDesc": "Fine tune motion detection and object tracking settings for the Debug Replay camera. No changes are saved to your Frigate configuration file."
|
||||
}
|
||||
}
|
||||
|
||||
@ -90,10 +90,6 @@ export default function SearchResultActions({
|
||||
const handleDebugReplay = useCallback(
|
||||
(event: SearchResult) => {
|
||||
setIsStarting(true);
|
||||
const toastId = toast.loading(
|
||||
t("dialog.starting", { ns: "views/replay" }),
|
||||
{ position: "top-center" },
|
||||
);
|
||||
|
||||
axios
|
||||
.post("debug_replay/start", {
|
||||
@ -102,11 +98,7 @@ export default function SearchResultActions({
|
||||
end_time: event.end_time,
|
||||
})
|
||||
.then((response) => {
|
||||
if (response.status === 200) {
|
||||
toast.success(t("dialog.toast.success", { ns: "views/replay" }), {
|
||||
id: toastId,
|
||||
position: "top-center",
|
||||
});
|
||||
if (response.status === 202 || response.status === 200) {
|
||||
navigate("/replay");
|
||||
}
|
||||
})
|
||||
@ -120,7 +112,6 @@ export default function SearchResultActions({
|
||||
toast.error(
|
||||
t("dialog.toast.alreadyActive", { ns: "views/replay" }),
|
||||
{
|
||||
id: toastId,
|
||||
position: "top-center",
|
||||
closeButton: true,
|
||||
dismissible: false,
|
||||
@ -135,7 +126,6 @@ export default function SearchResultActions({
|
||||
);
|
||||
} else {
|
||||
toast.error(t("dialog.toast.error", { error: errorMessage }), {
|
||||
id: toastId,
|
||||
position: "top-center",
|
||||
});
|
||||
}
|
||||
|
||||
@ -209,10 +209,7 @@ export default function DebugReplayDialog({
|
||||
end_time: range.before,
|
||||
})
|
||||
.then((response) => {
|
||||
if (response.status === 200) {
|
||||
toast.success(t("dialog.toast.success"), {
|
||||
position: "top-center",
|
||||
});
|
||||
if (response.status === 202 || response.status === 200) {
|
||||
setMode("none");
|
||||
setRange(undefined);
|
||||
navigate("/replay");
|
||||
|
||||
@ -262,10 +262,7 @@ export default function MobileReviewSettingsDrawer({
|
||||
end_time: debugReplayRange.before,
|
||||
});
|
||||
|
||||
if (response.status === 200) {
|
||||
toast.success(t("dialog.toast.success", { ns: "views/replay" }), {
|
||||
position: "top-center",
|
||||
});
|
||||
if (response.status === 202 || response.status === 200) {
|
||||
setDebugReplayMode("none");
|
||||
setDebugReplayRange(undefined);
|
||||
setDrawerMode("none");
|
||||
|
||||
@ -53,10 +53,6 @@ export default function EventMenu({
|
||||
const handleDebugReplay = useCallback(
|
||||
(event: Event) => {
|
||||
setIsStarting(true);
|
||||
const toastId = toast.loading(
|
||||
t("dialog.starting", { ns: "views/replay" }),
|
||||
{ position: "top-center" },
|
||||
);
|
||||
|
||||
axios
|
||||
.post("debug_replay/start", {
|
||||
@ -65,11 +61,7 @@ export default function EventMenu({
|
||||
end_time: event.end_time,
|
||||
})
|
||||
.then((response) => {
|
||||
if (response.status === 200) {
|
||||
toast.success(t("dialog.toast.success", { ns: "views/replay" }), {
|
||||
id: toastId,
|
||||
position: "top-center",
|
||||
});
|
||||
if (response.status === 202 || response.status === 200) {
|
||||
navigate("/replay");
|
||||
}
|
||||
})
|
||||
@ -83,7 +75,6 @@ export default function EventMenu({
|
||||
toast.error(
|
||||
t("dialog.toast.alreadyActive", { ns: "views/replay" }),
|
||||
{
|
||||
id: toastId,
|
||||
position: "top-center",
|
||||
closeButton: true,
|
||||
dismissible: false,
|
||||
@ -98,7 +89,6 @@ export default function EventMenu({
|
||||
);
|
||||
} else {
|
||||
toast.error(t("dialog.toast.error", { error: errorMessage }), {
|
||||
id: toastId,
|
||||
position: "top-center",
|
||||
});
|
||||
}
|
||||
|
||||
@ -42,7 +42,9 @@ import { CameraConfig, FrigateConfig } from "@/types/frigateConfig";
|
||||
import { getIconForLabel } from "@/utils/iconUtil";
|
||||
import { getTranslatedLabel } from "@/utils/i18n";
|
||||
import { Card } from "@/components/ui/card";
|
||||
import { Progress } from "@/components/ui/progress";
|
||||
import { ObjectType } from "@/types/ws";
|
||||
import { useJobStatus } from "@/api/ws";
|
||||
import WsMessageFeed from "@/components/ws/WsMessageFeed";
|
||||
import { ConfigSectionTemplate } from "@/components/config-form/sections/ConfigSectionTemplate";
|
||||
|
||||
@ -53,6 +55,7 @@ import { isDesktop, isMobile } from "react-device-detect";
|
||||
import Logo from "@/components/Logo";
|
||||
import { Separator } from "@/components/ui/separator";
|
||||
import { useDocDomain } from "@/hooks/use-doc-domain";
|
||||
import { useConfigSchema } from "@/hooks/use-config-schema";
|
||||
import DebugDrawingLayer from "@/components/overlay/DebugDrawingLayer";
|
||||
import { IoMdArrowRoundBack } from "react-icons/io";
|
||||
|
||||
@ -65,6 +68,15 @@ type DebugReplayStatus = {
|
||||
live_ready: boolean;
|
||||
};
|
||||
|
||||
type DebugReplayJobResults = {
|
||||
current_step: "preparing_clip" | "starting_camera" | null;
|
||||
progress_percent: number | null;
|
||||
source_camera: string | null;
|
||||
replay_camera_name: string | null;
|
||||
start_ts: number | null;
|
||||
end_ts: number | null;
|
||||
};
|
||||
|
||||
type DebugOptions = {
|
||||
bbox: boolean;
|
||||
timestamp: boolean;
|
||||
@ -105,8 +117,6 @@ const DEBUG_OPTION_I18N_KEY: Record<keyof DebugOptions, string> = {
|
||||
paths: "paths",
|
||||
};
|
||||
|
||||
const REPLAY_INIT_SKELETON_TIMEOUT_MS = 8000;
|
||||
|
||||
export default function Replay() {
|
||||
const { t } = useTranslation(["views/replay", "views/settings", "common"]);
|
||||
const navigate = useNavigate();
|
||||
@ -119,6 +129,9 @@ export default function Replay() {
|
||||
} = useSWR<DebugReplayStatus>("debug_replay/status", {
|
||||
refreshInterval: 1000,
|
||||
});
|
||||
const { payload: replayJob } =
|
||||
useJobStatus<DebugReplayJobResults>("debug_replay");
|
||||
const configSchema = useConfigSchema();
|
||||
const [isInitializing, setIsInitializing] = useState(true);
|
||||
|
||||
// Refresh status immediately on mount to avoid showing "no session" briefly
|
||||
@ -130,12 +143,6 @@ export default function Replay() {
|
||||
initializeStatus();
|
||||
}, [refreshStatus]);
|
||||
|
||||
useEffect(() => {
|
||||
if (status?.live_ready) {
|
||||
setShowReplayInitSkeleton(false);
|
||||
}
|
||||
}, [status?.live_ready]);
|
||||
|
||||
const [options, setOptions] = useState<DebugOptions>(DEFAULT_OPTIONS);
|
||||
const [isStopping, setIsStopping] = useState(false);
|
||||
const [configDialogOpen, setConfigDialogOpen] = useState(false);
|
||||
@ -160,11 +167,7 @@ export default function Replay() {
|
||||
axios
|
||||
.post("debug_replay/stop")
|
||||
.then(() => {
|
||||
toast.success(t("dialog.toast.stopped"), {
|
||||
position: "top-center",
|
||||
});
|
||||
refreshStatus();
|
||||
navigate("/review");
|
||||
})
|
||||
.catch((error) => {
|
||||
const errorMessage =
|
||||
@ -178,7 +181,7 @@ export default function Replay() {
|
||||
.finally(() => {
|
||||
setIsStopping(false);
|
||||
});
|
||||
}, [navigate, refreshStatus, t]);
|
||||
}, [refreshStatus, t]);
|
||||
|
||||
// Camera activity for the replay camera
|
||||
const { data: config } = useSWR<FrigateConfig>("config", {
|
||||
@ -191,35 +194,10 @@ export default function Replay() {
|
||||
|
||||
const { objects } = useCameraActivity(replayCameraConfig);
|
||||
|
||||
const [showReplayInitSkeleton, setShowReplayInitSkeleton] = useState(false);
|
||||
|
||||
// debug draw
|
||||
const containerRef = useRef<HTMLDivElement>(null);
|
||||
const [debugDraw, setDebugDraw] = useState(false);
|
||||
|
||||
useEffect(() => {
|
||||
if (!status?.active || !status.replay_camera) {
|
||||
setShowReplayInitSkeleton(false);
|
||||
return;
|
||||
}
|
||||
|
||||
setShowReplayInitSkeleton(true);
|
||||
|
||||
const timeout = window.setTimeout(() => {
|
||||
setShowReplayInitSkeleton(false);
|
||||
}, REPLAY_INIT_SKELETON_TIMEOUT_MS);
|
||||
|
||||
return () => {
|
||||
window.clearTimeout(timeout);
|
||||
};
|
||||
}, [status?.active, status?.replay_camera]);
|
||||
|
||||
useEffect(() => {
|
||||
if (status?.live_ready) {
|
||||
setShowReplayInitSkeleton(false);
|
||||
}
|
||||
}, [status?.live_ready]);
|
||||
|
||||
// Format time range for display
|
||||
const timeRangeDisplay = useMemo(() => {
|
||||
if (!status?.start_time || !status?.end_time) return "";
|
||||
@ -237,8 +215,39 @@ export default function Replay() {
|
||||
);
|
||||
}
|
||||
|
||||
// No active session
|
||||
if (!status?.active) {
|
||||
// Startup error (job failed). Only show when status.active is also true so
|
||||
// we don't surface stale failed jobs after a session ended cleanly.
|
||||
if (replayJob?.status === "failed" && status?.active) {
|
||||
return (
|
||||
<div className="flex size-full flex-col items-center justify-center gap-4 p-8">
|
||||
<Heading as="h2" className="text-center">
|
||||
{t("page.startError.title")}
|
||||
</Heading>
|
||||
{replayJob.error_message && (
|
||||
<p className="max-w-xl text-center text-sm text-muted-foreground">
|
||||
{replayJob.error_message}
|
||||
</p>
|
||||
)}
|
||||
<Button
|
||||
variant="default"
|
||||
onClick={() => {
|
||||
axios
|
||||
.post("debug_replay/stop")
|
||||
.catch(() => {})
|
||||
.finally(() => navigate("/review"));
|
||||
}}
|
||||
>
|
||||
{t("page.startError.back")}
|
||||
</Button>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
// No active session. Also covers the brief window between the runner
|
||||
// pushing job.status = "cancelled" via WS and the next SWR refresh
|
||||
// flipping status.active to false — without this, render falls through
|
||||
// to the full replay UI and you see a flash of it before stop completes.
|
||||
if (!status?.active || replayJob?.status === "cancelled") {
|
||||
return (
|
||||
<div className="flex size-full flex-col items-center justify-center gap-4 p-8">
|
||||
<MdReplay className="size-12" />
|
||||
@ -255,6 +264,52 @@ export default function Replay() {
|
||||
);
|
||||
}
|
||||
|
||||
// Startup in progress (job is running). The session is active but the
|
||||
// replay camera isn't ready yet; show progress / phase from the job.
|
||||
const startupStep =
|
||||
replayJob?.status === "running"
|
||||
? (replayJob.results?.current_step ?? null)
|
||||
: null;
|
||||
if (startupStep === "preparing_clip" || startupStep === "starting_camera") {
|
||||
const phaseTitle =
|
||||
startupStep === "preparing_clip"
|
||||
? t("page.preparingClip")
|
||||
: t("page.startingCamera");
|
||||
const progressPercent = replayJob?.results?.progress_percent ?? null;
|
||||
const showProgressBar =
|
||||
startupStep === "preparing_clip" && progressPercent != null;
|
||||
return (
|
||||
<div className="flex size-full flex-col items-center justify-center gap-4 p-8">
|
||||
{showProgressBar ? (
|
||||
<div className="flex w-64 flex-col items-center gap-2">
|
||||
<Progress value={progressPercent ?? 0} />
|
||||
<div className="text-xs text-muted-foreground">
|
||||
{Math.round(progressPercent ?? 0)}%
|
||||
</div>
|
||||
</div>
|
||||
) : (
|
||||
<ActivityIndicator className="size-8" />
|
||||
)}
|
||||
<Heading as="h3" className="text-center">
|
||||
{phaseTitle}
|
||||
</Heading>
|
||||
{startupStep === "preparing_clip" && (
|
||||
<p className="max-w-md text-center text-sm text-muted-foreground">
|
||||
{t("page.preparingClipDesc")}
|
||||
</p>
|
||||
)}
|
||||
<Button
|
||||
variant="outline"
|
||||
size="sm"
|
||||
disabled={isStopping}
|
||||
onClick={handleStop}
|
||||
>
|
||||
{t("button.cancel", { ns: "common" })}
|
||||
</Button>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
return (
|
||||
<div className="flex size-full flex-col overflow-hidden">
|
||||
<Toaster position="top-center" closeButton={true} />
|
||||
@ -345,27 +400,30 @@ export default function Replay() {
|
||||
) : (
|
||||
status.replay_camera && (
|
||||
<div className="relative size-full min-h-10" ref={containerRef}>
|
||||
<AutoUpdatingCameraImage
|
||||
className="size-full"
|
||||
cameraClasses="relative w-full h-full flex flex-col justify-start"
|
||||
searchParams={searchParams}
|
||||
camera={status.replay_camera}
|
||||
showFps={false}
|
||||
/>
|
||||
{debugDraw && (
|
||||
<DebugDrawingLayer
|
||||
containerRef={containerRef}
|
||||
cameraWidth={
|
||||
config?.cameras?.[status.source_camera ?? ""]?.detect
|
||||
.width ?? 1280
|
||||
}
|
||||
cameraHeight={
|
||||
config?.cameras?.[status.source_camera ?? ""]?.detect
|
||||
.height ?? 720
|
||||
}
|
||||
/>
|
||||
)}
|
||||
{showReplayInitSkeleton && (
|
||||
{status.live_ready ? (
|
||||
<>
|
||||
<AutoUpdatingCameraImage
|
||||
className="size-full"
|
||||
cameraClasses="relative w-full h-full flex flex-col justify-start"
|
||||
searchParams={searchParams}
|
||||
camera={status.replay_camera}
|
||||
showFps={false}
|
||||
/>
|
||||
{debugDraw && (
|
||||
<DebugDrawingLayer
|
||||
containerRef={containerRef}
|
||||
cameraWidth={
|
||||
config?.cameras?.[status.source_camera ?? ""]?.detect
|
||||
.width ?? 1280
|
||||
}
|
||||
cameraHeight={
|
||||
config?.cameras?.[status.source_camera ?? ""]?.detect
|
||||
.height ?? 720
|
||||
}
|
||||
/>
|
||||
)}
|
||||
</>
|
||||
) : (
|
||||
<div className="pointer-events-none absolute inset-0 z-10 size-full rounded-lg bg-background">
|
||||
<Skeleton className="size-full rounded-lg" />
|
||||
<div className="absolute left-1/2 top-1/2 flex -translate-x-1/2 -translate-y-1/2 flex-col items-center justify-center gap-2">
|
||||
@ -595,32 +653,38 @@ export default function Replay() {
|
||||
{t("page.configurationDesc")}
|
||||
</DialogDescription>
|
||||
</DialogHeader>
|
||||
<div className="space-y-6">
|
||||
<ConfigSectionTemplate
|
||||
sectionKey="motion"
|
||||
level="replay"
|
||||
cameraName={status.replay_camera ?? undefined}
|
||||
skipSave
|
||||
noStickyButtons
|
||||
requiresRestart={false}
|
||||
collapsible
|
||||
defaultCollapsed={false}
|
||||
showTitle
|
||||
showOverrideIndicator={false}
|
||||
/>
|
||||
<ConfigSectionTemplate
|
||||
sectionKey="objects"
|
||||
level="replay"
|
||||
cameraName={status.replay_camera ?? undefined}
|
||||
skipSave
|
||||
noStickyButtons
|
||||
requiresRestart={false}
|
||||
collapsible
|
||||
defaultCollapsed={false}
|
||||
showTitle
|
||||
showOverrideIndicator={false}
|
||||
/>
|
||||
</div>
|
||||
{configSchema == null ? (
|
||||
<div className="flex h-40 items-center justify-center">
|
||||
<ActivityIndicator />
|
||||
</div>
|
||||
) : (
|
||||
<div className="space-y-6">
|
||||
<ConfigSectionTemplate
|
||||
sectionKey="motion"
|
||||
level="replay"
|
||||
cameraName={status.replay_camera ?? undefined}
|
||||
skipSave
|
||||
noStickyButtons
|
||||
requiresRestart={false}
|
||||
collapsible
|
||||
defaultCollapsed={false}
|
||||
showTitle
|
||||
showOverrideIndicator={false}
|
||||
/>
|
||||
<ConfigSectionTemplate
|
||||
sectionKey="objects"
|
||||
level="replay"
|
||||
cameraName={status.replay_camera ?? undefined}
|
||||
skipSave
|
||||
noStickyButtons
|
||||
requiresRestart={false}
|
||||
collapsible
|
||||
defaultCollapsed={false}
|
||||
showTitle
|
||||
showOverrideIndicator={false}
|
||||
/>
|
||||
</div>
|
||||
)}
|
||||
</DialogContent>
|
||||
</Dialog>
|
||||
</div>
|
||||
|
||||
Loading…
Reference in New Issue
Block a user