diff --git a/frigate/app.py b/frigate/app.py index f22908f6c..116405e06 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -18,6 +18,7 @@ from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqliteq import SqliteQueueDatabase from frigate.comms.config_updater import ConfigPublisher +from frigate.comms.detections_updater import DetectionProxy from frigate.comms.dispatcher import Communicator, Dispatcher from frigate.comms.inter_process import InterProcessCommunicator from frigate.comms.mqtt import MqttClient @@ -202,16 +203,6 @@ class FrigateApp: maxsize=sum(camera.enabled for camera in self.config.cameras.values()) * 2 ) - # Queue for object recordings info - self.object_recordings_info_queue: Queue = mp.Queue() - - # Queue for audio recordings info if enabled - self.audio_recordings_info_queue: Optional[Queue] = ( - mp.Queue() - if len([c for c in self.config.cameras.values() if c.audio.enabled]) > 0 - else None - ) - # Queue for timeline events self.timeline_queue: Queue = mp.Queue() @@ -287,11 +278,7 @@ class FrigateApp: recording_process = mp.Process( target=manage_recordings, name="recording_manager", - args=( - self.config, - self.object_recordings_info_queue, - self.audio_recordings_info_queue, - ), + args=(self.config,), ) recording_process.daemon = True self.recording_process = recording_process @@ -329,6 +316,7 @@ class FrigateApp: def init_inter_process_communicator(self) -> None: self.inter_process_communicator = InterProcessCommunicator() self.inter_config_updater = ConfigPublisher() + self.inter_detection_proxy = DetectionProxy() def init_web_server(self) -> None: self.flask_app = create_app( @@ -417,7 +405,6 @@ class FrigateApp: self.event_queue, self.event_processed_queue, self.video_output_queue, - self.object_recordings_info_queue, self.ptz_autotracker_thread, self.stop_event, ) @@ -500,7 +487,6 @@ class FrigateApp: name="audio_capture", args=( self.config, - self.audio_recordings_info_queue, self.camera_metrics, ), ) @@ -687,6 +673,7 @@ class FrigateApp: # Stop Communicators self.inter_process_communicator.stop() self.inter_config_updater.stop() + self.inter_detection_proxy.stop() self.dispatcher.stop() self.detected_frames_processor.join() @@ -708,8 +695,6 @@ class FrigateApp: self.event_processed_queue, self.video_output_queue, self.detected_frames_queue, - self.object_recordings_info_queue, - self.audio_recordings_info_queue, self.log_queue, ]: if queue is not None: diff --git a/frigate/comms/detections_updater.py b/frigate/comms/detections_updater.py new file mode 100644 index 000000000..75e725bf5 --- /dev/null +++ b/frigate/comms/detections_updater.py @@ -0,0 +1,109 @@ +"""Facilitates communication between processes.""" + +import os +import threading +from enum import Enum +from typing import Optional + +import zmq + +from frigate.const import ( + PORT_INTER_PROCESS_DETECTION_SUB, + PORT_INTER_PROCESS_DETECTION_PUB, +) + + +class DetectionTypeEnum(str, Enum): + all = "" + video = "video" + audio = "audio" + + +class DetectionProxyRunner(threading.Thread): + + def __init__(self, context: zmq.Context[zmq.Socket]) -> None: + threading.Thread.__init__(self) + self.name = "detection_proxy" + self.context = context + + def run(self) -> None: + """Run the proxy.""" + PUB_PORT = ( + os.environ.get("INTER_PROCESS_DETECTION_PUB_PORT") + or PORT_INTER_PROCESS_DETECTION_PUB + ) + SUB_PORT = ( + os.environ.get("INTER_PROCESS_DETECTION_SUB_PORT") + or PORT_INTER_PROCESS_DETECTION_SUB + ) + incoming = self.context.socket(zmq.XSUB) + incoming.bind(f"tcp://127.0.0.1:{PUB_PORT}") + outgoing = self.context.socket(zmq.XPUB) + outgoing.bind(f"tcp://127.0.0.1:{SUB_PORT}") + zmq.proxy( + incoming, outgoing + ) # blocking, will unblock when context is destroyed + + incoming.close() + outgoing.close() + + +class DetectionProxy: + """Proxies video and audio detections.""" + + def __init__(self) -> None: + self.context = zmq.Context() + self.runner = DetectionProxyRunner(self.context) + self.runner.start() + + def stop(self) -> None: + self.context.destroy() # destroying the context will stop the proxy + + +class DetectionPublisher: + """Simplifies receiving video and audio detections.""" + + def __init__(self, topic: DetectionTypeEnum) -> None: + port = ( + os.environ.get("INTER_PROCESS_DETECTIONS_PORT") + or PORT_INTER_PROCESS_DETECTION_PUB + ) + self.topic = topic + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PUB) + self.socket.connect(f"tcp://127.0.0.1:{port}") + + def send_data(self, payload: any) -> None: + """Publish detection.""" + self.socket.send_string(self.topic.value, flags=zmq.SNDMORE) + self.socket.send_pyobj(payload) + + def stop(self) -> None: + self.socket.close() + self.context.destroy() + + +class DetectionSubscriber: + """Simplifies receiving video and audio detections.""" + + def __init__(self, topic: DetectionTypeEnum) -> None: + port = ( + os.environ.get("INTER_PROCESS_DETECTIONS_PORT") + or PORT_INTER_PROCESS_DETECTION_SUB + ) + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + self.socket.setsockopt_string(zmq.SUBSCRIBE, topic.value) + self.socket.connect(f"tcp://127.0.0.1:{port}") + + def get_data(self) -> Optional[tuple[str, any]]: + """Returns detections or None if no update.""" + try: + topic = DetectionTypeEnum[self.socket.recv_string(flags=zmq.NOBLOCK)] + return (topic, self.socket.recv_pyobj()) + except zmq.ZMQError: + return (None, None) + + def stop(self) -> None: + self.socket.close() + self.context.destroy() diff --git a/frigate/const.py b/frigate/const.py index 7c56bb337..c745b3e51 100644 --- a/frigate/const.py +++ b/frigate/const.py @@ -61,6 +61,8 @@ DRIVER_INTEL_iHD = "iHD" PORT_INTER_PROCESS_COMM = 4892 PORT_INTER_PROCESS_CONFIG = 4893 +PORT_INTER_PROCESS_DETECTION_PUB = 4894 +PORT_INTER_PROCESS_DETECTION_SUB = 4895 # Record Values diff --git a/frigate/events/audio.py b/frigate/events/audio.py index 6a1a6269a..c3f8e36a4 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -14,6 +14,7 @@ import requests from setproctitle import setproctitle from frigate.comms.config_updater import ConfigSubscriber +from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.inter_process import InterProcessRequestor from frigate.config import CameraConfig, CameraInput, FfmpegConfig, FrigateConfig from frigate.const import ( @@ -68,7 +69,6 @@ def get_ffmpeg_command(ffmpeg: FfmpegConfig) -> list[str]: def listen_to_audio( config: FrigateConfig, - recordings_info_queue: mp.Queue, camera_metrics: dict[str, CameraMetricsTypes], ) -> None: stop_event = mp.Event() @@ -95,7 +95,6 @@ def listen_to_audio( if camera.enabled and camera.audio.enabled_in_config: audio = AudioEventMaintainer( camera, - recordings_info_queue, camera_metrics, stop_event, ) @@ -167,14 +166,12 @@ class AudioEventMaintainer(threading.Thread): def __init__( self, camera: CameraConfig, - recordings_info_queue: mp.Queue, camera_metrics: dict[str, CameraMetricsTypes], stop_event: mp.Event, ) -> None: threading.Thread.__init__(self) self.name = f"{camera.name}_audio_event_processor" self.config = camera - self.recordings_info_queue = recordings_info_queue self.camera_metrics = camera_metrics self.detections: dict[dict[str, any]] = {} self.stop_event = stop_event @@ -189,6 +186,7 @@ class AudioEventMaintainer(threading.Thread): # create communication for audio detections self.requestor = InterProcessRequestor() self.config_subscriber = ConfigSubscriber(f"config/audio/{camera.name}") + self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio) def detect_audio(self, audio) -> None: if not self.config.audio.enabled: @@ -219,8 +217,8 @@ class AudioEventMaintainer(threading.Thread): self.handle_detection(label, score) audio_detections.append(label) - # add audio info to recordings queue - self.recordings_info_queue.put( + # send audio detection data + self.detection_publisher.send_data( ( self.config.name, datetime.datetime.now().timestamp(), diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 2d141b970..8a956a4af 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -12,6 +12,7 @@ from typing import Callable import cv2 import numpy as np +from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.dispatcher import Dispatcher from frigate.config import ( CameraConfig, @@ -520,7 +521,9 @@ class CameraState: ): max_target_box = self.ptz_autotracker_thread.ptz_autotracker.tracked_object_metrics[ self.name - ]["max_target_box"] + ][ + "max_target_box" + ] side_length = max_target_box * ( max( self.camera_config.detect.width, @@ -814,7 +817,6 @@ class TrackedObjectProcessor(threading.Thread): event_queue, event_processed_queue, video_output_queue, - recordings_info_queue, ptz_autotracker_thread, stop_event, ): @@ -826,12 +828,12 @@ class TrackedObjectProcessor(threading.Thread): self.event_queue = event_queue self.event_processed_queue = event_processed_queue self.video_output_queue = video_output_queue - self.recordings_info_queue = recordings_info_queue self.stop_event = stop_event self.camera_states: dict[str, CameraState] = {} self.frame_manager = SharedMemoryFrameManager() self.last_motion_detected: dict[str, float] = {} self.ptz_autotracker_thread = ptz_autotracker_thread + self.detection_publisher = DetectionPublisher(DetectionTypeEnum.video) def start(camera, obj: TrackedObject, current_frame_time): self.event_queue.put( @@ -1126,8 +1128,8 @@ class TrackedObjectProcessor(threading.Thread): ) ) - # send info on this frame to the recordings maintainer - self.recordings_info_queue.put( + # publish info on this frame + self.detection_publisher.send_data( ( camera, frame_time, diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index 70dee66ba..81a5d582f 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -5,7 +5,6 @@ import datetime import logging import multiprocessing as mp import os -import queue import random import string import threading @@ -18,6 +17,7 @@ import numpy as np import psutil from frigate.comms.config_updater import ConfigSubscriber +from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.inter_process import InterProcessRequestor from frigate.config import FrigateConfig, RetainModeEnum from frigate.const import ( @@ -57,13 +57,7 @@ class SegmentInfo: class RecordingMaintainer(threading.Thread): - def __init__( - self, - config: FrigateConfig, - object_recordings_info_queue: mp.Queue, - audio_recordings_info_queue: Optional[mp.Queue], - stop_event: MpEvent, - ): + def __init__(self, config: FrigateConfig, stop_event: MpEvent): threading.Thread.__init__(self) self.name = "recording_maintainer" self.config = config @@ -71,9 +65,8 @@ class RecordingMaintainer(threading.Thread): # create communication for retained recordings self.requestor = InterProcessRequestor() self.config_subscriber = ConfigSubscriber("config/record/") + self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) - self.object_recordings_info_queue = object_recordings_info_queue - self.audio_recordings_info_queue = audio_recordings_info_queue self.stop_event = stop_event self.object_recordings_info: dict[str, list] = defaultdict(list) self.audio_recordings_info: dict[str, list] = defaultdict(list) @@ -436,7 +429,6 @@ class RecordingMaintainer(threading.Thread): return None def run(self) -> None: - camera_count = sum(camera.enabled for camera in self.config.cameras.values()) # Check for new files every 5 seconds wait_time = 0.0 while not self.stop_event.wait(wait_time): @@ -459,19 +451,19 @@ class RecordingMaintainer(threading.Thread): stale_frame_count_threshold = 10 # empty the object recordings info queue while True: - try: + (topic, data) = self.detection_subscriber.get_data() + + if not topic: + break + + if topic == DetectionTypeEnum.video: ( camera, frame_time, current_tracked_objects, motion_boxes, regions, - ) = self.object_recordings_info_queue.get( - True, timeout=QUEUE_READ_TIMEOUT - ) - - if frame_time < run_start - stale_frame_count_threshold: - stale_frame_count += 1 + ) = data if self.config.cameras[camera].record.enabled: self.object_recordings_info[camera].append( @@ -482,56 +474,29 @@ class RecordingMaintainer(threading.Thread): regions, ) ) - except queue.Empty: - q_size = self.object_recordings_info_queue.qsize() - if q_size > camera_count: - logger.debug( - f"object_recordings_info loop queue not empty ({q_size})." + elif topic == DetectionTypeEnum.audio: + ( + camera, + frame_time, + dBFS, + audio_detections, + ) = data + + if self.config.cameras[camera].record.enabled: + self.audio_recordings_info[camera].append( + ( + frame_time, + dBFS, + audio_detections, + ) ) - break + + if frame_time < run_start - stale_frame_count_threshold: + stale_frame_count += 1 if stale_frame_count > 0: logger.debug(f"Found {stale_frame_count} old frames.") - # empty the audio recordings info queue if audio is enabled - if self.audio_recordings_info_queue: - stale_frame_count = 0 - - while True: - try: - ( - camera, - frame_time, - dBFS, - audio_detections, - ) = self.audio_recordings_info_queue.get( - True, timeout=QUEUE_READ_TIMEOUT - ) - - if frame_time < run_start - stale_frame_count_threshold: - stale_frame_count += 1 - - if self.config.cameras[camera].record.enabled: - self.audio_recordings_info[camera].append( - ( - frame_time, - dBFS, - audio_detections, - ) - ) - except queue.Empty: - q_size = self.audio_recordings_info_queue.qsize() - if q_size > camera_count: - logger.debug( - f"object_recordings_info loop audio queue not empty ({q_size})." - ) - break - - if stale_frame_count > 0: - logger.error( - f"Found {stale_frame_count} old audio frames, segments from recordings may be missing" - ) - try: asyncio.run(self.move_files()) except Exception as e: diff --git a/frigate/record/record.py b/frigate/record/record.py index 29f8931ec..1ffd5394d 100644 --- a/frigate/record/record.py +++ b/frigate/record/record.py @@ -18,11 +18,7 @@ from frigate.util.services import listen logger = logging.getLogger(__name__) -def manage_recordings( - config: FrigateConfig, - object_recordings_info_queue: mp.Queue, - audio_recordings_info_queue: mp.Queue, -) -> None: +def manage_recordings(config: FrigateConfig) -> None: stop_event = mp.Event() def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: @@ -49,8 +45,6 @@ def manage_recordings( maintainer = RecordingMaintainer( config, - object_recordings_info_queue, - audio_recordings_info_queue, stop_event, ) maintainer.start()