mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-04-05 22:57:40 +03:00
646 lines
24 KiB
Python
646 lines
24 KiB
Python
"""Manages ffmpeg processes for camera frame capture."""
|
|
|
|
import logging
|
|
import queue
|
|
import subprocess as sp
|
|
import threading
|
|
import time
|
|
from collections import deque
|
|
from datetime import datetime, timedelta, timezone
|
|
from multiprocessing import Queue, Value
|
|
from multiprocessing.synchronize import Event as MpEvent
|
|
from typing import Any
|
|
|
|
from frigate.camera import CameraMetrics
|
|
from frigate.comms.inter_process import InterProcessRequestor
|
|
from frigate.comms.recordings_updater import (
|
|
RecordingsDataSubscriber,
|
|
RecordingsDataTypeEnum,
|
|
)
|
|
from frigate.config import CameraConfig, LoggerConfig
|
|
from frigate.config.camera.updater import (
|
|
CameraConfigUpdateEnum,
|
|
CameraConfigUpdateSubscriber,
|
|
)
|
|
from frigate.const import PROCESS_PRIORITY_HIGH
|
|
from frigate.log import LogPipe
|
|
from frigate.util.builtin import EventsPerSecond
|
|
from frigate.util.ffmpeg import start_or_restart_ffmpeg, stop_ffmpeg
|
|
from frigate.util.image import (
|
|
FrameManager,
|
|
SharedMemoryFrameManager,
|
|
)
|
|
from frigate.util.process import FrigateProcess
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def capture_frames(
|
|
ffmpeg_process: sp.Popen[Any],
|
|
config: CameraConfig,
|
|
shm_frame_count: int,
|
|
frame_index: int,
|
|
frame_shape: tuple[int, int],
|
|
frame_manager: FrameManager,
|
|
frame_queue,
|
|
fps: Value,
|
|
skipped_fps: Value,
|
|
current_frame: Value,
|
|
stop_event: MpEvent,
|
|
) -> None:
|
|
frame_size = frame_shape[0] * frame_shape[1]
|
|
frame_rate = EventsPerSecond()
|
|
frame_rate.start()
|
|
skipped_eps = EventsPerSecond()
|
|
skipped_eps.start()
|
|
|
|
config_subscriber = CameraConfigUpdateSubscriber(
|
|
None, {config.name: config}, [CameraConfigUpdateEnum.enabled]
|
|
)
|
|
|
|
def get_enabled_state():
|
|
"""Fetch the latest enabled state from ZMQ."""
|
|
config_subscriber.check_for_updates()
|
|
return config.enabled
|
|
|
|
try:
|
|
while not stop_event.is_set():
|
|
if not get_enabled_state():
|
|
logger.debug(f"Stopping capture thread for disabled {config.name}")
|
|
break
|
|
|
|
fps.value = frame_rate.eps()
|
|
skipped_fps.value = skipped_eps.eps()
|
|
current_frame.value = datetime.now().timestamp()
|
|
frame_name = f"{config.name}_frame{frame_index}"
|
|
frame_buffer = frame_manager.write(frame_name)
|
|
try:
|
|
frame_buffer[:] = ffmpeg_process.stdout.read(frame_size)
|
|
except Exception:
|
|
# shutdown has been initiated
|
|
if stop_event.is_set():
|
|
break
|
|
|
|
logger.error(
|
|
f"{config.name}: Unable to read frames from ffmpeg process."
|
|
)
|
|
|
|
if ffmpeg_process.poll() is not None:
|
|
logger.error(
|
|
f"{config.name}: ffmpeg process is not running. exiting capture thread..."
|
|
)
|
|
break
|
|
|
|
continue
|
|
|
|
frame_rate.update()
|
|
|
|
# don't lock the queue to check, just try since it should rarely be full
|
|
try:
|
|
# add to the queue
|
|
frame_queue.put((frame_name, current_frame.value), False)
|
|
frame_manager.close(frame_name)
|
|
except queue.Full:
|
|
# if the queue is full, skip this frame
|
|
skipped_eps.update()
|
|
|
|
frame_index = 0 if frame_index == shm_frame_count - 1 else frame_index + 1
|
|
finally:
|
|
config_subscriber.stop()
|
|
|
|
|
|
class CameraWatchdog(threading.Thread):
|
|
def __init__(
|
|
self,
|
|
config: CameraConfig,
|
|
shm_frame_count: int,
|
|
frame_queue: Queue,
|
|
camera_fps,
|
|
skipped_fps,
|
|
ffmpeg_pid,
|
|
stalls,
|
|
reconnects,
|
|
detection_frame,
|
|
stop_event,
|
|
):
|
|
threading.Thread.__init__(self)
|
|
self.logger = logging.getLogger(f"watchdog.{config.name}")
|
|
self.config = config
|
|
self.shm_frame_count = shm_frame_count
|
|
self.capture_thread = None
|
|
self.ffmpeg_detect_process = None
|
|
self.logpipe = LogPipe(f"ffmpeg.{self.config.name}.detect")
|
|
self.ffmpeg_other_processes: list[dict[str, Any]] = []
|
|
self.camera_fps = camera_fps
|
|
self.skipped_fps = skipped_fps
|
|
self.ffmpeg_pid = ffmpeg_pid
|
|
self.frame_queue = frame_queue
|
|
self.frame_shape = self.config.frame_shape_yuv
|
|
self.frame_size = self.frame_shape[0] * self.frame_shape[1]
|
|
self.fps_overflow_count = 0
|
|
self.frame_index = 0
|
|
self.stop_event = stop_event
|
|
self.sleeptime = self.config.ffmpeg.retry_interval
|
|
self.reconnect_timestamps = deque()
|
|
self.stalls = stalls
|
|
self.reconnects = reconnects
|
|
self.detection_frame = detection_frame
|
|
|
|
self.config_subscriber = CameraConfigUpdateSubscriber(
|
|
None,
|
|
{config.name: config},
|
|
[
|
|
CameraConfigUpdateEnum.enabled,
|
|
CameraConfigUpdateEnum.ffmpeg,
|
|
CameraConfigUpdateEnum.record,
|
|
],
|
|
)
|
|
self.requestor = InterProcessRequestor()
|
|
self.was_enabled = self.config.enabled
|
|
|
|
self.segment_subscriber = RecordingsDataSubscriber(RecordingsDataTypeEnum.all)
|
|
self.latest_valid_segment_time: float = 0
|
|
self.latest_invalid_segment_time: float = 0
|
|
self.latest_cache_segment_time: float = 0
|
|
self.record_enable_time: datetime | None = None
|
|
|
|
# Stall tracking (based on last processed frame)
|
|
self._stall_timestamps: deque[float] = deque()
|
|
self._stall_active: bool = False
|
|
|
|
# Status caching to reduce message volume
|
|
self._last_detect_status: str | None = None
|
|
self._last_record_status: str | None = None
|
|
self._last_status_update_time: float = 0.0
|
|
|
|
def _send_detect_status(self, status: str, now: float) -> None:
|
|
"""Send detect status only if changed or retry_interval has elapsed."""
|
|
if (
|
|
status != self._last_detect_status
|
|
or (now - self._last_status_update_time) >= self.sleeptime
|
|
):
|
|
self.requestor.send_data(f"{self.config.name}/status/detect", status)
|
|
self._last_detect_status = status
|
|
self._last_status_update_time = now
|
|
|
|
def _send_record_status(self, status: str, now: float) -> None:
|
|
"""Send record status only if changed or retry_interval has elapsed."""
|
|
if (
|
|
status != self._last_record_status
|
|
or (now - self._last_status_update_time) >= self.sleeptime
|
|
):
|
|
self.requestor.send_data(f"{self.config.name}/status/record", status)
|
|
self._last_record_status = status
|
|
self._last_status_update_time = now
|
|
|
|
def _check_config_updates(self) -> dict[str, list[str]]:
|
|
"""Check for config updates and return the update dict."""
|
|
return self.config_subscriber.check_for_updates()
|
|
|
|
def _update_enabled_state(self) -> bool:
|
|
"""Fetch the latest config and update enabled state."""
|
|
self._check_config_updates()
|
|
return self.config.enabled
|
|
|
|
def reset_capture_thread(
|
|
self, terminate: bool = True, drain_output: bool = True
|
|
) -> None:
|
|
if terminate:
|
|
self.ffmpeg_detect_process.terminate()
|
|
try:
|
|
self.logger.info("Waiting for ffmpeg to exit gracefully...")
|
|
|
|
if drain_output:
|
|
self.ffmpeg_detect_process.communicate(timeout=30)
|
|
else:
|
|
self.ffmpeg_detect_process.wait(timeout=30)
|
|
except sp.TimeoutExpired:
|
|
self.logger.info("FFmpeg did not exit. Force killing...")
|
|
self.ffmpeg_detect_process.kill()
|
|
|
|
if drain_output:
|
|
self.ffmpeg_detect_process.communicate()
|
|
else:
|
|
self.ffmpeg_detect_process.wait()
|
|
|
|
# Update reconnects
|
|
now = datetime.now().timestamp()
|
|
self.reconnect_timestamps.append(now)
|
|
while self.reconnect_timestamps and self.reconnect_timestamps[0] < now - 3600:
|
|
self.reconnect_timestamps.popleft()
|
|
if self.reconnects:
|
|
self.reconnects.value = len(self.reconnect_timestamps)
|
|
|
|
# Wait for old capture thread to fully exit before starting a new one
|
|
if self.capture_thread is not None and self.capture_thread.is_alive():
|
|
self.logger.info("Waiting for capture thread to exit...")
|
|
self.capture_thread.join(timeout=5)
|
|
|
|
if self.capture_thread.is_alive():
|
|
self.logger.warning(
|
|
f"Capture thread for {self.config.name} did not exit in time"
|
|
)
|
|
|
|
self.logger.error(
|
|
"The following ffmpeg logs include the last 100 lines prior to exit."
|
|
)
|
|
self.logpipe.dump()
|
|
self.logger.info("Restarting ffmpeg...")
|
|
self.start_ffmpeg_detect()
|
|
|
|
def run(self) -> None:
|
|
if self._update_enabled_state():
|
|
self.start_all_ffmpeg()
|
|
# If recording is enabled at startup, set the grace period timer
|
|
if self.config.record.enabled:
|
|
self.record_enable_time = datetime.now().astimezone(timezone.utc)
|
|
|
|
time.sleep(self.sleeptime)
|
|
last_restart_time = datetime.now().timestamp()
|
|
|
|
# 1 second watchdog loop
|
|
while not self.stop_event.wait(1):
|
|
updates = self._check_config_updates()
|
|
|
|
# Handle ffmpeg config changes by restarting all ffmpeg processes
|
|
if "ffmpeg" in updates and self.config.enabled:
|
|
self.logger.debug(
|
|
"FFmpeg config updated for %s, restarting ffmpeg processes",
|
|
self.config.name,
|
|
)
|
|
self.stop_all_ffmpeg()
|
|
self.start_all_ffmpeg()
|
|
self.latest_valid_segment_time = 0
|
|
self.latest_invalid_segment_time = 0
|
|
self.latest_cache_segment_time = 0
|
|
self.record_enable_time = datetime.now().astimezone(timezone.utc)
|
|
last_restart_time = datetime.now().timestamp()
|
|
continue
|
|
|
|
enabled = self.config.enabled
|
|
if enabled != self.was_enabled:
|
|
if enabled:
|
|
self.logger.debug(f"Enabling camera {self.config.name}")
|
|
self.start_all_ffmpeg()
|
|
|
|
# reset all timestamps and record the enable time for grace period
|
|
self.latest_valid_segment_time = 0
|
|
self.latest_invalid_segment_time = 0
|
|
self.latest_cache_segment_time = 0
|
|
self.record_enable_time = datetime.now().astimezone(timezone.utc)
|
|
else:
|
|
self.logger.debug(f"Disabling camera {self.config.name}")
|
|
self.stop_all_ffmpeg()
|
|
self.record_enable_time = None
|
|
|
|
# update camera status
|
|
now = datetime.now().timestamp()
|
|
self._send_detect_status("disabled", now)
|
|
self._send_record_status("disabled", now)
|
|
self.was_enabled = enabled
|
|
continue
|
|
|
|
if not enabled:
|
|
continue
|
|
|
|
while True:
|
|
update = self.segment_subscriber.check_for_update(timeout=0)
|
|
|
|
if update == (None, None):
|
|
break
|
|
|
|
raw_topic, payload = update
|
|
if raw_topic and payload:
|
|
topic = str(raw_topic)
|
|
camera, segment_time, _ = payload
|
|
|
|
if camera != self.config.name:
|
|
continue
|
|
|
|
if topic.endswith(RecordingsDataTypeEnum.valid.value):
|
|
self.logger.debug(
|
|
f"Latest valid recording segment time on {camera}: {segment_time}"
|
|
)
|
|
self.latest_valid_segment_time = segment_time
|
|
elif topic.endswith(RecordingsDataTypeEnum.invalid.value):
|
|
self.logger.warning(
|
|
f"Invalid recording segment detected for {camera} at {segment_time}"
|
|
)
|
|
self.latest_invalid_segment_time = segment_time
|
|
elif topic.endswith(RecordingsDataTypeEnum.latest.value):
|
|
if segment_time is not None:
|
|
self.latest_cache_segment_time = segment_time
|
|
else:
|
|
self.latest_cache_segment_time = 0
|
|
|
|
now = datetime.now().timestamp()
|
|
|
|
# Check if enough time has passed to allow ffmpeg restart (backoff pacing)
|
|
time_since_last_restart = now - last_restart_time
|
|
can_restart = time_since_last_restart >= self.sleeptime
|
|
|
|
if not self.capture_thread.is_alive():
|
|
self._send_detect_status("offline", now)
|
|
self.camera_fps.value = 0
|
|
self.logger.error(
|
|
f"Ffmpeg process crashed unexpectedly for {self.config.name}."
|
|
)
|
|
if can_restart:
|
|
self.reset_capture_thread(terminate=False)
|
|
last_restart_time = now
|
|
elif self.camera_fps.value >= (self.config.detect.fps + 10):
|
|
self.fps_overflow_count += 1
|
|
|
|
if self.fps_overflow_count == 3:
|
|
self._send_detect_status("offline", now)
|
|
self.fps_overflow_count = 0
|
|
self.camera_fps.value = 0
|
|
self.logger.info(
|
|
f"{self.config.name} exceeded fps limit. Exiting ffmpeg..."
|
|
)
|
|
if can_restart:
|
|
self.reset_capture_thread(drain_output=False)
|
|
last_restart_time = now
|
|
elif now - self.capture_thread.current_frame.value > 20:
|
|
self._send_detect_status("offline", now)
|
|
self.camera_fps.value = 0
|
|
self.logger.info(
|
|
f"No frames received from {self.config.name} in 20 seconds. Exiting ffmpeg..."
|
|
)
|
|
if can_restart:
|
|
self.reset_capture_thread()
|
|
last_restart_time = now
|
|
else:
|
|
# process is running normally
|
|
self._send_detect_status("online", now)
|
|
self.fps_overflow_count = 0
|
|
|
|
for p in self.ffmpeg_other_processes:
|
|
poll = p["process"].poll()
|
|
|
|
if self.config.record.enabled and "record" in p["roles"]:
|
|
now_utc = datetime.now().astimezone(timezone.utc)
|
|
|
|
# Check if we're within the grace period after enabling recording
|
|
# Grace period: 90 seconds allows time for ffmpeg to start and create first segment
|
|
in_grace_period = self.record_enable_time is not None and (
|
|
now_utc - self.record_enable_time
|
|
) < timedelta(seconds=90)
|
|
|
|
latest_cache_dt = (
|
|
datetime.fromtimestamp(
|
|
self.latest_cache_segment_time, tz=timezone.utc
|
|
)
|
|
if self.latest_cache_segment_time > 0
|
|
else now_utc - timedelta(seconds=1)
|
|
)
|
|
|
|
latest_valid_dt = (
|
|
datetime.fromtimestamp(
|
|
self.latest_valid_segment_time, tz=timezone.utc
|
|
)
|
|
if self.latest_valid_segment_time > 0
|
|
else now_utc - timedelta(seconds=1)
|
|
)
|
|
|
|
latest_invalid_dt = (
|
|
datetime.fromtimestamp(
|
|
self.latest_invalid_segment_time, tz=timezone.utc
|
|
)
|
|
if self.latest_invalid_segment_time > 0
|
|
else now_utc - timedelta(seconds=1)
|
|
)
|
|
|
|
# ensure segments are still being created and that they have valid video data
|
|
# Skip checks during grace period to allow segments to start being created
|
|
cache_stale = not in_grace_period and now_utc > (
|
|
latest_cache_dt + timedelta(seconds=120)
|
|
)
|
|
valid_stale = not in_grace_period and now_utc > (
|
|
latest_valid_dt + timedelta(seconds=120)
|
|
)
|
|
invalid_stale_condition = (
|
|
self.latest_invalid_segment_time > 0
|
|
and not in_grace_period
|
|
and now_utc > (latest_invalid_dt + timedelta(seconds=120))
|
|
and self.latest_valid_segment_time
|
|
<= self.latest_invalid_segment_time
|
|
)
|
|
invalid_stale = invalid_stale_condition
|
|
|
|
if cache_stale or valid_stale or invalid_stale:
|
|
if cache_stale:
|
|
reason = "No new recording segments were created"
|
|
elif valid_stale:
|
|
reason = "No new valid recording segments were created"
|
|
else: # invalid_stale
|
|
reason = (
|
|
"No valid segments created since last invalid segment"
|
|
)
|
|
|
|
self.logger.error(
|
|
f"{reason} for {self.config.name} in the last 120s. Restarting the ffmpeg record process..."
|
|
)
|
|
p["process"] = start_or_restart_ffmpeg(
|
|
p["cmd"],
|
|
self.logger,
|
|
p["logpipe"],
|
|
ffmpeg_process=p["process"],
|
|
)
|
|
|
|
for role in p["roles"]:
|
|
self.requestor.send_data(
|
|
f"{self.config.name}/status/{role.value}", "offline"
|
|
)
|
|
|
|
continue
|
|
else:
|
|
self._send_record_status("online", now)
|
|
p["latest_segment_time"] = self.latest_cache_segment_time
|
|
|
|
if poll is None:
|
|
continue
|
|
|
|
for role in p["roles"]:
|
|
self.requestor.send_data(
|
|
f"{self.config.name}/status/{role.value}", "offline"
|
|
)
|
|
|
|
p["logpipe"].dump()
|
|
p["process"] = start_or_restart_ffmpeg(
|
|
p["cmd"], self.logger, p["logpipe"], ffmpeg_process=p["process"]
|
|
)
|
|
|
|
# Update stall metrics based on last processed frame timestamp
|
|
now = datetime.now().timestamp()
|
|
processed_ts = (
|
|
float(self.detection_frame.value) if self.detection_frame else 0.0
|
|
)
|
|
if processed_ts > 0:
|
|
delta = now - processed_ts
|
|
observed_fps = (
|
|
self.camera_fps.value
|
|
if self.camera_fps.value > 0
|
|
else self.config.detect.fps
|
|
)
|
|
interval = 1.0 / max(observed_fps, 0.1)
|
|
stall_threshold = max(2.0 * interval, 2.0)
|
|
|
|
if delta > stall_threshold:
|
|
if not self._stall_active:
|
|
self._stall_timestamps.append(now)
|
|
self._stall_active = True
|
|
else:
|
|
self._stall_active = False
|
|
|
|
while self._stall_timestamps and self._stall_timestamps[0] < now - 3600:
|
|
self._stall_timestamps.popleft()
|
|
|
|
if self.stalls:
|
|
self.stalls.value = len(self._stall_timestamps)
|
|
|
|
self.stop_all_ffmpeg()
|
|
self.logpipe.close()
|
|
self.config_subscriber.stop()
|
|
self.segment_subscriber.stop()
|
|
|
|
def start_ffmpeg_detect(self):
|
|
ffmpeg_cmd = [
|
|
c["cmd"] for c in self.config.ffmpeg_cmds if "detect" in c["roles"]
|
|
][0]
|
|
self.ffmpeg_detect_process = start_or_restart_ffmpeg(
|
|
ffmpeg_cmd, self.logger, self.logpipe, self.frame_size
|
|
)
|
|
self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid
|
|
self.capture_thread = CameraCaptureRunner(
|
|
self.config,
|
|
self.shm_frame_count,
|
|
self.frame_index,
|
|
self.ffmpeg_detect_process,
|
|
self.frame_shape,
|
|
self.frame_queue,
|
|
self.camera_fps,
|
|
self.skipped_fps,
|
|
self.stop_event,
|
|
)
|
|
self.capture_thread.start()
|
|
|
|
def start_all_ffmpeg(self):
|
|
"""Start all ffmpeg processes (detection and others)."""
|
|
logger.debug(f"Starting all ffmpeg processes for {self.config.name}")
|
|
self.start_ffmpeg_detect()
|
|
for c in self.config.ffmpeg_cmds:
|
|
if "detect" in c["roles"]:
|
|
continue
|
|
logpipe = LogPipe(
|
|
f"ffmpeg.{self.config.name}.{'_'.join(sorted(c['roles']))}"
|
|
)
|
|
self.ffmpeg_other_processes.append(
|
|
{
|
|
"cmd": c["cmd"],
|
|
"roles": c["roles"],
|
|
"logpipe": logpipe,
|
|
"process": start_or_restart_ffmpeg(c["cmd"], self.logger, logpipe),
|
|
}
|
|
)
|
|
|
|
def stop_all_ffmpeg(self):
|
|
"""Stop all ffmpeg processes (detection and others)."""
|
|
logger.debug(f"Stopping all ffmpeg processes for {self.config.name}")
|
|
if self.capture_thread is not None and self.capture_thread.is_alive():
|
|
self.capture_thread.join(timeout=5)
|
|
if self.capture_thread.is_alive():
|
|
self.logger.warning(
|
|
f"Capture thread for {self.config.name} did not stop gracefully."
|
|
)
|
|
if self.ffmpeg_detect_process is not None:
|
|
stop_ffmpeg(self.ffmpeg_detect_process, self.logger)
|
|
self.ffmpeg_detect_process = None
|
|
for p in self.ffmpeg_other_processes[:]:
|
|
if p["process"] is not None:
|
|
stop_ffmpeg(p["process"], self.logger)
|
|
p["logpipe"].close()
|
|
self.ffmpeg_other_processes.clear()
|
|
|
|
|
|
class CameraCaptureRunner(threading.Thread):
|
|
def __init__(
|
|
self,
|
|
config: CameraConfig,
|
|
shm_frame_count: int,
|
|
frame_index: int,
|
|
ffmpeg_process,
|
|
frame_shape: tuple[int, int],
|
|
frame_queue: Queue,
|
|
fps: Value,
|
|
skipped_fps: Value,
|
|
stop_event: MpEvent,
|
|
):
|
|
threading.Thread.__init__(self)
|
|
self.name = f"capture:{config.name}"
|
|
self.config = config
|
|
self.shm_frame_count = shm_frame_count
|
|
self.frame_index = frame_index
|
|
self.frame_shape = frame_shape
|
|
self.frame_queue = frame_queue
|
|
self.fps = fps
|
|
self.stop_event = stop_event
|
|
self.skipped_fps = skipped_fps
|
|
self.frame_manager = SharedMemoryFrameManager()
|
|
self.ffmpeg_process = ffmpeg_process
|
|
self.current_frame = Value("d", 0.0)
|
|
self.last_frame = 0
|
|
|
|
def run(self):
|
|
capture_frames(
|
|
self.ffmpeg_process,
|
|
self.config,
|
|
self.shm_frame_count,
|
|
self.frame_index,
|
|
self.frame_shape,
|
|
self.frame_manager,
|
|
self.frame_queue,
|
|
self.fps,
|
|
self.skipped_fps,
|
|
self.current_frame,
|
|
self.stop_event,
|
|
)
|
|
|
|
|
|
class CameraCapture(FrigateProcess):
|
|
def __init__(
|
|
self,
|
|
config: CameraConfig,
|
|
shm_frame_count: int,
|
|
camera_metrics: CameraMetrics,
|
|
stop_event: MpEvent,
|
|
log_config: LoggerConfig | None = None,
|
|
) -> None:
|
|
super().__init__(
|
|
stop_event,
|
|
PROCESS_PRIORITY_HIGH,
|
|
name=f"frigate.capture:{config.name}",
|
|
daemon=True,
|
|
)
|
|
self.config = config
|
|
self.shm_frame_count = shm_frame_count
|
|
self.camera_metrics = camera_metrics
|
|
self.log_config = log_config
|
|
|
|
def run(self) -> None:
|
|
self.pre_run_setup(self.log_config)
|
|
camera_watchdog = CameraWatchdog(
|
|
self.config,
|
|
self.shm_frame_count,
|
|
self.camera_metrics.frame_queue,
|
|
self.camera_metrics.camera_fps,
|
|
self.camera_metrics.skipped_fps,
|
|
self.camera_metrics.ffmpeg_pid,
|
|
self.camera_metrics.stalls_last_hour,
|
|
self.camera_metrics.reconnects_last_hour,
|
|
self.camera_metrics.detection_frame,
|
|
self.stop_event,
|
|
)
|
|
camera_watchdog.start()
|
|
camera_watchdog.join()
|