diff --git a/frigate/comms/inter_process.py b/frigate/comms/inter_process.py index 55e0d73c4..94f328154 100644 --- a/frigate/comms/inter_process.py +++ b/frigate/comms/inter_process.py @@ -32,18 +32,19 @@ class InterProcessCommunicator(Communicator): self.reader_thread.start() def read(self) -> None: - while not self.stop_event.wait(1): - try: - (topic, value) = self.socket.recv_json(flags=zmq.NOBLOCK) - except zmq.ZMQError: - continue + while not self.stop_event.wait(0.5): + while True: # load all messages that are queued + try: + (topic, value) = self.socket.recv_pyobj(flags=zmq.NOBLOCK) - response = self._dispatcher(topic, value) + response = self._dispatcher(topic, value) - if response is not None: - self.socket.send_json(response) - else: - self.socket.send_json([]) + if response is not None: + self.socket.send_pyobj(response) + else: + self.socket.send_pyobj([]) + except zmq.ZMQError: + break def stop(self) -> None: self.stop_event.set() @@ -62,8 +63,8 @@ class InterProcessRequestor: def send_data(self, topic: str, data: any) -> any: """Sends data and then waits for reply.""" - self.socket.send_json((topic, data)) - return self.socket.recv_json() + self.socket.send_pyobj((topic, data)) + return self.socket.recv_pyobj() def stop(self) -> None: self.socket.close() diff --git a/frigate/events/audio.py b/frigate/events/audio.py index fdc3621af..2d8faf9a4 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -76,18 +76,10 @@ def listen_to_audio( stop_event = mp.Event() audio_threads: list[threading.Thread] = [] - # create communication for finished previews - INTER_PROCESS_COMM_PORT = ( - os.environ.get("INTER_PROCESS_COMM_PORT") or PORT_INTER_PROCESS_COMM - ) - requestor = InterProcessRequestor(INTER_PROCESS_COMM_PORT) - def exit_process() -> None: for thread in audio_threads: thread.join() - requestor.stop() - logger.info("Exiting audio detector...") def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: @@ -109,7 +101,6 @@ def listen_to_audio( camera_metrics, process_info, stop_event, - requestor, ) audio_threads.append(audio) audio.start() @@ -183,7 +174,6 @@ class AudioEventMaintainer(threading.Thread): camera_metrics: dict[str, CameraMetricsTypes], feature_metrics: dict[str, FeatureMetricsTypes], stop_event: mp.Event, - requestor: InterProcessRequestor, ) -> None: threading.Thread.__init__(self) self.name = f"{camera.name}_audio_event_processor" @@ -191,7 +181,6 @@ class AudioEventMaintainer(threading.Thread): self.recordings_info_queue = recordings_info_queue self.camera_metrics = camera_metrics self.feature_metrics = feature_metrics - self.requestor = requestor self.detections: dict[dict[str, any]] = {} self.stop_event = stop_event self.detector = AudioTfl(stop_event, self.config.audio.num_threads) @@ -202,6 +191,12 @@ class AudioEventMaintainer(threading.Thread): self.logpipe = LogPipe(f"ffmpeg.{self.config.name}.audio") self.audio_listener = None + # create communication for finished previews + INTER_PROCESS_COMM_PORT = ( + os.environ.get("INTER_PROCESS_COMM_PORT") or PORT_INTER_PROCESS_COMM + ) + self.requestor = InterProcessRequestor(INTER_PROCESS_COMM_PORT) + def detect_audio(self, audio) -> None: if not self.feature_metrics[self.config.name]["audio_enabled"].value: return @@ -353,3 +348,4 @@ class AudioEventMaintainer(threading.Thread): stop_ffmpeg(self.audio_listener, self.logger) self.logpipe.close() + self.requestor.stop()