Use select for time sensitive polls

This commit is contained in:
Nicolas Mowen 2024-02-18 10:06:43 -07:00
parent bbef515ebc
commit 71184c0ef4
5 changed files with 21 additions and 14 deletions

View File

@ -22,7 +22,6 @@ class DetectionTypeEnum(str, Enum):
class DetectionProxyRunner(threading.Thread): class DetectionProxyRunner(threading.Thread):
def __init__(self, context: zmq.Context[zmq.Socket]) -> None: def __init__(self, context: zmq.Context[zmq.Socket]) -> None:
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.name = "detection_proxy" self.name = "detection_proxy"
@ -105,13 +104,18 @@ class DetectionSubscriber:
self.socket.setsockopt_string(zmq.SUBSCRIBE, topic.value) self.socket.setsockopt_string(zmq.SUBSCRIBE, topic.value)
self.socket.connect(f"tcp://127.0.0.1:{port}") self.socket.connect(f"tcp://127.0.0.1:{port}")
def get_data(self) -> Optional[tuple[str, any]]: def get_data(self, timeout: float = None) -> Optional[tuple[str, any]]:
"""Returns detections or None if no update.""" """Returns detections or None if no update."""
try: try:
topic = DetectionTypeEnum[self.socket.recv_string(flags=zmq.NOBLOCK)] has_update, _, _ = zmq.select([self.socket], [], [], timeout)
return (topic, self.socket.recv_pyobj())
if has_update:
topic = DetectionTypeEnum[self.socket.recv_string(flags=zmq.NOBLOCK)]
return (topic, self.socket.recv_pyobj())
except zmq.ZMQError: except zmq.ZMQError:
return (None, None) pass
return (None, None)
def stop(self) -> None: def stop(self) -> None:
self.socket.close() self.socket.close()

View File

@ -32,8 +32,13 @@ class InterProcessCommunicator(Communicator):
self.reader_thread.start() self.reader_thread.start()
def read(self) -> None: def read(self) -> None:
while not self.stop_event.wait(0.1): while not self.stop_event.is_set():
while True: # load all messages that are queued while True: # load all messages that are queued
has_message, _, _ = zmq.select([self.socket], [], [], 1)
if not has_message:
break
try: try:
(topic, value) = self.socket.recv_pyobj(flags=zmq.NOBLOCK) (topic, value) = self.socket.recv_pyobj(flags=zmq.NOBLOCK)

View File

@ -521,9 +521,7 @@ class CameraState:
): ):
max_target_box = self.ptz_autotracker_thread.ptz_autotracker.tracked_object_metrics[ max_target_box = self.ptz_autotracker_thread.ptz_autotracker.tracked_object_metrics[
self.name self.name
][ ]["max_target_box"]
"max_target_box"
]
side_length = max_target_box * ( side_length = max_target_box * (
max( max(
self.camera_config.detect.width, self.camera_config.detect.width,

View File

@ -4,7 +4,6 @@ import logging
import multiprocessing as mp import multiprocessing as mp
import signal import signal
import threading import threading
import time
from typing import Optional from typing import Optional
from wsgiref.simple_server import make_server from wsgiref.simple_server import make_server
@ -75,10 +74,9 @@ def output_frames(
websocket_thread.start() websocket_thread.start()
while not stop_event.is_set(): while not stop_event.is_set():
(topic, data) = detection_subscriber.get_data() (topic, data) = detection_subscriber.get_data(timeout=10)
if not topic: if not topic:
time.sleep(0.1)
continue continue
( (
@ -128,7 +126,7 @@ def output_frames(
previous_frames[camera] = frame_time previous_frames[camera] = frame_time
while True: while True:
(topic, data) = detection_subscriber.get_data() (topic, data) = detection_subscriber.get_data(timeout=0)
if not topic: if not topic:
break break

View File

@ -450,7 +450,9 @@ class RecordingMaintainer(threading.Thread):
stale_frame_count_threshold = 10 stale_frame_count_threshold = 10
# empty the object recordings info queue # empty the object recordings info queue
while True: while True:
(topic, data) = self.detection_subscriber.get_data() (topic, data) = self.detection_subscriber.get_data(
timeout=QUEUE_READ_TIMEOUT
)
if not topic: if not topic:
break break