From 71184c0ef42dfe4ad530c095af868fda472f63da Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Sun, 18 Feb 2024 10:06:43 -0700 Subject: [PATCH] Use select for time sensitive polls --- frigate/comms/detections_updater.py | 14 +++++++++----- frigate/comms/inter_process.py | 7 ++++++- frigate/object_processing.py | 4 +--- frigate/output/output.py | 6 ++---- frigate/record/maintainer.py | 4 +++- 5 files changed, 21 insertions(+), 14 deletions(-) diff --git a/frigate/comms/detections_updater.py b/frigate/comms/detections_updater.py index d1624dc93..6a6de4c7b 100644 --- a/frigate/comms/detections_updater.py +++ b/frigate/comms/detections_updater.py @@ -22,7 +22,6 @@ class DetectionTypeEnum(str, Enum): class DetectionProxyRunner(threading.Thread): - def __init__(self, context: zmq.Context[zmq.Socket]) -> None: threading.Thread.__init__(self) self.name = "detection_proxy" @@ -105,13 +104,18 @@ class DetectionSubscriber: self.socket.setsockopt_string(zmq.SUBSCRIBE, topic.value) self.socket.connect(f"tcp://127.0.0.1:{port}") - def get_data(self) -> Optional[tuple[str, any]]: + def get_data(self, timeout: float = None) -> Optional[tuple[str, any]]: """Returns detections or None if no update.""" try: - topic = DetectionTypeEnum[self.socket.recv_string(flags=zmq.NOBLOCK)] - return (topic, self.socket.recv_pyobj()) + has_update, _, _ = zmq.select([self.socket], [], [], timeout) + + if has_update: + topic = DetectionTypeEnum[self.socket.recv_string(flags=zmq.NOBLOCK)] + return (topic, self.socket.recv_pyobj()) except zmq.ZMQError: - return (None, None) + pass + + return (None, None) def stop(self) -> None: self.socket.close() diff --git a/frigate/comms/inter_process.py b/frigate/comms/inter_process.py index bcf99738f..6a8929194 100644 --- a/frigate/comms/inter_process.py +++ b/frigate/comms/inter_process.py @@ -32,8 +32,13 @@ class InterProcessCommunicator(Communicator): self.reader_thread.start() def read(self) -> None: - while not self.stop_event.wait(0.1): + while not self.stop_event.is_set(): while True: # load all messages that are queued + has_message, _, _ = zmq.select([self.socket], [], [], 1) + + if not has_message: + break + try: (topic, value) = self.socket.recv_pyobj(flags=zmq.NOBLOCK) diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 6146f54a3..68cac4ec0 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -521,9 +521,7 @@ class CameraState: ): max_target_box = self.ptz_autotracker_thread.ptz_autotracker.tracked_object_metrics[ self.name - ][ - "max_target_box" - ] + ]["max_target_box"] side_length = max_target_box * ( max( self.camera_config.detect.width, diff --git a/frigate/output/output.py b/frigate/output/output.py index 85ea586a7..e717463b1 100644 --- a/frigate/output/output.py +++ b/frigate/output/output.py @@ -4,7 +4,6 @@ import logging import multiprocessing as mp import signal import threading -import time from typing import Optional from wsgiref.simple_server import make_server @@ -75,10 +74,9 @@ def output_frames( websocket_thread.start() while not stop_event.is_set(): - (topic, data) = detection_subscriber.get_data() + (topic, data) = detection_subscriber.get_data(timeout=10) if not topic: - time.sleep(0.1) continue ( @@ -128,7 +126,7 @@ def output_frames( previous_frames[camera] = frame_time while True: - (topic, data) = detection_subscriber.get_data() + (topic, data) = detection_subscriber.get_data(timeout=0) if not topic: break diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index 52f357894..2715dec89 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -450,7 +450,9 @@ class RecordingMaintainer(threading.Thread): stale_frame_count_threshold = 10 # empty the object recordings info queue while True: - (topic, data) = self.detection_subscriber.get_data() + (topic, data) = self.detection_subscriber.get_data( + timeout=QUEUE_READ_TIMEOUT + ) if not topic: break