Remove redundant capture methods in video.py

This commit is contained in:
George Tsiamasiotis 2024-09-30 15:16:00 +03:00
parent 7bf21bd533
commit 0fcba9b676
2 changed files with 87 additions and 147 deletions

View File

@ -6,7 +6,7 @@ from typing import Optional
from frigate import util from frigate import util
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.util.object import get_camera_regions_grid from frigate.util.object import get_camera_regions_grid
from frigate.video import capture_camera, track_camera from frigate.video import CameraWatchdog, track_camera
from .metrics import CameraMetrics, PTZMetrics from .metrics import CameraMetrics, PTZMetrics
@ -71,15 +71,11 @@ class Camera:
logger.info(f"Capture process not started for disabled camera {self.name}") logger.info(f"Capture process not started for disabled camera {self.name}")
return return
capture_process = util.Process( capture_process = CameraWatchdog(
target=capture_camera,
name=f"camera_capture:{self.name}",
args=(
self.name, self.name,
self.config.cameras[self.name], self.config.cameras[self.name],
shm_frame_count, shm_frame_count,
self.camera_metrics, self.camera_metrics,
),
) )
capture_process.daemon = True capture_process.daemon = True
self.capture_process = capture_process self.capture_process = capture_process

View File

@ -6,11 +6,11 @@ import queue
import signal import signal
import subprocess as sp import subprocess as sp
import threading import threading
import time
import cv2 import cv2
from setproctitle import setproctitle from setproctitle import setproctitle
from frigate import util
from frigate.camera.metrics import CameraMetrics, PTZMetrics from frigate.camera.metrics import CameraMetrics, PTZMetrics
from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.inter_process import InterProcessRequestor from frigate.comms.inter_process import InterProcessRequestor
@ -90,108 +90,35 @@ def start_or_restart_ffmpeg(
return process return process
def capture_frames( class CameraWatchdog(util.Process):
ffmpeg_process,
config: CameraConfig,
shm_frame_count: int,
frame_shape,
frame_manager: FrameManager,
frame_queue,
fps: mp.Value,
skipped_fps: mp.Value,
current_frame: mp.Value,
stop_event: mp.Event,
):
frame_size = frame_shape[0] * frame_shape[1]
frame_rate = EventsPerSecond()
frame_rate.start()
skipped_eps = EventsPerSecond()
skipped_eps.start()
shm_frames: list[str] = []
while True:
fps.value = frame_rate.eps()
skipped_fps.value = skipped_eps.eps()
current_frame.value = datetime.datetime.now().timestamp()
frame_name = f"{config.name}{current_frame.value}"
frame_buffer = frame_manager.create(frame_name, frame_size)
try:
frame_buffer[:] = ffmpeg_process.stdout.read(frame_size)
# update frame cache and cleanup existing frames
shm_frames.append(frame_name)
if len(shm_frames) > shm_frame_count:
expired_frame_name = shm_frames.pop(0)
frame_manager.delete(expired_frame_name)
except Exception:
# always delete the frame
frame_manager.delete(frame_name)
# shutdown has been initiated
if stop_event.is_set():
break
logger.error(f"{config.name}: Unable to read frames from ffmpeg process.")
if ffmpeg_process.poll() is not None:
logger.error(
f"{config.name}: ffmpeg process is not running. exiting capture thread..."
)
break
continue
frame_rate.update()
# don't lock the queue to check, just try since it should rarely be full
try:
# add to the queue
frame_queue.put(current_frame.value, False)
frame_manager.close(frame_name)
except queue.Full:
# if the queue is full, skip this frame
skipped_eps.update()
# clear out frames
for frame in shm_frames:
frame_manager.delete(frame)
class CameraWatchdog(threading.Thread):
def __init__( def __init__(
self, self,
camera_name, camera_name: str,
config: CameraConfig, config: CameraConfig,
shm_frame_count: int, shm_frame_count: int,
frame_queue, camera_metrics: CameraMetrics,
camera_fps,
skipped_fps,
ffmpeg_pid,
stop_event,
): ):
threading.Thread.__init__(self) super().__init__(name=f"frigate.watchdog:{camera_name}")
self.logger = logging.getLogger(f"watchdog.{camera_name}")
self.camera_name = camera_name self.camera_name = camera_name
self.config = config self.config = config
self.shm_frame_count = shm_frame_count self.shm_frame_count = shm_frame_count
self.capture_thread = None self.camera_fps = camera_metrics.camera_fps
self.skipped_fps = camera_metrics.skipped_fps
self.ffmpeg_pid = camera_metrics.ffmpeg_pid
self.frame_queue = camera_metrics.frame_queue
def run(self):
self.ffmpeg_detect_process = None self.ffmpeg_detect_process = None
self.logpipe = LogPipe(f"ffmpeg.{self.camera_name}.detect") self.logpipe = LogPipe(f"ffmpeg.{self.camera_name}.detect")
self.ffmpeg_other_processes: list[dict[str, any]] = [] self.ffmpeg_other_processes: list[dict[str, any]] = []
self.camera_fps = camera_fps
self.skipped_fps = skipped_fps
self.ffmpeg_pid = ffmpeg_pid
self.frame_queue = frame_queue
self.frame_shape = self.config.frame_shape_yuv self.frame_shape = self.config.frame_shape_yuv
self.frame_size = self.frame_shape[0] * self.frame_shape[1] self.frame_size = self.frame_shape[0] * self.frame_shape[1]
self.fps_overflow_count = 0
self.stop_event = stop_event
self.sleeptime = self.config.ffmpeg.retry_interval
def run(self): fps_overflow_count = 0
self.start_ffmpeg_detect() sleeptime = self.config.ffmpeg.retry_interval
capture_thread = self.start_ffmpeg_detect()
for c in self.config.ffmpeg_cmds: for c in self.config.ffmpeg_cmds:
if "detect" in c["roles"]: if "detect" in c["roles"]:
@ -208,11 +135,10 @@ class CameraWatchdog(threading.Thread):
} }
) )
time.sleep(self.sleeptime) while not self.stop_event.wait(sleeptime):
while not self.stop_event.wait(self.sleeptime):
now = datetime.datetime.now().timestamp() now = datetime.datetime.now().timestamp()
if not self.capture_thread.is_alive(): if not capture_thread.is_alive():
self.camera_fps.value = 0 self.camera_fps.value = 0
self.logger.error( self.logger.error(
f"Ffmpeg process crashed unexpectedly for {self.camera_name}." f"Ffmpeg process crashed unexpectedly for {self.camera_name}."
@ -221,8 +147,8 @@ class CameraWatchdog(threading.Thread):
"The following ffmpeg logs include the last 100 lines prior to exit." "The following ffmpeg logs include the last 100 lines prior to exit."
) )
self.logpipe.dump() self.logpipe.dump()
self.start_ffmpeg_detect() capture_thread = self.start_ffmpeg_detect()
elif now - self.capture_thread.current_frame.value > 20: elif now - capture_thread.current_frame.value > 20:
self.camera_fps.value = 0 self.camera_fps.value = 0
self.logger.info( self.logger.info(
f"No frames received from {self.camera_name} in 20 seconds. Exiting ffmpeg..." f"No frames received from {self.camera_name} in 20 seconds. Exiting ffmpeg..."
@ -236,10 +162,10 @@ class CameraWatchdog(threading.Thread):
self.ffmpeg_detect_process.kill() self.ffmpeg_detect_process.kill()
self.ffmpeg_detect_process.communicate() self.ffmpeg_detect_process.communicate()
elif self.camera_fps.value >= (self.config.detect.fps + 10): elif self.camera_fps.value >= (self.config.detect.fps + 10):
self.fps_overflow_count += 1 fps_overflow_count += 1
if self.fps_overflow_count == 3: if fps_overflow_count == 3:
self.fps_overflow_count = 0 fps_overflow_count = 0
self.camera_fps.value = 0 self.camera_fps.value = 0
self.logger.info( self.logger.info(
f"{self.camera_name} exceeded fps limit. Exiting ffmpeg..." f"{self.camera_name} exceeded fps limit. Exiting ffmpeg..."
@ -254,7 +180,7 @@ class CameraWatchdog(threading.Thread):
self.ffmpeg_detect_process.communicate() self.ffmpeg_detect_process.communicate()
else: else:
# process is running normally # process is running normally
self.fps_overflow_count = 0 fps_overflow_count = 0
for p in self.ffmpeg_other_processes: for p in self.ffmpeg_other_processes:
poll = p["process"].poll() poll = p["process"].poll()
@ -305,7 +231,7 @@ class CameraWatchdog(threading.Thread):
ffmpeg_cmd, self.logger, self.logpipe, self.frame_size ffmpeg_cmd, self.logger, self.logpipe, self.frame_size
) )
self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid
self.capture_thread = CameraCapture( capture_thread = CameraCapture(
self.config, self.config,
self.shm_frame_count, self.shm_frame_count,
self.ffmpeg_detect_process, self.ffmpeg_detect_process,
@ -315,7 +241,9 @@ class CameraWatchdog(threading.Thread):
self.skipped_fps, self.skipped_fps,
self.stop_event, self.stop_event,
) )
self.capture_thread.start() capture_thread.start()
return capture_thread
def get_latest_segment_datetime(self, latest_segment: datetime.datetime) -> int: def get_latest_segment_datetime(self, latest_segment: datetime.datetime) -> int:
"""Checks if ffmpeg is still writing recording segments to cache.""" """Checks if ffmpeg is still writing recording segments to cache."""
@ -355,8 +283,8 @@ class CameraCapture(threading.Thread):
skipped_fps, skipped_fps,
stop_event, stop_event,
): ):
threading.Thread.__init__(self) super().__init__(name=f"capture:{config.name}")
self.name = f"capture:{config.name}"
self.config = config self.config = config
self.shm_frame_count = shm_frame_count self.shm_frame_count = shm_frame_count
self.frame_shape = frame_shape self.frame_shape = frame_shape
@ -367,49 +295,65 @@ class CameraCapture(threading.Thread):
self.frame_manager = SharedMemoryFrameManager() self.frame_manager = SharedMemoryFrameManager()
self.ffmpeg_process = ffmpeg_process self.ffmpeg_process = ffmpeg_process
self.current_frame = mp.Value("d", 0.0) self.current_frame = mp.Value("d", 0.0)
self.last_frame = 0
def run(self): def run(self):
capture_frames( frame_size = self.frame_shape[0] * self.frame_shape[1]
self.ffmpeg_process, frame_rate = EventsPerSecond()
self.config, frame_rate.start()
self.shm_frame_count, skipped_eps = EventsPerSecond()
self.frame_shape, skipped_eps.start()
self.frame_manager,
self.frame_queue, shm_frames: list[str] = []
self.fps,
self.skipped_fps, while True:
self.current_frame, self.fps.value = frame_rate.eps()
self.stop_event, self.skipped_fps.value = skipped_eps.eps()
self.current_frame.value = datetime.datetime.now().timestamp()
frame_name = f"{self.config.name}{self.current_frame.value}"
frame_buffer = self.frame_manager.create(frame_name, frame_size)
try:
frame_buffer[:] = self.ffmpeg_process.stdout.read(frame_size)
# update frame cache and cleanup existing frames
shm_frames.append(frame_name)
if len(shm_frames) > self.shm_frame_count:
expired_frame_name = shm_frames.pop(0)
self.frame_manager.delete(expired_frame_name)
except Exception:
# always delete the frame
self.frame_manager.delete(frame_name)
# shutdown has been initiated
if self.stop_event.is_set():
break
logger.error(
f"{self.config.name}: Unable to read frames from ffmpeg process."
) )
if self.ffmpeg_process.poll() is not None:
def capture_camera( logger.error(
name, config: CameraConfig, shm_frame_count: int, camera_metrics: CameraMetrics f"{self.config.name}: ffmpeg process is not running. exiting capture thread..."
):
stop_event = mp.Event()
def receiveSignal(signalNumber, frame):
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
threading.current_thread().name = f"capture:{name}"
setproctitle(f"frigate.capture:{name}")
camera_watchdog = CameraWatchdog(
name,
config,
shm_frame_count,
camera_metrics.frame_queue,
camera_metrics.camera_fps,
camera_metrics.skipped_fps,
camera_metrics.ffmpeg_pid,
stop_event,
) )
camera_watchdog.start() break
camera_watchdog.join()
continue
frame_rate.update()
# don't lock the queue to check, just try since it should rarely be full
try:
# add to the queue
self.frame_queue.put(self.current_frame.value, False)
self.frame_manager.close(frame_name)
except queue.Full:
# if the queue is full, skip this frame
skipped_eps.update()
# clear out frames
for frame in shm_frames:
self.frame_manager.delete(frame)
def track_camera( def track_camera(