diff --git a/frigate/camera/camera.py b/frigate/camera/camera.py index 10a31b0a0..151f0775f 100644 --- a/frigate/camera/camera.py +++ b/frigate/camera/camera.py @@ -6,7 +6,7 @@ from typing import Optional from frigate import util from frigate.config import FrigateConfig from frigate.util.object import get_camera_regions_grid -from frigate.video import capture_camera, track_camera +from frigate.video import CameraWatchdog, track_camera from .metrics import CameraMetrics, PTZMetrics @@ -71,15 +71,11 @@ class Camera: logger.info(f"Capture process not started for disabled camera {self.name}") return - capture_process = util.Process( - target=capture_camera, - name=f"camera_capture:{self.name}", - args=( - self.name, - self.config.cameras[self.name], - shm_frame_count, - self.camera_metrics, - ), + capture_process = CameraWatchdog( + self.name, + self.config.cameras[self.name], + shm_frame_count, + self.camera_metrics, ) capture_process.daemon = True self.capture_process = capture_process diff --git a/frigate/video.py b/frigate/video.py index 87c322a73..18160597f 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -6,11 +6,11 @@ import queue import signal import subprocess as sp import threading -import time import cv2 from setproctitle import setproctitle +from frigate import util from frigate.camera.metrics import CameraMetrics, PTZMetrics from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.inter_process import InterProcessRequestor @@ -90,108 +90,35 @@ def start_or_restart_ffmpeg( return process -def capture_frames( - ffmpeg_process, - config: CameraConfig, - shm_frame_count: int, - frame_shape, - frame_manager: FrameManager, - frame_queue, - fps: mp.Value, - skipped_fps: mp.Value, - current_frame: mp.Value, - stop_event: mp.Event, -): - frame_size = frame_shape[0] * frame_shape[1] - frame_rate = EventsPerSecond() - frame_rate.start() - skipped_eps = EventsPerSecond() - skipped_eps.start() - - shm_frames: list[str] = [] - - while True: - fps.value = frame_rate.eps() - skipped_fps.value = skipped_eps.eps() - current_frame.value = datetime.datetime.now().timestamp() - frame_name = f"{config.name}{current_frame.value}" - frame_buffer = frame_manager.create(frame_name, frame_size) - try: - frame_buffer[:] = ffmpeg_process.stdout.read(frame_size) - - # update frame cache and cleanup existing frames - shm_frames.append(frame_name) - - if len(shm_frames) > shm_frame_count: - expired_frame_name = shm_frames.pop(0) - frame_manager.delete(expired_frame_name) - except Exception: - # always delete the frame - frame_manager.delete(frame_name) - - # shutdown has been initiated - if stop_event.is_set(): - break - - logger.error(f"{config.name}: Unable to read frames from ffmpeg process.") - - if ffmpeg_process.poll() is not None: - logger.error( - f"{config.name}: ffmpeg process is not running. exiting capture thread..." - ) - break - - continue - - frame_rate.update() - - # don't lock the queue to check, just try since it should rarely be full - try: - # add to the queue - frame_queue.put(current_frame.value, False) - frame_manager.close(frame_name) - except queue.Full: - # if the queue is full, skip this frame - skipped_eps.update() - - # clear out frames - for frame in shm_frames: - frame_manager.delete(frame) - - -class CameraWatchdog(threading.Thread): +class CameraWatchdog(util.Process): def __init__( self, - camera_name, + camera_name: str, config: CameraConfig, shm_frame_count: int, - frame_queue, - camera_fps, - skipped_fps, - ffmpeg_pid, - stop_event, + camera_metrics: CameraMetrics, ): - threading.Thread.__init__(self) - self.logger = logging.getLogger(f"watchdog.{camera_name}") + super().__init__(name=f"frigate.watchdog:{camera_name}") + self.camera_name = camera_name self.config = config self.shm_frame_count = shm_frame_count - self.capture_thread = None + self.camera_fps = camera_metrics.camera_fps + self.skipped_fps = camera_metrics.skipped_fps + self.ffmpeg_pid = camera_metrics.ffmpeg_pid + self.frame_queue = camera_metrics.frame_queue + + def run(self): self.ffmpeg_detect_process = None self.logpipe = LogPipe(f"ffmpeg.{self.camera_name}.detect") self.ffmpeg_other_processes: list[dict[str, any]] = [] - self.camera_fps = camera_fps - self.skipped_fps = skipped_fps - self.ffmpeg_pid = ffmpeg_pid - self.frame_queue = frame_queue self.frame_shape = self.config.frame_shape_yuv self.frame_size = self.frame_shape[0] * self.frame_shape[1] - self.fps_overflow_count = 0 - self.stop_event = stop_event - self.sleeptime = self.config.ffmpeg.retry_interval - def run(self): - self.start_ffmpeg_detect() + fps_overflow_count = 0 + sleeptime = self.config.ffmpeg.retry_interval + + capture_thread = self.start_ffmpeg_detect() for c in self.config.ffmpeg_cmds: if "detect" in c["roles"]: @@ -208,11 +135,10 @@ class CameraWatchdog(threading.Thread): } ) - time.sleep(self.sleeptime) - while not self.stop_event.wait(self.sleeptime): + while not self.stop_event.wait(sleeptime): now = datetime.datetime.now().timestamp() - if not self.capture_thread.is_alive(): + if not capture_thread.is_alive(): self.camera_fps.value = 0 self.logger.error( f"Ffmpeg process crashed unexpectedly for {self.camera_name}." @@ -221,8 +147,8 @@ class CameraWatchdog(threading.Thread): "The following ffmpeg logs include the last 100 lines prior to exit." ) self.logpipe.dump() - self.start_ffmpeg_detect() - elif now - self.capture_thread.current_frame.value > 20: + capture_thread = self.start_ffmpeg_detect() + elif now - capture_thread.current_frame.value > 20: self.camera_fps.value = 0 self.logger.info( f"No frames received from {self.camera_name} in 20 seconds. Exiting ffmpeg..." @@ -236,10 +162,10 @@ class CameraWatchdog(threading.Thread): self.ffmpeg_detect_process.kill() self.ffmpeg_detect_process.communicate() elif self.camera_fps.value >= (self.config.detect.fps + 10): - self.fps_overflow_count += 1 + fps_overflow_count += 1 - if self.fps_overflow_count == 3: - self.fps_overflow_count = 0 + if fps_overflow_count == 3: + fps_overflow_count = 0 self.camera_fps.value = 0 self.logger.info( f"{self.camera_name} exceeded fps limit. Exiting ffmpeg..." @@ -254,7 +180,7 @@ class CameraWatchdog(threading.Thread): self.ffmpeg_detect_process.communicate() else: # process is running normally - self.fps_overflow_count = 0 + fps_overflow_count = 0 for p in self.ffmpeg_other_processes: poll = p["process"].poll() @@ -305,7 +231,7 @@ class CameraWatchdog(threading.Thread): ffmpeg_cmd, self.logger, self.logpipe, self.frame_size ) self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid - self.capture_thread = CameraCapture( + capture_thread = CameraCapture( self.config, self.shm_frame_count, self.ffmpeg_detect_process, @@ -315,7 +241,9 @@ class CameraWatchdog(threading.Thread): self.skipped_fps, self.stop_event, ) - self.capture_thread.start() + capture_thread.start() + + return capture_thread def get_latest_segment_datetime(self, latest_segment: datetime.datetime) -> int: """Checks if ffmpeg is still writing recording segments to cache.""" @@ -355,8 +283,8 @@ class CameraCapture(threading.Thread): skipped_fps, stop_event, ): - threading.Thread.__init__(self) - self.name = f"capture:{config.name}" + super().__init__(name=f"capture:{config.name}") + self.config = config self.shm_frame_count = shm_frame_count self.frame_shape = frame_shape @@ -367,49 +295,65 @@ class CameraCapture(threading.Thread): self.frame_manager = SharedMemoryFrameManager() self.ffmpeg_process = ffmpeg_process self.current_frame = mp.Value("d", 0.0) - self.last_frame = 0 def run(self): - capture_frames( - self.ffmpeg_process, - self.config, - self.shm_frame_count, - self.frame_shape, - self.frame_manager, - self.frame_queue, - self.fps, - self.skipped_fps, - self.current_frame, - self.stop_event, - ) + frame_size = self.frame_shape[0] * self.frame_shape[1] + frame_rate = EventsPerSecond() + frame_rate.start() + skipped_eps = EventsPerSecond() + skipped_eps.start() + shm_frames: list[str] = [] -def capture_camera( - name, config: CameraConfig, shm_frame_count: int, camera_metrics: CameraMetrics -): - stop_event = mp.Event() + while True: + self.fps.value = frame_rate.eps() + self.skipped_fps.value = skipped_eps.eps() + self.current_frame.value = datetime.datetime.now().timestamp() + frame_name = f"{self.config.name}{self.current_frame.value}" + frame_buffer = self.frame_manager.create(frame_name, frame_size) + try: + frame_buffer[:] = self.ffmpeg_process.stdout.read(frame_size) - def receiveSignal(signalNumber, frame): - stop_event.set() + # update frame cache and cleanup existing frames + shm_frames.append(frame_name) - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) + if len(shm_frames) > self.shm_frame_count: + expired_frame_name = shm_frames.pop(0) + self.frame_manager.delete(expired_frame_name) + except Exception: + # always delete the frame + self.frame_manager.delete(frame_name) - threading.current_thread().name = f"capture:{name}" - setproctitle(f"frigate.capture:{name}") + # shutdown has been initiated + if self.stop_event.is_set(): + break - camera_watchdog = CameraWatchdog( - name, - config, - shm_frame_count, - camera_metrics.frame_queue, - camera_metrics.camera_fps, - camera_metrics.skipped_fps, - camera_metrics.ffmpeg_pid, - stop_event, - ) - camera_watchdog.start() - camera_watchdog.join() + logger.error( + f"{self.config.name}: Unable to read frames from ffmpeg process." + ) + + if self.ffmpeg_process.poll() is not None: + logger.error( + f"{self.config.name}: ffmpeg process is not running. exiting capture thread..." + ) + break + + continue + + frame_rate.update() + + # don't lock the queue to check, just try since it should rarely be full + try: + # add to the queue + self.frame_queue.put(self.current_frame.value, False) + self.frame_manager.close(frame_name) + except queue.Full: + # if the queue is full, skip this frame + skipped_eps.update() + + # clear out frames + for frame in shm_frames: + self.frame_manager.delete(frame) def track_camera(