From 3badc757ccd3f72b1499fb67eef37e962f6e3c20 Mon Sep 17 00:00:00 2001 From: George Tsiamasiotis Date: Wed, 2 Oct 2024 12:14:14 +0300 Subject: [PATCH] Process subclass for output_frames --- frigate/app.py | 15 +-- frigate/output/output.py | 277 +++++++++++++++++++-------------------- 2 files changed, 139 insertions(+), 153 deletions(-) diff --git a/frigate/app.py b/frigate/app.py index 2f5786101..d8f3d8097 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -13,7 +13,6 @@ from peewee_migrate import Router from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqliteq import SqliteQueueDatabase -from frigate import util from frigate.api.auth import hash_password from frigate.api.fastapi_app import create_fastapi_app from frigate.camera.camera import Camera @@ -55,7 +54,7 @@ from frigate.models import ( ) from frigate.object_detection import ObjectDetectProcess from frigate.object_processing import TrackedObjectProcessor -from frigate.output.output import output_frames +from frigate.output.output import OutputProcessor from frigate.ptz.autotrack import PtzAutoTrackerThread from frigate.ptz.onvif import OnvifController from frigate.record.cleanup import RecordingCleanup @@ -334,15 +333,9 @@ class FrigateApp: self.detected_frames_processor.start() def start_video_output_processor(self) -> None: - output_processor = util.Process( - target=output_frames, - name="output_processor", - args=(self.config,), - daemon=True, - ) - self.output_processor = output_processor - output_processor.start() - logger.info(f"Output process started: {output_processor.pid}") + self.output_processor = OutputProcessor(self.config) + self.output_processor.start() + logger.info(f"Output process started: {self.output_processor.pid}") def init_cameras(self) -> None: for name in self.config.cameras.keys(): diff --git a/frigate/output/output.py b/frigate/output/output.py index 7d5b6d39a..d30a0bd0b 100644 --- a/frigate/output/output.py +++ b/frigate/output/output.py @@ -1,15 +1,11 @@ """Handle outputting raw frigate frames""" -import logging -import multiprocessing as mp import os import shutil -import signal import threading from typing import Optional from wsgiref.simple_server import make_server -from setproctitle import setproctitle from ws4py.server.wsgirefserver import ( WebSocketWSGIHandler, WebSocketWSGIRequestHandler, @@ -17,6 +13,7 @@ from ws4py.server.wsgirefserver import ( ) from ws4py.server.wsgiutils import WebSocketWSGIApplication +from frigate import util from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.ws import WebSocket from frigate.config import FrigateConfig @@ -26,171 +23,167 @@ from frigate.output.camera import JsmpegCamera from frigate.output.preview import PreviewRecorder from frigate.util.image import SharedMemoryFrameManager -logger = logging.getLogger(__name__) +class OutputProcessor(util.Process): + def __init__(self, config: FrigateConfig): + super().__init__(name="frigate.output", daemon=True) + self.config = config -def output_frames( - config: FrigateConfig, -): - threading.current_thread().name = "output" - setproctitle("frigate.output") + def run(self): + frame_manager = SharedMemoryFrameManager() - stop_event = mp.Event() + # start a websocket server on 8082 + WebSocketWSGIHandler.http_version = "1.1" + websocket_server = make_server( + "127.0.0.1", + 8082, + server_class=WSGIServer, + handler_class=WebSocketWSGIRequestHandler, + app=WebSocketWSGIApplication(handler_cls=WebSocket), + ) + websocket_server.initialize_websockets_manager() + websocket_thread = threading.Thread(target=websocket_server.serve_forever) - def receiveSignal(signalNumber, frame): - stop_event.set() + detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video) - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) + jsmpeg_cameras: dict[str, JsmpegCamera] = {} + birdseye: Optional[Birdseye] = None + preview_recorders: dict[str, PreviewRecorder] = {} + preview_write_times: dict[str, float] = {} - frame_manager = SharedMemoryFrameManager() + self.move_preview_frames("cache") - # start a websocket server on 8082 - WebSocketWSGIHandler.http_version = "1.1" - websocket_server = make_server( - "127.0.0.1", - 8082, - server_class=WSGIServer, - handler_class=WebSocketWSGIRequestHandler, - app=WebSocketWSGIApplication(handler_cls=WebSocket), - ) - websocket_server.initialize_websockets_manager() - websocket_thread = threading.Thread(target=websocket_server.serve_forever) + for camera, cam_config in self.config.cameras.items(): + if not cam_config.enabled: + continue - detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video) - - jsmpeg_cameras: dict[str, JsmpegCamera] = {} - birdseye: Optional[Birdseye] = None - preview_recorders: dict[str, PreviewRecorder] = {} - preview_write_times: dict[str, float] = {} - - move_preview_frames("cache") - - for camera, cam_config in config.cameras.items(): - if not cam_config.enabled: - continue - - jsmpeg_cameras[camera] = JsmpegCamera(cam_config, stop_event, websocket_server) - preview_recorders[camera] = PreviewRecorder(cam_config) - preview_write_times[camera] = 0 - - if config.birdseye.enabled: - birdseye = Birdseye(config, stop_event, websocket_server) - - websocket_thread.start() - - while not stop_event.is_set(): - (topic, data) = detection_subscriber.check_for_update(timeout=1) - - if not topic: - continue - - ( - camera, - frame_time, - current_tracked_objects, - motion_boxes, - regions, - ) = data - - frame_id = f"{camera}{frame_time}" - - frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) - - if frame is None: - logger.debug(f"Failed to get frame {frame_id} from SHM") - continue - - # send camera frame to ffmpeg process if websockets are connected - if any( - ws.environ["PATH_INFO"].endswith(camera) for ws in websocket_server.manager - ): - # write to the converter for the camera if clients are listening to the specific camera - jsmpeg_cameras[camera].write_frame(frame.tobytes()) - - # send output data to birdseye if websocket is connected or restreaming - if config.birdseye.enabled and ( - config.birdseye.restream - or any( - ws.environ["PATH_INFO"].endswith("birdseye") - for ws in websocket_server.manager + jsmpeg_cameras[camera] = JsmpegCamera( + cam_config, self.stop_event, websocket_server ) - ): - birdseye.write_data( + preview_recorders[camera] = PreviewRecorder(cam_config) + preview_write_times[camera] = 0 + + if self.config.birdseye.enabled: + birdseye = Birdseye(self.config, self.stop_event, websocket_server) + + websocket_thread.start() + + while not self.stop_event.is_set(): + (topic, data) = detection_subscriber.check_for_update(timeout=1) + + if not topic: + continue + + ( camera, + frame_time, current_tracked_objects, motion_boxes, - frame_time, - frame, + regions, + ) = data + + frame_id = f"{camera}{frame_time}" + + frame = frame_manager.get( + frame_id, self.config.cameras[camera].frame_shape_yuv ) - # send frames for low fps recording - generated_preview = preview_recorders[camera].write_data( - current_tracked_objects, motion_boxes, frame_time, frame - ) - preview_write_times[camera] = frame_time + if frame is None: + self.logger.debug(f"Failed to get frame {frame_id} from SHM") + continue - # if another camera generated a preview, - # check for any cameras that are currently offline - # and need to generate a preview - if generated_preview: - for camera, time in preview_write_times.copy().items(): - if time != 0 and frame_time - time > 10: - preview_recorders[camera].flag_offline(frame_time) - preview_write_times[camera] = frame_time + # send camera frame to ffmpeg process if websockets are connected + if any( + ws.environ["PATH_INFO"].endswith(camera) + for ws in websocket_server.manager + ): + # write to the converter for the camera if clients are listening to the specific camera + jsmpeg_cameras[camera].write_frame(frame.tobytes()) - frame_manager.close(frame_id) + # send output data to birdseye if websocket is connected or restreaming + if self.config.birdseye.enabled and ( + self.config.birdseye.restream + or any( + ws.environ["PATH_INFO"].endswith("birdseye") + for ws in websocket_server.manager + ) + ): + birdseye.write_data( + camera, + current_tracked_objects, + motion_boxes, + frame_time, + frame, + ) - move_preview_frames("clips") + # send frames for low fps recording + generated_preview = preview_recorders[camera].write_data( + current_tracked_objects, motion_boxes, frame_time, frame + ) + preview_write_times[camera] = frame_time - while True: - (topic, data) = detection_subscriber.check_for_update(timeout=0) + # if another camera generated a preview, + # check for any cameras that are currently offline + # and need to generate a preview + if generated_preview: + for camera, time in preview_write_times.copy().items(): + if time != 0 and frame_time - time > 10: + preview_recorders[camera].flag_offline(frame_time) + preview_write_times[camera] = frame_time - if not topic: - break + frame_manager.close(frame_id) - ( - camera, - frame_time, - current_tracked_objects, - motion_boxes, - regions, - ) = data + self.move_preview_frames("clips") - frame_id = f"{camera}{frame_time}" - frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) - frame_manager.close(frame_id) + while True: + (topic, data) = detection_subscriber.check_for_update(timeout=0) - detection_subscriber.stop() + if not topic: + break - for jsmpeg in jsmpeg_cameras.values(): - jsmpeg.stop() + ( + camera, + frame_time, + current_tracked_objects, + motion_boxes, + regions, + ) = data - for preview in preview_recorders.values(): - preview.stop() + frame_id = f"{camera}{frame_time}" + frame = frame_manager.get( + frame_id, self.config.cameras[camera].frame_shape_yuv + ) + frame_manager.close(frame_id) - if birdseye is not None: - birdseye.stop() + detection_subscriber.stop() - websocket_server.manager.close_all() - websocket_server.manager.stop() - websocket_server.manager.join() - websocket_server.shutdown() - websocket_thread.join() - logger.info("exiting output process...") + for jsmpeg in jsmpeg_cameras.values(): + jsmpeg.stop() + for preview in preview_recorders.values(): + preview.stop() -def move_preview_frames(loc: str): - preview_holdover = os.path.join(CLIPS_DIR, "preview_restart_cache") - preview_cache = os.path.join(CACHE_DIR, "preview_frames") + if birdseye is not None: + birdseye.stop() - try: - if loc == "clips": - shutil.move(preview_cache, preview_holdover) - elif loc == "cache": - if not os.path.exists(preview_holdover): - return + websocket_server.manager.close_all() + websocket_server.manager.stop() + websocket_server.manager.join() + websocket_server.shutdown() + websocket_thread.join() + self.logger.info("Exiting output process...") - shutil.move(preview_holdover, preview_cache) - except shutil.Error: - logger.error("Failed to restore preview cache.") + def move_preview_frames(self, loc: str): + preview_holdover = os.path.join(CLIPS_DIR, "preview_restart_cache") + preview_cache = os.path.join(CACHE_DIR, "preview_frames") + + try: + if loc == "clips": + shutil.move(preview_cache, preview_holdover) + elif loc == "cache": + if not os.path.exists(preview_holdover): + return + + shutil.move(preview_holdover, preview_cache) + except shutil.Error: + self.logger.error("Failed to restore preview cache.")