From bbef515ebcae8bd457c88056509a5449d6f3dc47 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Fri, 16 Feb 2024 13:33:11 -0700 Subject: [PATCH] Cleanup video output queue --- frigate/app.py | 20 +++++---------- frigate/comms/detections_updater.py | 19 +++++++++++---- frigate/events/audio.py | 3 ++- frigate/object_processing.py | 13 +--------- frigate/output/output.py | 38 +++++++++++++++++++---------- frigate/record/maintainer.py | 2 +- 6 files changed, 49 insertions(+), 46 deletions(-) diff --git a/frigate/app.py b/frigate/app.py index 116405e06..81050e30b 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -194,9 +194,6 @@ class FrigateApp: # Queues for clip processing self.event_queue: Queue = mp.Queue() self.event_processed_queue: Queue = mp.Queue() - self.video_output_queue: Queue = mp.Queue( - maxsize=sum(camera.enabled for camera in self.config.cameras.values()) * 2 - ) # Queue for cameras to push tracked objects to self.detected_frames_queue: Queue = mp.Queue( @@ -404,7 +401,6 @@ class FrigateApp: self.detected_frames_queue, self.event_queue, self.event_processed_queue, - self.video_output_queue, self.ptz_autotracker_thread, self.stop_event, ) @@ -414,10 +410,7 @@ class FrigateApp: output_processor = mp.Process( target=output_frames, name="output_processor", - args=( - self.config, - self.video_output_queue, - ), + args=(self.config,), ) output_processor.daemon = True self.output_processor = output_processor @@ -670,11 +663,6 @@ class FrigateApp: self.detection_queue.close() self.detection_queue.join_thread() - # Stop Communicators - self.inter_process_communicator.stop() - self.inter_config_updater.stop() - self.inter_detection_proxy.stop() - self.dispatcher.stop() self.detected_frames_processor.join() self.ptz_autotracker_thread.join() @@ -693,7 +681,6 @@ class FrigateApp: for queue in [ self.event_queue, self.event_processed_queue, - self.video_output_queue, self.detected_frames_queue, self.log_queue, ]: @@ -702,3 +689,8 @@ class FrigateApp: queue.get_nowait() queue.close() queue.join_thread() + + # Stop Communicators + self.inter_process_communicator.stop() + self.inter_config_updater.stop() + self.inter_detection_proxy.stop() diff --git a/frigate/comms/detections_updater.py b/frigate/comms/detections_updater.py index 75e725bf5..d1624dc93 100644 --- a/frigate/comms/detections_updater.py +++ b/frigate/comms/detections_updater.py @@ -8,10 +8,12 @@ from typing import Optional import zmq from frigate.const import ( - PORT_INTER_PROCESS_DETECTION_SUB, PORT_INTER_PROCESS_DETECTION_PUB, + PORT_INTER_PROCESS_DETECTION_SUB, ) +SOCKET_CONTROL = "inproc://control.detections_updater" + class DetectionTypeEnum(str, Enum): all = "" @@ -36,14 +38,17 @@ class DetectionProxyRunner(threading.Thread): os.environ.get("INTER_PROCESS_DETECTION_SUB_PORT") or PORT_INTER_PROCESS_DETECTION_SUB ) + control = self.context.socket(zmq.SUB) + control.connect(SOCKET_CONTROL) + control.setsockopt_string(zmq.SUBSCRIBE, "") incoming = self.context.socket(zmq.XSUB) incoming.bind(f"tcp://127.0.0.1:{PUB_PORT}") outgoing = self.context.socket(zmq.XPUB) outgoing.bind(f"tcp://127.0.0.1:{SUB_PORT}") - zmq.proxy( - incoming, outgoing - ) # blocking, will unblock when context is destroyed + zmq.proxy_steerable( + incoming, outgoing, None, control + ) # blocking, will unblock terminate message is received incoming.close() outgoing.close() @@ -53,11 +58,15 @@ class DetectionProxy: def __init__(self) -> None: self.context = zmq.Context() + self.control = self.context.socket(zmq.PUB) + self.control.bind(SOCKET_CONTROL) self.runner = DetectionProxyRunner(self.context) self.runner.start() def stop(self) -> None: - self.context.destroy() # destroying the context will stop the proxy + self.control.send_string("TERMINATE") # tell the proxy to stop + self.runner.join() + self.context.destroy() class DetectionPublisher: diff --git a/frigate/events/audio.py b/frigate/events/audio.py index c3f8e36a4..c1b38cfb4 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -189,7 +189,7 @@ class AudioEventMaintainer(threading.Thread): self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio) def detect_audio(self, audio) -> None: - if not self.config.audio.enabled: + if not self.config.audio.enabled or self.stop_event.is_set(): return audio_as_float = audio.astype(np.float32) @@ -350,3 +350,4 @@ class AudioEventMaintainer(threading.Thread): self.logpipe.close() self.requestor.stop() self.config_subscriber.stop() + self.detection_publisher.stop() diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 8a956a4af..6146f54a3 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -816,7 +816,6 @@ class TrackedObjectProcessor(threading.Thread): tracked_objects_queue, event_queue, event_processed_queue, - video_output_queue, ptz_autotracker_thread, stop_event, ): @@ -827,7 +826,6 @@ class TrackedObjectProcessor(threading.Thread): self.tracked_objects_queue = tracked_objects_queue self.event_queue = event_queue self.event_processed_queue = event_processed_queue - self.video_output_queue = video_output_queue self.stop_event = stop_event self.camera_states: dict[str, CameraState] = {} self.frame_manager = SharedMemoryFrameManager() @@ -1118,16 +1116,6 @@ class TrackedObjectProcessor(threading.Thread): o.to_dict() for o in camera_state.tracked_objects.values() ] - self.video_output_queue.put( - ( - camera, - frame_time, - tracked_objects, - motion_boxes, - regions, - ) - ) - # publish info on this frame self.detection_publisher.send_data( ( @@ -1214,4 +1202,5 @@ class TrackedObjectProcessor(threading.Thread): event_id, camera = self.event_processed_queue.get() self.camera_states[camera].finished(event_id) + self.detection_publisher.stop() logger.info("Exiting object processor...") diff --git a/frigate/output/output.py b/frigate/output/output.py index fb9be80e2..85ea586a7 100644 --- a/frigate/output/output.py +++ b/frigate/output/output.py @@ -2,9 +2,9 @@ import logging import multiprocessing as mp -import queue import signal import threading +import time from typing import Optional from wsgiref.simple_server import make_server @@ -16,6 +16,7 @@ from ws4py.server.wsgirefserver import ( ) from ws4py.server.wsgiutils import WebSocketWSGIApplication +from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.ws import WebSocket from frigate.config import FrigateConfig from frigate.output.birdseye import Birdseye @@ -28,7 +29,6 @@ logger = logging.getLogger(__name__) def output_frames( config: FrigateConfig, - video_output_queue: mp.Queue, ): threading.current_thread().name = "output" setproctitle("frigate.output") @@ -56,6 +56,8 @@ def output_frames( websocket_server.initialize_websockets_manager() websocket_thread = threading.Thread(target=websocket_server.serve_forever) + detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video) + jsmpeg_cameras: dict[str, JsmpegCamera] = {} birdseye: Optional[Birdseye] = None preview_recorders: dict[str, PreviewRecorder] = {} @@ -73,17 +75,20 @@ def output_frames( websocket_thread.start() while not stop_event.is_set(): - try: - ( - camera, - frame_time, - current_tracked_objects, - motion_boxes, - regions, - ) = video_output_queue.get(True, 1) - except queue.Empty: + (topic, data) = detection_subscriber.get_data() + + if not topic: + time.sleep(0.1) continue + ( + camera, + frame_time, + current_tracked_objects, + motion_boxes, + regions, + ) = data + frame_id = f"{camera}{frame_time}" frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) @@ -122,19 +127,26 @@ def output_frames( previous_frames[camera] = frame_time - while not video_output_queue.empty(): + while True: + (topic, data) = detection_subscriber.get_data() + + if not topic: + break + ( camera, frame_time, current_tracked_objects, motion_boxes, regions, - ) = video_output_queue.get(True, 10) + ) = data frame_id = f"{camera}{frame_time}" frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv) frame_manager.delete(frame_id) + detection_subscriber.stop() + for jsmpeg in jsmpeg_cameras.values(): jsmpeg.stop() diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index 81a5d582f..52f357894 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -3,7 +3,6 @@ import asyncio import datetime import logging -import multiprocessing as mp import os import random import string @@ -509,4 +508,5 @@ class RecordingMaintainer(threading.Thread): self.requestor.stop() self.config_subscriber.stop() + self.detection_subscriber.stop() logger.info("Exiting recording maintenance...")