Use ZMQ for signaling object detectoin is completed

This commit is contained in:
Nicolas Mowen 2025-06-11 08:31:01 -06:00
parent 225010c570
commit 301c01dbf2
5 changed files with 38 additions and 23 deletions

View File

@ -82,7 +82,6 @@ class FrigateApp:
self.stop_event: MpEvent = mp.Event()
self.detection_queue: Queue = mp.Queue()
self.detectors: dict[str, ObjectDetectProcess] = {}
self.detection_out_events: dict[str, MpEvent] = {}
self.detection_shms: list[mp.shared_memory.SharedMemory] = []
self.log_queue: Queue = mp.Queue()
self.camera_metrics: dict[str, CameraMetrics] = {}
@ -363,8 +362,6 @@ class FrigateApp:
def start_detectors(self) -> None:
for name in self.config.cameras.keys():
self.detection_out_events[name] = mp.Event()
try:
largest_frame = max(
[
@ -396,7 +393,7 @@ class FrigateApp:
self.detectors[name] = ObjectDetectProcess(
name,
self.detection_queue,
self.detection_out_events,
list(self.config.cameras.keys()),
detector_config,
)
@ -435,7 +432,6 @@ class FrigateApp:
self.camera_maintainer = CameraMaintainer(
self.config,
self.detection_queue,
self.detection_out_events,
self.detected_frames_queue,
self.camera_metrics,
self.ptz_metrics,

View File

@ -31,7 +31,6 @@ class CameraMaintainer(threading.Thread):
self,
config: FrigateConfig,
detection_queue: Queue,
detection_out_events: dict[str, MpEvent],
detected_frames_queue: Queue,
camera_metrics: dict[str, CameraMetrics],
ptz_metrics: dict[str, PTZMetrics],
@ -40,7 +39,6 @@ class CameraMaintainer(threading.Thread):
super().__init__(name="camera_processor")
self.config = config
self.detection_queue = detection_queue
self.detection_out_events = detection_out_events
self.detected_frames_queue = detected_frames_queue
self.stop_event = stop_event
self.camera_metrics = camera_metrics
@ -54,7 +52,6 @@ class CameraMaintainer(threading.Thread):
CameraConfigUpdateEnum.remove,
],
)
self.detector_camera_publisher = DetectorCameraPublisher()
self.shm_count = self.__calculate_shm_frame_count()
def __init_historical_regions(self) -> None:
@ -151,7 +148,6 @@ class CameraMaintainer(threading.Thread):
self.config.model,
self.config.model.merged_labelmap,
self.detection_queue,
self.detection_out_events[name],
self.detected_frames_queue,
self.camera_metrics[name],
self.ptz_metrics[name],
@ -234,6 +230,5 @@ class CameraMaintainer(threading.Thread):
for camera in self.camera_metrics.keys():
self.__stop_camera_process(camera)
self.detector_camera_publisher.stop()
self.update_subscriber.stop()
self.frame_manager.cleanup()

View File

@ -0,0 +1,21 @@
"""Facilitates communication between processes for object detection signals."""
from .zmq_proxy import Publisher, Subscriber
class ObjectDetectorPublisher(Publisher):
"""Publishes signal for object detection to different processes."""
topic_base = "object_detector/"
class ObjectDetectorSubscriber(Subscriber):
"""Simplifies receiving a signal for object detection."""
topic_base = "object_detector/"
def __init__(self, topic: str) -> None:
super().__init__(topic)
def check_for_update(self):
return super().check_for_update(timeout=5)

View File

@ -13,6 +13,10 @@ import numpy as np
from setproctitle import setproctitle
import frigate.util as util
from frigate.comms.object_detector_signaler import (
ObjectDetectorPublisher,
ObjectDetectorSubscriber,
)
from frigate.detectors import create_detector
from frigate.detectors.detector_config import (
BaseDetectorConfig,
@ -89,7 +93,7 @@ class LocalObjectDetector(ObjectDetector):
def run_detector(
name: str,
detection_queue: Queue,
out_events: dict[str, MpEvent],
cameras: list[str],
avg_speed: Value,
start: Value,
detector_config: BaseDetectorConfig,
@ -110,9 +114,10 @@ def run_detector(
frame_manager = SharedMemoryFrameManager()
object_detector = LocalObjectDetector(detector_config=detector_config)
detector_publisher = ObjectDetectorPublisher()
outputs = {}
for name in out_events.keys():
for name in cameras:
out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False)
out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
outputs[name] = {"shm": out_shm, "np": out_np}
@ -137,11 +142,12 @@ def run_detector(
duration = datetime.datetime.now().timestamp() - start.value
frame_manager.close(connection_id)
outputs[connection_id]["np"][:] = detections[:]
out_events[connection_id].set()
detector_publisher.publish(connection_id, connection_id)
start.value = 0.0
avg_speed.value = (avg_speed.value * 9 + duration) / 10
detector_publisher.stop()
logger.info("Exited detection process...")
@ -150,11 +156,11 @@ class ObjectDetectProcess:
self,
name: str,
detection_queue: Queue,
out_events: dict[str, MpEvent],
cameras: list[str],
detector_config: BaseDetectorConfig,
):
self.name = name
self.out_events = out_events
self.cameras = cameras
self.detection_queue = detection_queue
self.avg_inference_speed = Value("d", 0.01)
self.detection_start = Value("d", 0.0)
@ -176,7 +182,6 @@ class ObjectDetectProcess:
logging.info("Detection process has exited...")
def start_or_restart(self):
# TODO have to create a separate ZMQ listener for the MP.Events to be sent here
self.detection_start.value = 0.0
if (self.detect_process is not None) and self.detect_process.is_alive():
self.stop()
@ -186,7 +191,7 @@ class ObjectDetectProcess:
args=(
self.name,
self.detection_queue,
self.out_events,
self.cameras,
self.avg_inference_speed,
self.detection_start,
self.detector_config,
@ -202,7 +207,6 @@ class RemoteObjectDetector:
name: str,
labels: dict[int, str],
detection_queue: Queue,
event: MpEvent,
model_config: ModelConfig,
stop_event: MpEvent,
):
@ -210,7 +214,6 @@ class RemoteObjectDetector:
self.name = name
self.fps = EventsPerSecond()
self.detection_queue = detection_queue
self.event = event
self.stop_event = stop_event
self.shm = UntrackedSharedMemory(name=self.name, create=False)
self.np_shm = np.ndarray(
@ -220,6 +223,7 @@ class RemoteObjectDetector:
)
self.out_shm = UntrackedSharedMemory(name=f"out-{self.name}", create=False)
self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf)
self.detector_subscriber = ObjectDetectorSubscriber(name)
def detect(self, tensor_input, threshold=0.4):
detections = []
@ -229,9 +233,8 @@ class RemoteObjectDetector:
# copy input to shared memory
self.np_shm[:] = tensor_input[:]
self.event.clear()
self.detection_queue.put(self.name)
result = self.event.wait(timeout=5.0)
result = self.detector_subscriber.check_for_update()
# if it timed out
if result is None:
@ -247,5 +250,6 @@ class RemoteObjectDetector:
return detections
def cleanup(self):
self.detector_subscriber.stop()
self.shm.unlink()
self.out_shm.unlink()

View File

@ -473,7 +473,6 @@ def track_camera(
model_config: ModelConfig,
labelmap: dict[int, str],
detection_queue: Queue,
result_connection: MpEvent,
detected_objects_queue,
camera_metrics: CameraMetrics,
ptz_metrics: PTZMetrics,
@ -503,7 +502,7 @@ def track_camera(
ptz_metrics=ptz_metrics,
)
object_detector = RemoteObjectDetector(
name, labelmap, detection_queue, result_connection, model_config, stop_event
name, labelmap, detection_queue, model_config, stop_event
)
object_tracker = NorfairTracker(config, ptz_metrics)