diff --git a/frigate/camera/camera.py b/frigate/camera/camera.py index cf909679a..5691412d6 100644 --- a/frigate/camera/camera.py +++ b/frigate/camera/camera.py @@ -6,9 +6,10 @@ from typing import Optional from frigate import util from frigate.config import FrigateConfig from frigate.util.object import get_camera_regions_grid -from frigate.video import CameraTracker, CameraWatchdog +from .capture import CameraWatchdog from .metrics import CameraMetrics, PTZMetrics +from .tracker import CameraTracker logger = logging.getLogger(__name__) diff --git a/frigate/camera/capture.py b/frigate/camera/capture.py new file mode 100644 index 000000000..58d86d3ac --- /dev/null +++ b/frigate/camera/capture.py @@ -0,0 +1,288 @@ +import datetime +import logging +import multiprocessing as mp +import os +import queue +import subprocess as sp +import threading + +from frigate import util +from frigate.camera.metrics import CameraMetrics +from frigate.config import CameraConfig +from frigate.const import ( + CACHE_DIR, + CACHE_SEGMENT_FORMAT, +) +from frigate.log import LogPipe +from frigate.util.builtin import EventsPerSecond +from frigate.util.ffmpeg import start_or_restart_ffmpeg, stop_ffmpeg +from frigate.util.image import ( + SharedMemoryFrameManager, +) + + +class CameraWatchdog(util.Process): + def __init__( + self, + camera_name: str, + config: CameraConfig, + shm_frame_count: int, + camera_metrics: CameraMetrics, + ): + super().__init__(name=f"frigate.watchdog:{camera_name}") + + self.camera_name = camera_name + self.config = config + self.shm_frame_count = shm_frame_count + self.camera_fps = camera_metrics.camera_fps + self.skipped_fps = camera_metrics.skipped_fps + self.ffmpeg_pid = camera_metrics.ffmpeg_pid + self.frame_queue = camera_metrics.frame_queue + + def run(self): + self.ffmpeg_detect_process = None + self.logpipe = LogPipe(f"ffmpeg.{self.camera_name}.detect") + self.ffmpeg_other_processes: list[dict[str, any]] = [] + self.frame_shape = self.config.frame_shape_yuv + self.frame_size = self.frame_shape[0] * self.frame_shape[1] + + fps_overflow_count = 0 + sleeptime = self.config.ffmpeg.retry_interval + + capture_thread = self.start_ffmpeg_detect() + + for c in self.config.ffmpeg_cmds: + if "detect" in c["roles"]: + continue + logpipe = LogPipe( + f"ffmpeg.{self.camera_name}.{'_'.join(sorted(c['roles']))}" + ) + self.ffmpeg_other_processes.append( + { + "cmd": c["cmd"], + "roles": c["roles"], + "logpipe": logpipe, + "process": start_or_restart_ffmpeg(c["cmd"], self.logger, logpipe), + } + ) + + while not self.stop_event.wait(sleeptime): + now = datetime.datetime.now().timestamp() + + if not capture_thread.is_alive(): + self.camera_fps.value = 0 + self.logger.error( + f"Ffmpeg process crashed unexpectedly for {self.camera_name}." + ) + self.logger.error( + "The following ffmpeg logs include the last 100 lines prior to exit." + ) + self.logpipe.dump() + capture_thread = self.start_ffmpeg_detect() + elif now - capture_thread.current_frame.value > 20: + self.camera_fps.value = 0 + self.logger.info( + f"No frames received from {self.camera_name} in 20 seconds. Exiting ffmpeg..." + ) + self.ffmpeg_detect_process.terminate() + try: + self.logger.info("Waiting for ffmpeg to exit gracefully...") + self.ffmpeg_detect_process.communicate(timeout=30) + except sp.TimeoutExpired: + self.logger.info("FFmpeg did not exit. Force killing...") + self.ffmpeg_detect_process.kill() + self.ffmpeg_detect_process.communicate() + elif self.camera_fps.value >= (self.config.detect.fps + 10): + fps_overflow_count += 1 + + if fps_overflow_count == 3: + fps_overflow_count = 0 + self.camera_fps.value = 0 + self.logger.info( + f"{self.camera_name} exceeded fps limit. Exiting ffmpeg..." + ) + self.ffmpeg_detect_process.terminate() + try: + self.logger.info("Waiting for ffmpeg to exit gracefully...") + self.ffmpeg_detect_process.communicate(timeout=30) + except sp.TimeoutExpired: + self.logger.info("FFmpeg did not exit. Force killing...") + self.ffmpeg_detect_process.kill() + self.ffmpeg_detect_process.communicate() + else: + # process is running normally + fps_overflow_count = 0 + + for p in self.ffmpeg_other_processes: + poll = p["process"].poll() + + if self.config.record.enabled and "record" in p["roles"]: + latest_segment_time = self.get_latest_segment_datetime( + p.get( + "latest_segment_time", + datetime.datetime.now().astimezone(datetime.timezone.utc), + ) + ) + + if datetime.datetime.now().astimezone(datetime.timezone.utc) > ( + latest_segment_time + datetime.timedelta(seconds=120) + ): + self.logger.error( + f"No new recording segments were created for {self.camera_name} in the last 120s. restarting the ffmpeg record process..." + ) + p["process"] = start_or_restart_ffmpeg( + p["cmd"], + self.logger, + p["logpipe"], + ffmpeg_process=p["process"], + ) + continue + else: + p["latest_segment_time"] = latest_segment_time + + if poll is None: + continue + + p["logpipe"].dump() + p["process"] = start_or_restart_ffmpeg( + p["cmd"], self.logger, p["logpipe"], ffmpeg_process=p["process"] + ) + + stop_ffmpeg(self.ffmpeg_detect_process, self.logger) + for p in self.ffmpeg_other_processes: + stop_ffmpeg(p["process"], self.logger) + p["logpipe"].close() + self.logpipe.close() + + def start_ffmpeg_detect(self): + ffmpeg_cmd = [ + c["cmd"] for c in self.config.ffmpeg_cmds if "detect" in c["roles"] + ][0] + self.ffmpeg_detect_process = start_or_restart_ffmpeg( + ffmpeg_cmd, self.logger, self.logpipe, self.frame_size + ) + self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid + capture_thread = CameraCapture( + self.config, + self.shm_frame_count, + self.ffmpeg_detect_process, + self.frame_shape, + self.frame_queue, + self.camera_fps, + self.skipped_fps, + self.stop_event, + ) + capture_thread.start() + + return capture_thread + + def get_latest_segment_datetime(self, latest_segment: datetime.datetime) -> int: + """Checks if ffmpeg is still writing recording segments to cache.""" + cache_files = sorted( + [ + d + for d in os.listdir(CACHE_DIR) + if os.path.isfile(os.path.join(CACHE_DIR, d)) + and d.endswith(".mp4") + and not d.startswith("preview_") + ] + ) + newest_segment_time = latest_segment + + for file in cache_files: + if self.camera_name in file: + basename = os.path.splitext(file)[0] + _, date = basename.rsplit("@", maxsplit=1) + segment_time = datetime.datetime.strptime( + date, CACHE_SEGMENT_FORMAT + ).astimezone(datetime.timezone.utc) + if segment_time > newest_segment_time: + newest_segment_time = segment_time + + return newest_segment_time + + +class CameraCapture(threading.Thread): + def __init__( + self, + config: CameraConfig, + shm_frame_count: int, + ffmpeg_process, + frame_shape, + frame_queue, + fps, + skipped_fps, + stop_event, + ): + super().__init__(name=f"capture:{config.name}") + + self.logger = logging.getLogger(self.name) + self.config = config + self.shm_frame_count = shm_frame_count + self.frame_shape = frame_shape + self.frame_queue = frame_queue + self.fps = fps + self.stop_event = stop_event + self.skipped_fps = skipped_fps + self.frame_manager = SharedMemoryFrameManager() + self.ffmpeg_process = ffmpeg_process + self.current_frame = mp.Value("d", 0.0) + + def run(self): + frame_size = self.frame_shape[0] * self.frame_shape[1] + frame_rate = EventsPerSecond() + frame_rate.start() + skipped_eps = EventsPerSecond() + skipped_eps.start() + + shm_frames: list[str] = [] + + while True: + self.fps.value = frame_rate.eps() + self.skipped_fps.value = skipped_eps.eps() + self.current_frame.value = datetime.datetime.now().timestamp() + frame_name = f"{self.config.name}{self.current_frame.value}" + frame_buffer = self.frame_manager.create(frame_name, frame_size) + try: + frame_buffer[:] = self.ffmpeg_process.stdout.read(frame_size) + + # update frame cache and cleanup existing frames + shm_frames.append(frame_name) + + if len(shm_frames) > self.shm_frame_count: + expired_frame_name = shm_frames.pop(0) + self.frame_manager.delete(expired_frame_name) + except Exception: + # always delete the frame + self.frame_manager.delete(frame_name) + + # shutdown has been initiated + if self.stop_event.is_set(): + break + + self.logger.error( + f"{self.config.name}: Unable to read frames from ffmpeg process." + ) + + if self.ffmpeg_process.poll() is not None: + self.logger.error( + f"{self.config.name}: ffmpeg process is not running. exiting capture thread..." + ) + break + + continue + + frame_rate.update() + + # don't lock the queue to check, just try since it should rarely be full + try: + # add to the queue + self.frame_queue.put(self.current_frame.value, False) + self.frame_manager.close(frame_name) + except queue.Full: + # if the queue is full, skip this frame + skipped_eps.update() + + # clear out frames + for frame in shm_frames: + self.frame_manager.delete(frame) diff --git a/frigate/video.py b/frigate/camera/tracker.py old mode 100755 new mode 100644 similarity index 59% rename from frigate/video.py rename to frigate/camera/tracker.py index 10958fcec..5bf4ae6e0 --- a/frigate/video.py +++ b/frigate/camera/tracker.py @@ -1,10 +1,6 @@ import datetime -import logging import multiprocessing as mp -import os import queue -import subprocess as sp -import threading from multiprocessing.synchronize import Event import cv2 @@ -15,11 +11,8 @@ from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.inter_process import InterProcessRequestor from frigate.config import CameraConfig, DetectConfig, ModelConfig from frigate.const import ( - CACHE_DIR, - CACHE_SEGMENT_FORMAT, REQUEST_REGION_GRID, ) -from frigate.log import LogPipe from frigate.motion.improved_motion import ImprovedMotionDetector from frigate.object_detection import RemoteObjectDetector from frigate.ptz.autotrack import ptz_moving_at_frame_time @@ -44,312 +37,6 @@ from frigate.util.object import ( ) -def stop_ffmpeg(ffmpeg_process, logger): - logger.info("Terminating the existing ffmpeg process...") - ffmpeg_process.terminate() - try: - logger.info("Waiting for ffmpeg to exit gracefully...") - ffmpeg_process.communicate(timeout=30) - except sp.TimeoutExpired: - logger.info("FFmpeg didn't exit. Force killing...") - ffmpeg_process.kill() - ffmpeg_process.communicate() - ffmpeg_process = None - - -def start_or_restart_ffmpeg( - ffmpeg_cmd, logger, logpipe: LogPipe, frame_size=None, ffmpeg_process=None -): - if ffmpeg_process is not None: - stop_ffmpeg(ffmpeg_process, logger) - - if frame_size is None: - process = sp.Popen( - ffmpeg_cmd, - stdout=sp.DEVNULL, - stderr=logpipe, - stdin=sp.DEVNULL, - start_new_session=True, - ) - else: - process = sp.Popen( - ffmpeg_cmd, - stdout=sp.PIPE, - stderr=logpipe, - stdin=sp.DEVNULL, - bufsize=frame_size * 10, - start_new_session=True, - ) - return process - - -class CameraWatchdog(util.Process): - def __init__( - self, - camera_name: str, - config: CameraConfig, - shm_frame_count: int, - camera_metrics: CameraMetrics, - ): - super().__init__(name=f"frigate.watchdog:{camera_name}") - - self.camera_name = camera_name - self.config = config - self.shm_frame_count = shm_frame_count - self.camera_fps = camera_metrics.camera_fps - self.skipped_fps = camera_metrics.skipped_fps - self.ffmpeg_pid = camera_metrics.ffmpeg_pid - self.frame_queue = camera_metrics.frame_queue - - def run(self): - self.ffmpeg_detect_process = None - self.logpipe = LogPipe(f"ffmpeg.{self.camera_name}.detect") - self.ffmpeg_other_processes: list[dict[str, any]] = [] - self.frame_shape = self.config.frame_shape_yuv - self.frame_size = self.frame_shape[0] * self.frame_shape[1] - - fps_overflow_count = 0 - sleeptime = self.config.ffmpeg.retry_interval - - capture_thread = self.start_ffmpeg_detect() - - for c in self.config.ffmpeg_cmds: - if "detect" in c["roles"]: - continue - logpipe = LogPipe( - f"ffmpeg.{self.camera_name}.{'_'.join(sorted(c['roles']))}" - ) - self.ffmpeg_other_processes.append( - { - "cmd": c["cmd"], - "roles": c["roles"], - "logpipe": logpipe, - "process": start_or_restart_ffmpeg(c["cmd"], self.logger, logpipe), - } - ) - - while not self.stop_event.wait(sleeptime): - now = datetime.datetime.now().timestamp() - - if not capture_thread.is_alive(): - self.camera_fps.value = 0 - self.logger.error( - f"Ffmpeg process crashed unexpectedly for {self.camera_name}." - ) - self.logger.error( - "The following ffmpeg logs include the last 100 lines prior to exit." - ) - self.logpipe.dump() - capture_thread = self.start_ffmpeg_detect() - elif now - capture_thread.current_frame.value > 20: - self.camera_fps.value = 0 - self.logger.info( - f"No frames received from {self.camera_name} in 20 seconds. Exiting ffmpeg..." - ) - self.ffmpeg_detect_process.terminate() - try: - self.logger.info("Waiting for ffmpeg to exit gracefully...") - self.ffmpeg_detect_process.communicate(timeout=30) - except sp.TimeoutExpired: - self.logger.info("FFmpeg did not exit. Force killing...") - self.ffmpeg_detect_process.kill() - self.ffmpeg_detect_process.communicate() - elif self.camera_fps.value >= (self.config.detect.fps + 10): - fps_overflow_count += 1 - - if fps_overflow_count == 3: - fps_overflow_count = 0 - self.camera_fps.value = 0 - self.logger.info( - f"{self.camera_name} exceeded fps limit. Exiting ffmpeg..." - ) - self.ffmpeg_detect_process.terminate() - try: - self.logger.info("Waiting for ffmpeg to exit gracefully...") - self.ffmpeg_detect_process.communicate(timeout=30) - except sp.TimeoutExpired: - self.logger.info("FFmpeg did not exit. Force killing...") - self.ffmpeg_detect_process.kill() - self.ffmpeg_detect_process.communicate() - else: - # process is running normally - fps_overflow_count = 0 - - for p in self.ffmpeg_other_processes: - poll = p["process"].poll() - - if self.config.record.enabled and "record" in p["roles"]: - latest_segment_time = self.get_latest_segment_datetime( - p.get( - "latest_segment_time", - datetime.datetime.now().astimezone(datetime.timezone.utc), - ) - ) - - if datetime.datetime.now().astimezone(datetime.timezone.utc) > ( - latest_segment_time + datetime.timedelta(seconds=120) - ): - self.logger.error( - f"No new recording segments were created for {self.camera_name} in the last 120s. restarting the ffmpeg record process..." - ) - p["process"] = start_or_restart_ffmpeg( - p["cmd"], - self.logger, - p["logpipe"], - ffmpeg_process=p["process"], - ) - continue - else: - p["latest_segment_time"] = latest_segment_time - - if poll is None: - continue - - p["logpipe"].dump() - p["process"] = start_or_restart_ffmpeg( - p["cmd"], self.logger, p["logpipe"], ffmpeg_process=p["process"] - ) - - stop_ffmpeg(self.ffmpeg_detect_process, self.logger) - for p in self.ffmpeg_other_processes: - stop_ffmpeg(p["process"], self.logger) - p["logpipe"].close() - self.logpipe.close() - - def start_ffmpeg_detect(self): - ffmpeg_cmd = [ - c["cmd"] for c in self.config.ffmpeg_cmds if "detect" in c["roles"] - ][0] - self.ffmpeg_detect_process = start_or_restart_ffmpeg( - ffmpeg_cmd, self.logger, self.logpipe, self.frame_size - ) - self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid - capture_thread = CameraCapture( - self.config, - self.shm_frame_count, - self.ffmpeg_detect_process, - self.frame_shape, - self.frame_queue, - self.camera_fps, - self.skipped_fps, - self.stop_event, - ) - capture_thread.start() - - return capture_thread - - def get_latest_segment_datetime(self, latest_segment: datetime.datetime) -> int: - """Checks if ffmpeg is still writing recording segments to cache.""" - cache_files = sorted( - [ - d - for d in os.listdir(CACHE_DIR) - if os.path.isfile(os.path.join(CACHE_DIR, d)) - and d.endswith(".mp4") - and not d.startswith("preview_") - ] - ) - newest_segment_time = latest_segment - - for file in cache_files: - if self.camera_name in file: - basename = os.path.splitext(file)[0] - _, date = basename.rsplit("@", maxsplit=1) - segment_time = datetime.datetime.strptime( - date, CACHE_SEGMENT_FORMAT - ).astimezone(datetime.timezone.utc) - if segment_time > newest_segment_time: - newest_segment_time = segment_time - - return newest_segment_time - - -class CameraCapture(threading.Thread): - def __init__( - self, - config: CameraConfig, - shm_frame_count: int, - ffmpeg_process, - frame_shape, - frame_queue, - fps, - skipped_fps, - stop_event, - ): - super().__init__(name=f"capture:{config.name}") - - self.logger = logging.getLogger(self.name) - self.config = config - self.shm_frame_count = shm_frame_count - self.frame_shape = frame_shape - self.frame_queue = frame_queue - self.fps = fps - self.stop_event = stop_event - self.skipped_fps = skipped_fps - self.frame_manager = SharedMemoryFrameManager() - self.ffmpeg_process = ffmpeg_process - self.current_frame = mp.Value("d", 0.0) - - def run(self): - frame_size = self.frame_shape[0] * self.frame_shape[1] - frame_rate = EventsPerSecond() - frame_rate.start() - skipped_eps = EventsPerSecond() - skipped_eps.start() - - shm_frames: list[str] = [] - - while True: - self.fps.value = frame_rate.eps() - self.skipped_fps.value = skipped_eps.eps() - self.current_frame.value = datetime.datetime.now().timestamp() - frame_name = f"{self.config.name}{self.current_frame.value}" - frame_buffer = self.frame_manager.create(frame_name, frame_size) - try: - frame_buffer[:] = self.ffmpeg_process.stdout.read(frame_size) - - # update frame cache and cleanup existing frames - shm_frames.append(frame_name) - - if len(shm_frames) > self.shm_frame_count: - expired_frame_name = shm_frames.pop(0) - self.frame_manager.delete(expired_frame_name) - except Exception: - # always delete the frame - self.frame_manager.delete(frame_name) - - # shutdown has been initiated - if self.stop_event.is_set(): - break - - self.logger.error( - f"{self.config.name}: Unable to read frames from ffmpeg process." - ) - - if self.ffmpeg_process.poll() is not None: - self.logger.error( - f"{self.config.name}: ffmpeg process is not running. exiting capture thread..." - ) - break - - continue - - frame_rate.update() - - # don't lock the queue to check, just try since it should rarely be full - try: - # add to the queue - self.frame_queue.put(self.current_frame.value, False) - self.frame_manager.close(frame_name) - except queue.Full: - # if the queue is full, skip this frame - skipped_eps.update() - - # clear out frames - for frame in shm_frames: - self.frame_manager.delete(frame) - - class CameraTracker(util.Process): def __init__( self, diff --git a/frigate/events/audio.py b/frigate/events/audio.py index a00f5ba3d..f86bc2b76 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -29,7 +29,7 @@ from frigate.ffmpeg_presets import parse_preset_input from frigate.log import LogPipe from frigate.object_detection import load_labels from frigate.util.builtin import get_ffmpeg_arg_list -from frigate.video import start_or_restart_ffmpeg, stop_ffmpeg +from frigate.util.ffmpeg import start_or_restart_ffmpeg, stop_ffmpeg try: from tflite_runtime.interpreter import Interpreter diff --git a/frigate/util/ffmpeg.py b/frigate/util/ffmpeg.py new file mode 100644 index 000000000..2efd2ac94 --- /dev/null +++ b/frigate/util/ffmpeg.py @@ -0,0 +1,42 @@ +import subprocess as sp + +from frigate.log import LogPipe + + +def stop_ffmpeg(ffmpeg_process, logger): + logger.info("Terminating the existing ffmpeg process...") + ffmpeg_process.terminate() + try: + logger.info("Waiting for ffmpeg to exit gracefully...") + ffmpeg_process.communicate(timeout=30) + except sp.TimeoutExpired: + logger.info("FFmpeg didn't exit. Force killing...") + ffmpeg_process.kill() + ffmpeg_process.communicate() + ffmpeg_process = None + + +def start_or_restart_ffmpeg( + ffmpeg_cmd, logger, logpipe: LogPipe, frame_size=None, ffmpeg_process=None +): + if ffmpeg_process is not None: + stop_ffmpeg(ffmpeg_process, logger) + + if frame_size is None: + process = sp.Popen( + ffmpeg_cmd, + stdout=sp.DEVNULL, + stderr=logpipe, + stdin=sp.DEVNULL, + start_new_session=True, + ) + else: + process = sp.Popen( + ffmpeg_cmd, + stdout=sp.PIPE, + stderr=logpipe, + stdin=sp.DEVNULL, + bufsize=frame_size * 10, + start_new_session=True, + ) + return process