From 301c01dbf201b4bd3651aadff5c7928d5a5d31e6 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Wed, 11 Jun 2025 08:31:01 -0600 Subject: [PATCH] Use ZMQ for signaling object detectoin is completed --- frigate/app.py | 6 +----- frigate/camera/maintainer.py | 5 ----- frigate/comms/object_detector_signaler.py | 21 ++++++++++++++++++ frigate/object_detection/base.py | 26 +++++++++++++---------- frigate/video.py | 3 +-- 5 files changed, 38 insertions(+), 23 deletions(-) create mode 100644 frigate/comms/object_detector_signaler.py diff --git a/frigate/app.py b/frigate/app.py index b070f9ffc..cccbce53e 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -82,7 +82,6 @@ class FrigateApp: self.stop_event: MpEvent = mp.Event() self.detection_queue: Queue = mp.Queue() self.detectors: dict[str, ObjectDetectProcess] = {} - self.detection_out_events: dict[str, MpEvent] = {} self.detection_shms: list[mp.shared_memory.SharedMemory] = [] self.log_queue: Queue = mp.Queue() self.camera_metrics: dict[str, CameraMetrics] = {} @@ -363,8 +362,6 @@ class FrigateApp: def start_detectors(self) -> None: for name in self.config.cameras.keys(): - self.detection_out_events[name] = mp.Event() - try: largest_frame = max( [ @@ -396,7 +393,7 @@ class FrigateApp: self.detectors[name] = ObjectDetectProcess( name, self.detection_queue, - self.detection_out_events, + list(self.config.cameras.keys()), detector_config, ) @@ -435,7 +432,6 @@ class FrigateApp: self.camera_maintainer = CameraMaintainer( self.config, self.detection_queue, - self.detection_out_events, self.detected_frames_queue, self.camera_metrics, self.ptz_metrics, diff --git a/frigate/camera/maintainer.py b/frigate/camera/maintainer.py index ab9f1b794..090cc479a 100644 --- a/frigate/camera/maintainer.py +++ b/frigate/camera/maintainer.py @@ -31,7 +31,6 @@ class CameraMaintainer(threading.Thread): self, config: FrigateConfig, detection_queue: Queue, - detection_out_events: dict[str, MpEvent], detected_frames_queue: Queue, camera_metrics: dict[str, CameraMetrics], ptz_metrics: dict[str, PTZMetrics], @@ -40,7 +39,6 @@ class CameraMaintainer(threading.Thread): super().__init__(name="camera_processor") self.config = config self.detection_queue = detection_queue - self.detection_out_events = detection_out_events self.detected_frames_queue = detected_frames_queue self.stop_event = stop_event self.camera_metrics = camera_metrics @@ -54,7 +52,6 @@ class CameraMaintainer(threading.Thread): CameraConfigUpdateEnum.remove, ], ) - self.detector_camera_publisher = DetectorCameraPublisher() self.shm_count = self.__calculate_shm_frame_count() def __init_historical_regions(self) -> None: @@ -151,7 +148,6 @@ class CameraMaintainer(threading.Thread): self.config.model, self.config.model.merged_labelmap, self.detection_queue, - self.detection_out_events[name], self.detected_frames_queue, self.camera_metrics[name], self.ptz_metrics[name], @@ -234,6 +230,5 @@ class CameraMaintainer(threading.Thread): for camera in self.camera_metrics.keys(): self.__stop_camera_process(camera) - self.detector_camera_publisher.stop() self.update_subscriber.stop() self.frame_manager.cleanup() diff --git a/frigate/comms/object_detector_signaler.py b/frigate/comms/object_detector_signaler.py new file mode 100644 index 000000000..befc83e4d --- /dev/null +++ b/frigate/comms/object_detector_signaler.py @@ -0,0 +1,21 @@ +"""Facilitates communication between processes for object detection signals.""" + +from .zmq_proxy import Publisher, Subscriber + + +class ObjectDetectorPublisher(Publisher): + """Publishes signal for object detection to different processes.""" + + topic_base = "object_detector/" + + +class ObjectDetectorSubscriber(Subscriber): + """Simplifies receiving a signal for object detection.""" + + topic_base = "object_detector/" + + def __init__(self, topic: str) -> None: + super().__init__(topic) + + def check_for_update(self): + return super().check_for_update(timeout=5) diff --git a/frigate/object_detection/base.py b/frigate/object_detection/base.py index 21e52ebf8..6711c8002 100644 --- a/frigate/object_detection/base.py +++ b/frigate/object_detection/base.py @@ -13,6 +13,10 @@ import numpy as np from setproctitle import setproctitle import frigate.util as util +from frigate.comms.object_detector_signaler import ( + ObjectDetectorPublisher, + ObjectDetectorSubscriber, +) from frigate.detectors import create_detector from frigate.detectors.detector_config import ( BaseDetectorConfig, @@ -89,7 +93,7 @@ class LocalObjectDetector(ObjectDetector): def run_detector( name: str, detection_queue: Queue, - out_events: dict[str, MpEvent], + cameras: list[str], avg_speed: Value, start: Value, detector_config: BaseDetectorConfig, @@ -110,9 +114,10 @@ def run_detector( frame_manager = SharedMemoryFrameManager() object_detector = LocalObjectDetector(detector_config=detector_config) + detector_publisher = ObjectDetectorPublisher() outputs = {} - for name in out_events.keys(): + for name in cameras: out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False) out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) outputs[name] = {"shm": out_shm, "np": out_np} @@ -137,11 +142,12 @@ def run_detector( duration = datetime.datetime.now().timestamp() - start.value frame_manager.close(connection_id) outputs[connection_id]["np"][:] = detections[:] - out_events[connection_id].set() + detector_publisher.publish(connection_id, connection_id) start.value = 0.0 avg_speed.value = (avg_speed.value * 9 + duration) / 10 + detector_publisher.stop() logger.info("Exited detection process...") @@ -150,11 +156,11 @@ class ObjectDetectProcess: self, name: str, detection_queue: Queue, - out_events: dict[str, MpEvent], + cameras: list[str], detector_config: BaseDetectorConfig, ): self.name = name - self.out_events = out_events + self.cameras = cameras self.detection_queue = detection_queue self.avg_inference_speed = Value("d", 0.01) self.detection_start = Value("d", 0.0) @@ -176,7 +182,6 @@ class ObjectDetectProcess: logging.info("Detection process has exited...") def start_or_restart(self): - # TODO have to create a separate ZMQ listener for the MP.Events to be sent here self.detection_start.value = 0.0 if (self.detect_process is not None) and self.detect_process.is_alive(): self.stop() @@ -186,7 +191,7 @@ class ObjectDetectProcess: args=( self.name, self.detection_queue, - self.out_events, + self.cameras, self.avg_inference_speed, self.detection_start, self.detector_config, @@ -202,7 +207,6 @@ class RemoteObjectDetector: name: str, labels: dict[int, str], detection_queue: Queue, - event: MpEvent, model_config: ModelConfig, stop_event: MpEvent, ): @@ -210,7 +214,6 @@ class RemoteObjectDetector: self.name = name self.fps = EventsPerSecond() self.detection_queue = detection_queue - self.event = event self.stop_event = stop_event self.shm = UntrackedSharedMemory(name=self.name, create=False) self.np_shm = np.ndarray( @@ -220,6 +223,7 @@ class RemoteObjectDetector: ) self.out_shm = UntrackedSharedMemory(name=f"out-{self.name}", create=False) self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf) + self.detector_subscriber = ObjectDetectorSubscriber(name) def detect(self, tensor_input, threshold=0.4): detections = [] @@ -229,9 +233,8 @@ class RemoteObjectDetector: # copy input to shared memory self.np_shm[:] = tensor_input[:] - self.event.clear() self.detection_queue.put(self.name) - result = self.event.wait(timeout=5.0) + result = self.detector_subscriber.check_for_update() # if it timed out if result is None: @@ -247,5 +250,6 @@ class RemoteObjectDetector: return detections def cleanup(self): + self.detector_subscriber.stop() self.shm.unlink() self.out_shm.unlink() diff --git a/frigate/video.py b/frigate/video.py index 5fc70ca02..369971b4c 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -473,7 +473,6 @@ def track_camera( model_config: ModelConfig, labelmap: dict[int, str], detection_queue: Queue, - result_connection: MpEvent, detected_objects_queue, camera_metrics: CameraMetrics, ptz_metrics: PTZMetrics, @@ -503,7 +502,7 @@ def track_camera( ptz_metrics=ptz_metrics, ) object_detector = RemoteObjectDetector( - name, labelmap, detection_queue, result_connection, model_config, stop_event + name, labelmap, detection_queue, model_config, stop_event ) object_tracker = NorfairTracker(config, ptz_metrics)