mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-06-21 03:41:55 +03:00
The record watchdog treats a stale maintainer heartbeat as a dead recorder. But the heartbeat is published by the recording maintainer, so whenever the maintainer lags (e.g. "Unable to keep up with recording segments in cache", #9661) every camera looks stale at once and all record processes restart together - while recording was actually healthy. The restart churn then produces more, shorter segments, making the maintainer fall further behind. Before restarting on staleness, check the camera's newest cache segment on disk: if a segment is fresher than the staleness threshold, the recorder is demonstrably writing - log a warning, adopt the disk mtime as the heartbeat, and skip the restart. The invalid-segment path is untouched. Validated on a 26-camera production deployment (0.17.1 backport of this change): synchronized mass restarts went from 52/hour to zero, with heartbeat-stale events still occurring ~2/hour but now correctly identified as maintainer lag instead of recording failure. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
711 lines
28 KiB
Python
711 lines
28 KiB
Python
"""Manages ffmpeg processes for camera frame capture."""
|
|
|
|
import logging
|
|
import queue
|
|
import subprocess as sp
|
|
import threading
|
|
import time
|
|
from collections import deque
|
|
from datetime import datetime, timedelta, timezone
|
|
from multiprocessing import Queue, Value
|
|
from multiprocessing.synchronize import Event as MpEvent
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from frigate.camera import CameraMetrics
|
|
from frigate.comms.inter_process import InterProcessRequestor
|
|
from frigate.comms.recordings_updater import (
|
|
RecordingsDataSubscriber,
|
|
RecordingsDataTypeEnum,
|
|
)
|
|
from frigate.config import CameraConfig, LoggerConfig
|
|
from frigate.config.camera.updater import (
|
|
CameraConfigUpdateEnum,
|
|
CameraConfigUpdateSubscriber,
|
|
)
|
|
from frigate.const import CACHE_DIR, PROCESS_PRIORITY_HIGH
|
|
from frigate.log import LogPipe
|
|
from frigate.util.builtin import EventsPerSecond, get_record_segment_time
|
|
from frigate.util.ffmpeg import start_or_restart_ffmpeg, stop_ffmpeg
|
|
from frigate.util.image import (
|
|
FrameManager,
|
|
SharedMemoryFrameManager,
|
|
)
|
|
from frigate.util.process import FrigateProcess
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def capture_frames(
|
|
ffmpeg_process: sp.Popen[Any],
|
|
config: CameraConfig,
|
|
shm_frame_count: int,
|
|
frame_index: int,
|
|
frame_shape: tuple[int, int],
|
|
frame_manager: FrameManager,
|
|
frame_queue,
|
|
fps: Value,
|
|
skipped_fps: Value,
|
|
current_frame: Value,
|
|
stop_event: MpEvent,
|
|
) -> None:
|
|
frame_size = frame_shape[0] * frame_shape[1]
|
|
frame_rate = EventsPerSecond()
|
|
frame_rate.start()
|
|
skipped_eps = EventsPerSecond()
|
|
skipped_eps.start()
|
|
|
|
config_subscriber = CameraConfigUpdateSubscriber(
|
|
None, {config.name: config}, [CameraConfigUpdateEnum.enabled]
|
|
)
|
|
|
|
def get_enabled_state():
|
|
"""Fetch the latest enabled state from ZMQ."""
|
|
config_subscriber.check_for_updates()
|
|
return config.enabled
|
|
|
|
try:
|
|
while not stop_event.is_set():
|
|
if not get_enabled_state():
|
|
logger.debug(f"Stopping capture thread for disabled {config.name}")
|
|
break
|
|
|
|
fps.value = frame_rate.eps()
|
|
skipped_fps.value = skipped_eps.eps()
|
|
current_frame.value = datetime.now().timestamp()
|
|
frame_name = f"{config.name}_frame{frame_index}"
|
|
frame_buffer = frame_manager.write(frame_name)
|
|
try:
|
|
frame_buffer[:] = ffmpeg_process.stdout.read(frame_size)
|
|
except Exception:
|
|
# shutdown has been initiated
|
|
if stop_event.is_set():
|
|
break
|
|
|
|
logger.error(
|
|
f"{config.name}: Unable to read frames from ffmpeg process."
|
|
)
|
|
|
|
if ffmpeg_process.poll() is not None:
|
|
logger.error(
|
|
f"{config.name}: ffmpeg process is not running. exiting capture thread..."
|
|
)
|
|
break
|
|
|
|
continue
|
|
|
|
frame_rate.update()
|
|
|
|
# don't lock the queue to check, just try since it should rarely be full
|
|
try:
|
|
# add to the queue
|
|
frame_queue.put((frame_name, current_frame.value), False)
|
|
frame_manager.close(frame_name)
|
|
except queue.Full:
|
|
# if the queue is full, skip this frame
|
|
skipped_eps.update()
|
|
|
|
frame_index = 0 if frame_index == shm_frame_count - 1 else frame_index + 1
|
|
finally:
|
|
config_subscriber.stop()
|
|
|
|
|
|
class CameraWatchdog(threading.Thread):
|
|
def __init__(
|
|
self,
|
|
config: CameraConfig,
|
|
shm_frame_count: int,
|
|
frame_queue: Queue,
|
|
camera_fps,
|
|
skipped_fps,
|
|
ffmpeg_pid,
|
|
stalls,
|
|
reconnects,
|
|
detection_frame,
|
|
stop_event,
|
|
):
|
|
threading.Thread.__init__(self)
|
|
self.logger = logging.getLogger(f"watchdog.{config.name}")
|
|
self.config = config
|
|
self.shm_frame_count = shm_frame_count
|
|
self.capture_thread = None
|
|
self.ffmpeg_detect_process = None
|
|
self.logpipe = LogPipe(f"ffmpeg.{self.config.name}.detect")
|
|
self.ffmpeg_other_processes: list[dict[str, Any]] = []
|
|
self.camera_fps = camera_fps
|
|
self.skipped_fps = skipped_fps
|
|
self.ffmpeg_pid = ffmpeg_pid
|
|
self.frame_queue = frame_queue
|
|
self.frame_shape = self.config.frame_shape_yuv
|
|
self.frame_size = self.frame_shape[0] * self.frame_shape[1]
|
|
self.fps_overflow_count = 0
|
|
self.frame_index = 0
|
|
self.stop_event = stop_event
|
|
self.sleeptime = self.config.ffmpeg.retry_interval
|
|
self.reconnect_timestamps = deque()
|
|
self.stalls = stalls
|
|
self.reconnects = reconnects
|
|
self.detection_frame = detection_frame
|
|
|
|
self.config_subscriber = CameraConfigUpdateSubscriber(
|
|
None,
|
|
{config.name: config},
|
|
[
|
|
CameraConfigUpdateEnum.enabled,
|
|
CameraConfigUpdateEnum.ffmpeg,
|
|
CameraConfigUpdateEnum.record,
|
|
],
|
|
)
|
|
self.requestor = InterProcessRequestor()
|
|
self.was_enabled = self.config.enabled
|
|
self.was_record_enabled_in_config = self.config.record.enabled_in_config
|
|
|
|
self.segment_subscriber = RecordingsDataSubscriber(RecordingsDataTypeEnum.all)
|
|
self.latest_valid_segment_time: float = 0
|
|
self.latest_invalid_segment_time: float = 0
|
|
self.latest_cache_segment_time: float = 0
|
|
self.record_enable_time: datetime | None = None
|
|
|
|
# `valid` segments are published with the segment's start time, so the
|
|
# gap between consecutive publishes can reach 2 * segment_time. Pad the
|
|
# staleness threshold so it's never tighter than that worst case.
|
|
segment_time = get_record_segment_time(self.config)
|
|
self.record_stale_threshold = max(120, 2 * segment_time + 30)
|
|
|
|
# Stall tracking (based on last processed frame)
|
|
self._stall_timestamps: deque[float] = deque()
|
|
self._stall_active: bool = False
|
|
|
|
# Status caching to reduce message volume
|
|
self._last_detect_status: str | None = None
|
|
self._last_record_status: str | None = None
|
|
self._last_status_update_time: float = 0.0
|
|
|
|
def _send_detect_status(self, status: str, now: float) -> None:
|
|
"""Send detect status only if changed or retry_interval has elapsed."""
|
|
if (
|
|
status != self._last_detect_status
|
|
or (now - self._last_status_update_time) >= self.sleeptime
|
|
):
|
|
self.requestor.send_data(f"{self.config.name}/status/detect", status)
|
|
self._last_detect_status = status
|
|
self._last_status_update_time = now
|
|
|
|
def _send_record_status(self, status: str, now: float) -> None:
|
|
"""Send record status only if changed or retry_interval has elapsed."""
|
|
if (
|
|
status != self._last_record_status
|
|
or (now - self._last_status_update_time) >= self.sleeptime
|
|
):
|
|
self.requestor.send_data(f"{self.config.name}/status/record", status)
|
|
self._last_record_status = status
|
|
self._last_status_update_time = now
|
|
|
|
def _check_config_updates(self) -> dict[str, list[str]]:
|
|
"""Check for config updates and return the update dict."""
|
|
return self.config_subscriber.check_for_updates()
|
|
|
|
def _update_enabled_state(self) -> bool:
|
|
"""Fetch the latest config and update enabled state."""
|
|
self._check_config_updates()
|
|
return self.config.enabled
|
|
|
|
def reset_capture_thread(
|
|
self, terminate: bool = True, drain_output: bool = True
|
|
) -> None:
|
|
if terminate:
|
|
self.ffmpeg_detect_process.terminate()
|
|
try:
|
|
self.logger.info("Waiting for ffmpeg to exit gracefully...")
|
|
|
|
if drain_output:
|
|
self.ffmpeg_detect_process.communicate(timeout=30)
|
|
else:
|
|
self.ffmpeg_detect_process.wait(timeout=30)
|
|
except sp.TimeoutExpired:
|
|
self.logger.info("FFmpeg did not exit. Force killing...")
|
|
self.ffmpeg_detect_process.kill()
|
|
|
|
if drain_output:
|
|
self.ffmpeg_detect_process.communicate()
|
|
else:
|
|
self.ffmpeg_detect_process.wait()
|
|
|
|
# Update reconnects
|
|
now = datetime.now().timestamp()
|
|
self.reconnect_timestamps.append(now)
|
|
while self.reconnect_timestamps and self.reconnect_timestamps[0] < now - 3600:
|
|
self.reconnect_timestamps.popleft()
|
|
if self.reconnects:
|
|
self.reconnects.value = len(self.reconnect_timestamps)
|
|
|
|
# Wait for old capture thread to fully exit before starting a new one
|
|
if self.capture_thread is not None and self.capture_thread.is_alive():
|
|
self.logger.info("Waiting for capture thread to exit...")
|
|
self.capture_thread.join(timeout=5)
|
|
|
|
if self.capture_thread.is_alive():
|
|
self.logger.warning(
|
|
f"Capture thread for {self.config.name} did not exit in time"
|
|
)
|
|
|
|
self.logger.error(
|
|
"The following ffmpeg logs include the last 100 lines prior to exit."
|
|
)
|
|
self.logpipe.dump()
|
|
self.logger.info("Restarting ffmpeg...")
|
|
self.start_ffmpeg_detect()
|
|
|
|
def run(self) -> None:
|
|
if self._update_enabled_state():
|
|
self.start_all_ffmpeg()
|
|
# If recording is enabled at startup, set the grace period timer
|
|
if self.config.record.enabled:
|
|
self.record_enable_time = datetime.now().astimezone(timezone.utc)
|
|
|
|
time.sleep(self.sleeptime)
|
|
last_restart_time = datetime.now().timestamp()
|
|
|
|
# 1 second watchdog loop
|
|
while not self.stop_event.wait(1):
|
|
updates = self._check_config_updates()
|
|
|
|
# Handle ffmpeg config changes by restarting all ffmpeg processes
|
|
if "ffmpeg" in updates and self.config.enabled:
|
|
self.logger.debug(
|
|
"FFmpeg config updated for %s, restarting ffmpeg processes",
|
|
self.config.name,
|
|
)
|
|
self.stop_all_ffmpeg()
|
|
self.start_all_ffmpeg()
|
|
self.latest_valid_segment_time = 0
|
|
self.latest_invalid_segment_time = 0
|
|
self.latest_cache_segment_time = 0
|
|
self.record_enable_time = datetime.now().astimezone(timezone.utc)
|
|
last_restart_time = datetime.now().timestamp()
|
|
continue
|
|
|
|
enabled = self.config.enabled
|
|
if enabled != self.was_enabled:
|
|
if enabled:
|
|
self.logger.debug(f"Enabling camera {self.config.name}")
|
|
self.start_all_ffmpeg()
|
|
|
|
# reset all timestamps and record the enable time for grace period
|
|
self.latest_valid_segment_time = 0
|
|
self.latest_invalid_segment_time = 0
|
|
self.latest_cache_segment_time = 0
|
|
self.record_enable_time = datetime.now().astimezone(timezone.utc)
|
|
else:
|
|
self.logger.debug(f"Disabling camera {self.config.name}")
|
|
self.stop_all_ffmpeg()
|
|
self.record_enable_time = None
|
|
|
|
# update camera status
|
|
now = datetime.now().timestamp()
|
|
self._send_detect_status("disabled", now)
|
|
self._send_record_status("disabled", now)
|
|
self.was_enabled = enabled
|
|
continue
|
|
|
|
record_enabled_in_config = self.config.record.enabled_in_config
|
|
if record_enabled_in_config != self.was_record_enabled_in_config:
|
|
if record_enabled_in_config and enabled:
|
|
self.logger.debug(
|
|
f"Record enabled in config for {self.config.name}, restarting ffmpeg"
|
|
)
|
|
self.stop_all_ffmpeg()
|
|
self.start_all_ffmpeg()
|
|
self.latest_valid_segment_time = 0
|
|
self.latest_invalid_segment_time = 0
|
|
self.latest_cache_segment_time = 0
|
|
self.record_enable_time = datetime.now().astimezone(timezone.utc)
|
|
last_restart_time = datetime.now().timestamp()
|
|
self.was_record_enabled_in_config = record_enabled_in_config
|
|
continue
|
|
|
|
if not enabled:
|
|
continue
|
|
|
|
while True:
|
|
update = self.segment_subscriber.check_for_update(timeout=0)
|
|
|
|
if update == (None, None):
|
|
break
|
|
|
|
raw_topic, payload = update
|
|
if raw_topic and payload:
|
|
topic = str(raw_topic)
|
|
camera, segment_time, _ = payload
|
|
|
|
if camera != self.config.name:
|
|
continue
|
|
|
|
if topic.endswith(RecordingsDataTypeEnum.invalid.value):
|
|
self.logger.warning(
|
|
f"Invalid recording segment detected for {camera} at {segment_time}"
|
|
)
|
|
self.latest_invalid_segment_time = segment_time
|
|
elif topic.endswith(RecordingsDataTypeEnum.valid.value):
|
|
self.logger.debug(
|
|
f"Latest valid recording segment time on {camera}: {segment_time}"
|
|
)
|
|
self.latest_valid_segment_time = segment_time
|
|
elif topic.endswith(RecordingsDataTypeEnum.latest.value):
|
|
if segment_time is not None:
|
|
self.latest_cache_segment_time = segment_time
|
|
else:
|
|
self.latest_cache_segment_time = 0
|
|
|
|
now = datetime.now().timestamp()
|
|
|
|
# Check if enough time has passed to allow ffmpeg restart (backoff pacing)
|
|
time_since_last_restart = now - last_restart_time
|
|
can_restart = time_since_last_restart >= self.sleeptime
|
|
|
|
if not self.capture_thread.is_alive():
|
|
self._send_detect_status("offline", now)
|
|
self.camera_fps.value = 0
|
|
self.logger.error(
|
|
f"Ffmpeg process crashed unexpectedly for {self.config.name}."
|
|
)
|
|
if can_restart:
|
|
self.reset_capture_thread(terminate=False)
|
|
last_restart_time = now
|
|
elif self.camera_fps.value >= (self.config.detect.fps + 10):
|
|
self.fps_overflow_count += 1
|
|
|
|
if self.fps_overflow_count == 3:
|
|
self._send_detect_status("offline", now)
|
|
self.fps_overflow_count = 0
|
|
self.camera_fps.value = 0
|
|
self.logger.info(
|
|
f"{self.config.name} exceeded fps limit. Exiting ffmpeg..."
|
|
)
|
|
if can_restart:
|
|
self.reset_capture_thread(drain_output=False)
|
|
last_restart_time = now
|
|
elif now - self.capture_thread.current_frame.value > 20:
|
|
self._send_detect_status("offline", now)
|
|
self.camera_fps.value = 0
|
|
self.logger.info(
|
|
f"No frames received from {self.config.name} in 20 seconds. Exiting ffmpeg..."
|
|
)
|
|
if can_restart:
|
|
self.reset_capture_thread()
|
|
last_restart_time = now
|
|
else:
|
|
# process is running normally
|
|
self._send_detect_status("online", now)
|
|
self.fps_overflow_count = 0
|
|
|
|
for p in self.ffmpeg_other_processes:
|
|
poll = p["process"].poll()
|
|
|
|
if self.config.record.enabled and "record" in p["roles"]:
|
|
now_utc = datetime.now().astimezone(timezone.utc)
|
|
|
|
# Check if we're within the grace period after enabling recording
|
|
# Grace period: 90 seconds allows time for ffmpeg to start and create first segment
|
|
in_grace_period = self.record_enable_time is not None and (
|
|
now_utc - self.record_enable_time
|
|
) < timedelta(seconds=90)
|
|
|
|
latest_cache_dt = (
|
|
datetime.fromtimestamp(
|
|
self.latest_cache_segment_time, tz=timezone.utc
|
|
)
|
|
if self.latest_cache_segment_time > 0
|
|
else now_utc - timedelta(seconds=1)
|
|
)
|
|
|
|
latest_valid_dt = (
|
|
datetime.fromtimestamp(
|
|
self.latest_valid_segment_time, tz=timezone.utc
|
|
)
|
|
if self.latest_valid_segment_time > 0
|
|
else now_utc - timedelta(seconds=1)
|
|
)
|
|
|
|
latest_invalid_dt = (
|
|
datetime.fromtimestamp(
|
|
self.latest_invalid_segment_time, tz=timezone.utc
|
|
)
|
|
if self.latest_invalid_segment_time > 0
|
|
else now_utc - timedelta(seconds=1)
|
|
)
|
|
|
|
# ensure segments are still being created and that they have valid video data
|
|
# Skip checks during grace period to allow segments to start being created
|
|
stale_window = timedelta(seconds=self.record_stale_threshold)
|
|
cache_stale = not in_grace_period and now_utc > (
|
|
latest_cache_dt + stale_window
|
|
)
|
|
valid_stale = not in_grace_period and now_utc > (
|
|
latest_valid_dt + stale_window
|
|
)
|
|
invalid_stale_condition = (
|
|
self.latest_invalid_segment_time > 0
|
|
and not in_grace_period
|
|
and now_utc > (latest_invalid_dt + stale_window)
|
|
and self.latest_valid_segment_time
|
|
<= self.latest_invalid_segment_time
|
|
)
|
|
invalid_stale = invalid_stale_condition
|
|
|
|
if cache_stale or valid_stale or invalid_stale:
|
|
# The staleness above is measured from the recording
|
|
# maintainer's IPC heartbeat, which lags whenever the
|
|
# maintainer falls behind (e.g. "Unable to keep up with
|
|
# recording segments in cache"). A late message is not
|
|
# a dead recorder: corroborate against the cache dir
|
|
# before restarting, otherwise every camera restarts
|
|
# together on maintainer lag and the resulting segment
|
|
# churn makes the overload worse.
|
|
if not invalid_stale:
|
|
newest_on_disk = max(
|
|
(
|
|
f.stat().st_mtime
|
|
for f in Path(CACHE_DIR).glob(
|
|
f"{self.config.name}@*"
|
|
)
|
|
),
|
|
default=0.0,
|
|
)
|
|
if (
|
|
newest_on_disk > 0
|
|
and now_utc.timestamp() - newest_on_disk
|
|
< self.record_stale_threshold
|
|
):
|
|
self.logger.warning(
|
|
f"Recording heartbeat for {self.config.name} is stale but a cache "
|
|
f"segment is only {now_utc.timestamp() - newest_on_disk:.0f}s old — "
|
|
"skipping the record process restart (maintainer heartbeat lag, "
|
|
"not a recording failure)."
|
|
)
|
|
self.latest_cache_segment_time = newest_on_disk
|
|
continue
|
|
|
|
if cache_stale:
|
|
reason = "No new recording segments were created"
|
|
elif valid_stale:
|
|
reason = "No new valid recording segments were created"
|
|
else: # invalid_stale
|
|
reason = (
|
|
"No valid segments created since last invalid segment"
|
|
)
|
|
|
|
self.logger.error(
|
|
f"{reason} for {self.config.name} in the last {self.record_stale_threshold}s. Restarting the ffmpeg record process..."
|
|
)
|
|
p["process"] = start_or_restart_ffmpeg(
|
|
p["cmd"],
|
|
self.logger,
|
|
p["logpipe"],
|
|
ffmpeg_process=p["process"],
|
|
)
|
|
|
|
for role in p["roles"]:
|
|
self.requestor.send_data(
|
|
f"{self.config.name}/status/{role.value}", "offline"
|
|
)
|
|
|
|
continue
|
|
else:
|
|
self._send_record_status("online", now)
|
|
p["latest_segment_time"] = self.latest_cache_segment_time
|
|
|
|
if poll is None:
|
|
continue
|
|
|
|
for role in p["roles"]:
|
|
self.requestor.send_data(
|
|
f"{self.config.name}/status/{role.value}", "offline"
|
|
)
|
|
|
|
p["logpipe"].dump()
|
|
p["process"] = start_or_restart_ffmpeg(
|
|
p["cmd"], self.logger, p["logpipe"], ffmpeg_process=p["process"]
|
|
)
|
|
|
|
# Prune expired reconnect timestamps
|
|
now = datetime.now().timestamp()
|
|
while (
|
|
self.reconnect_timestamps and self.reconnect_timestamps[0] < now - 3600
|
|
):
|
|
self.reconnect_timestamps.popleft()
|
|
if self.reconnects:
|
|
self.reconnects.value = len(self.reconnect_timestamps)
|
|
|
|
# Update stall metrics based on last processed frame timestamp
|
|
processed_ts = (
|
|
float(self.detection_frame.value) if self.detection_frame else 0.0
|
|
)
|
|
if processed_ts > 0:
|
|
delta = now - processed_ts
|
|
observed_fps = (
|
|
self.camera_fps.value
|
|
if self.camera_fps.value > 0
|
|
else self.config.detect.fps
|
|
)
|
|
interval = 1.0 / max(observed_fps, 0.1)
|
|
stall_threshold = max(2.0 * interval, 2.0)
|
|
|
|
if delta > stall_threshold:
|
|
if not self._stall_active:
|
|
self._stall_timestamps.append(now)
|
|
self._stall_active = True
|
|
else:
|
|
self._stall_active = False
|
|
|
|
while self._stall_timestamps and self._stall_timestamps[0] < now - 3600:
|
|
self._stall_timestamps.popleft()
|
|
|
|
if self.stalls:
|
|
self.stalls.value = len(self._stall_timestamps)
|
|
|
|
self.stop_all_ffmpeg()
|
|
self.logpipe.close()
|
|
self.config_subscriber.stop()
|
|
self.segment_subscriber.stop()
|
|
|
|
def start_ffmpeg_detect(self):
|
|
ffmpeg_cmd = [
|
|
c["cmd"] for c in self.config.ffmpeg_cmds if "detect" in c["roles"]
|
|
][0]
|
|
self.ffmpeg_detect_process = start_or_restart_ffmpeg(
|
|
ffmpeg_cmd, self.logger, self.logpipe, self.frame_size
|
|
)
|
|
self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid
|
|
self.capture_thread = CameraCaptureRunner(
|
|
self.config,
|
|
self.shm_frame_count,
|
|
self.frame_index,
|
|
self.ffmpeg_detect_process,
|
|
self.frame_shape,
|
|
self.frame_queue,
|
|
self.camera_fps,
|
|
self.skipped_fps,
|
|
self.stop_event,
|
|
)
|
|
self.capture_thread.start()
|
|
|
|
def start_all_ffmpeg(self):
|
|
"""Start all ffmpeg processes (detection and others)."""
|
|
logger.debug(f"Starting all ffmpeg processes for {self.config.name}")
|
|
self.start_ffmpeg_detect()
|
|
for c in self.config.ffmpeg_cmds:
|
|
if "detect" in c["roles"]:
|
|
continue
|
|
logpipe = LogPipe(
|
|
f"ffmpeg.{self.config.name}.{'_'.join(sorted(c['roles']))}"
|
|
)
|
|
self.ffmpeg_other_processes.append(
|
|
{
|
|
"cmd": c["cmd"],
|
|
"roles": c["roles"],
|
|
"logpipe": logpipe,
|
|
"process": start_or_restart_ffmpeg(c["cmd"], self.logger, logpipe),
|
|
}
|
|
)
|
|
|
|
def stop_all_ffmpeg(self):
|
|
"""Stop all ffmpeg processes (detection and others)."""
|
|
logger.debug(f"Stopping all ffmpeg processes for {self.config.name}")
|
|
if self.capture_thread is not None and self.capture_thread.is_alive():
|
|
self.capture_thread.join(timeout=5)
|
|
if self.capture_thread.is_alive():
|
|
self.logger.warning(
|
|
f"Capture thread for {self.config.name} did not stop gracefully."
|
|
)
|
|
if self.ffmpeg_detect_process is not None:
|
|
stop_ffmpeg(self.ffmpeg_detect_process, self.logger)
|
|
self.ffmpeg_detect_process = None
|
|
for p in self.ffmpeg_other_processes[:]:
|
|
if p["process"] is not None:
|
|
stop_ffmpeg(p["process"], self.logger)
|
|
p["logpipe"].close()
|
|
self.ffmpeg_other_processes.clear()
|
|
|
|
|
|
class CameraCaptureRunner(threading.Thread):
|
|
def __init__(
|
|
self,
|
|
config: CameraConfig,
|
|
shm_frame_count: int,
|
|
frame_index: int,
|
|
ffmpeg_process,
|
|
frame_shape: tuple[int, int],
|
|
frame_queue: Queue,
|
|
fps: Value,
|
|
skipped_fps: Value,
|
|
stop_event: MpEvent,
|
|
):
|
|
threading.Thread.__init__(self)
|
|
self.name = f"capture:{config.name}"
|
|
self.config = config
|
|
self.shm_frame_count = shm_frame_count
|
|
self.frame_index = frame_index
|
|
self.frame_shape = frame_shape
|
|
self.frame_queue = frame_queue
|
|
self.fps = fps
|
|
self.stop_event = stop_event
|
|
self.skipped_fps = skipped_fps
|
|
self.frame_manager = SharedMemoryFrameManager()
|
|
self.ffmpeg_process = ffmpeg_process
|
|
self.current_frame = Value("d", 0.0)
|
|
self.last_frame = 0
|
|
|
|
def run(self):
|
|
capture_frames(
|
|
self.ffmpeg_process,
|
|
self.config,
|
|
self.shm_frame_count,
|
|
self.frame_index,
|
|
self.frame_shape,
|
|
self.frame_manager,
|
|
self.frame_queue,
|
|
self.fps,
|
|
self.skipped_fps,
|
|
self.current_frame,
|
|
self.stop_event,
|
|
)
|
|
|
|
|
|
class CameraCapture(FrigateProcess):
|
|
def __init__(
|
|
self,
|
|
config: CameraConfig,
|
|
shm_frame_count: int,
|
|
camera_metrics: CameraMetrics,
|
|
stop_event: MpEvent,
|
|
log_config: LoggerConfig | None = None,
|
|
) -> None:
|
|
super().__init__(
|
|
stop_event,
|
|
PROCESS_PRIORITY_HIGH,
|
|
name=f"frigate.capture:{config.name}",
|
|
daemon=True,
|
|
)
|
|
self.config = config
|
|
self.shm_frame_count = shm_frame_count
|
|
self.camera_metrics = camera_metrics
|
|
self.log_config = log_config
|
|
|
|
def run(self) -> None:
|
|
self.pre_run_setup(self.log_config)
|
|
camera_watchdog = CameraWatchdog(
|
|
self.config,
|
|
self.shm_frame_count,
|
|
self.camera_metrics.frame_queue,
|
|
self.camera_metrics.camera_fps,
|
|
self.camera_metrics.skipped_fps,
|
|
self.camera_metrics.ffmpeg_pid,
|
|
self.camera_metrics.stalls_last_hour,
|
|
self.camera_metrics.reconnects_last_hour,
|
|
self.camera_metrics.detection_frame,
|
|
self.stop_event,
|
|
)
|
|
camera_watchdog.start()
|
|
camera_watchdog.join()
|