Use pyobj instead of json and Need to use separate requestors for each audio listener

This commit is contained in:
Nick Mowen 2024-01-11 16:45:08 -07:00 committed by Nicolas Mowen
parent 8cf021630e
commit c1bd6bdc51
2 changed files with 20 additions and 23 deletions

View File

@ -32,18 +32,19 @@ class InterProcessCommunicator(Communicator):
self.reader_thread.start() self.reader_thread.start()
def read(self) -> None: def read(self) -> None:
while not self.stop_event.wait(1): while not self.stop_event.wait(0.5):
try: while True: # load all messages that are queued
(topic, value) = self.socket.recv_json(flags=zmq.NOBLOCK) try:
except zmq.ZMQError: (topic, value) = self.socket.recv_pyobj(flags=zmq.NOBLOCK)
continue
response = self._dispatcher(topic, value) response = self._dispatcher(topic, value)
if response is not None: if response is not None:
self.socket.send_json(response) self.socket.send_pyobj(response)
else: else:
self.socket.send_json([]) self.socket.send_pyobj([])
except zmq.ZMQError:
break
def stop(self) -> None: def stop(self) -> None:
self.stop_event.set() self.stop_event.set()
@ -62,8 +63,8 @@ class InterProcessRequestor:
def send_data(self, topic: str, data: any) -> any: def send_data(self, topic: str, data: any) -> any:
"""Sends data and then waits for reply.""" """Sends data and then waits for reply."""
self.socket.send_json((topic, data)) self.socket.send_pyobj((topic, data))
return self.socket.recv_json() return self.socket.recv_pyobj()
def stop(self) -> None: def stop(self) -> None:
self.socket.close() self.socket.close()

View File

@ -76,18 +76,10 @@ def listen_to_audio(
stop_event = mp.Event() stop_event = mp.Event()
audio_threads: list[threading.Thread] = [] audio_threads: list[threading.Thread] = []
# create communication for finished previews
INTER_PROCESS_COMM_PORT = (
os.environ.get("INTER_PROCESS_COMM_PORT") or PORT_INTER_PROCESS_COMM
)
requestor = InterProcessRequestor(INTER_PROCESS_COMM_PORT)
def exit_process() -> None: def exit_process() -> None:
for thread in audio_threads: for thread in audio_threads:
thread.join() thread.join()
requestor.stop()
logger.info("Exiting audio detector...") logger.info("Exiting audio detector...")
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
@ -109,7 +101,6 @@ def listen_to_audio(
camera_metrics, camera_metrics,
process_info, process_info,
stop_event, stop_event,
requestor,
) )
audio_threads.append(audio) audio_threads.append(audio)
audio.start() audio.start()
@ -183,7 +174,6 @@ class AudioEventMaintainer(threading.Thread):
camera_metrics: dict[str, CameraMetricsTypes], camera_metrics: dict[str, CameraMetricsTypes],
feature_metrics: dict[str, FeatureMetricsTypes], feature_metrics: dict[str, FeatureMetricsTypes],
stop_event: mp.Event, stop_event: mp.Event,
requestor: InterProcessRequestor,
) -> None: ) -> None:
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.name = f"{camera.name}_audio_event_processor" self.name = f"{camera.name}_audio_event_processor"
@ -191,7 +181,6 @@ class AudioEventMaintainer(threading.Thread):
self.recordings_info_queue = recordings_info_queue self.recordings_info_queue = recordings_info_queue
self.camera_metrics = camera_metrics self.camera_metrics = camera_metrics
self.feature_metrics = feature_metrics self.feature_metrics = feature_metrics
self.requestor = requestor
self.detections: dict[dict[str, any]] = {} self.detections: dict[dict[str, any]] = {}
self.stop_event = stop_event self.stop_event = stop_event
self.detector = AudioTfl(stop_event, self.config.audio.num_threads) self.detector = AudioTfl(stop_event, self.config.audio.num_threads)
@ -202,6 +191,12 @@ class AudioEventMaintainer(threading.Thread):
self.logpipe = LogPipe(f"ffmpeg.{self.config.name}.audio") self.logpipe = LogPipe(f"ffmpeg.{self.config.name}.audio")
self.audio_listener = None self.audio_listener = None
# create communication for finished previews
INTER_PROCESS_COMM_PORT = (
os.environ.get("INTER_PROCESS_COMM_PORT") or PORT_INTER_PROCESS_COMM
)
self.requestor = InterProcessRequestor(INTER_PROCESS_COMM_PORT)
def detect_audio(self, audio) -> None: def detect_audio(self, audio) -> None:
if not self.feature_metrics[self.config.name]["audio_enabled"].value: if not self.feature_metrics[self.config.name]["audio_enabled"].value:
return return
@ -353,3 +348,4 @@ class AudioEventMaintainer(threading.Thread):
stop_ffmpeg(self.audio_listener, self.logger) stop_ffmpeg(self.audio_listener, self.logger)
self.logpipe.close() self.logpipe.close()
self.requestor.stop()