diff --git a/frigate/app.py b/frigate/app.py index d30b6a945..0054ec5ed 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -17,8 +17,8 @@ from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqliteq import SqliteQueueDatabase from frigate.comms.dispatcher import Communicator, Dispatcher +from frigate.comms.inter_process import InterProcessCommunicator from frigate.comms.mqtt import MqttClient -from frigate.comms.stream_metadata import StreamMetadataCommunicator from frigate.comms.ws import WebSocketClient from frigate.config import FrigateConfig from frigate.const import ( @@ -206,8 +206,8 @@ class FrigateApp: # Queue for timeline events self.timeline_queue: Queue = mp.Queue() - # Queue for streams metadata - self.stream_metadata_queue: Queue = mp.Queue() + # Queue for inter process communication + self.inter_process_queue: Queue = mp.Queue() def init_database(self) -> None: def vacuum_db(db: SqliteExtDatabase) -> None: @@ -297,9 +297,9 @@ class FrigateApp: self.config, self.event_queue ) - def init_stream_metadata_communicator(self) -> None: - self.stream_metadata_communicator = StreamMetadataCommunicator( - self.stream_metadata_queue + def init_inter_process_communicator(self) -> None: + self.inter_process_communicator = InterProcessCommunicator( + self.inter_process_queue ) def init_web_server(self) -> None: @@ -312,7 +312,6 @@ class FrigateApp: self.onvif_controller, self.external_event_processor, self.plus_api, - self.dispatcher, ) def init_onvif(self) -> None: @@ -325,7 +324,7 @@ class FrigateApp: comms.append(MqttClient(self.config)) comms.append(WebSocketClient(self.config)) - comms.append(self.stream_metadata_communicator) + comms.append(self.inter_process_communicator) self.dispatcher = Dispatcher( self.config, @@ -448,7 +447,7 @@ class FrigateApp: args=( self.config, self.feature_metrics, - self.stream_metadata_communicator, + self.inter_process_communicator, ), ) audio_process.daemon = True @@ -541,7 +540,7 @@ class FrigateApp: self.init_recording_manager() self.init_go2rtc() self.bind_database() - self.init_stream_metadata_communicator() + self.init_inter_process_communicator() self.init_dispatcher() except Exception as e: print(e) @@ -611,7 +610,7 @@ class FrigateApp: self.detected_frames_queue, self.recordings_info_queue, self.log_queue, - self.stream_metadata_queue, + self.inter_process_queue, ]: while not queue.empty(): queue.get_nowait() diff --git a/frigate/comms/stream_metadata.py b/frigate/comms/inter_process.py similarity index 72% rename from frigate/comms/stream_metadata.py rename to frigate/comms/inter_process.py index e71b902a6..a81a6b884 100644 --- a/frigate/comms/stream_metadata.py +++ b/frigate/comms/inter_process.py @@ -9,14 +9,11 @@ from faster_fifo import Queue from frigate.comms.dispatcher import Communicator -class StreamMetadataCommunicator(Communicator): +class InterProcessCommunicator(Communicator): def __init__(self, queue: Queue) -> None: self.queue = queue self.stop_event: MpEvent = mp.Event() - def _get_metadata_topic(self, camera: str, metric: str) -> str: - return f"{camera}/metadata/{metric}" - def publish(self, topic: str, payload: str, retain: bool) -> None: pass @@ -29,14 +26,13 @@ class StreamMetadataCommunicator(Communicator): while not self.stop_event.is_set(): try: ( - camera, - payload, + topic, + value, ) = self.queue.get(True, 1) except queue.Empty: continue - for field, value in payload.items(): - self._dispatcher(self._get_metadata_topic(camera, field), value) + self._dispatcher(topic, value) def stop(self) -> None: self.stop_event.set() diff --git a/frigate/events/audio.py b/frigate/events/audio.py index 6d17f10fb..7055537f4 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -13,7 +13,7 @@ import numpy as np import requests from setproctitle import setproctitle -from frigate.comms.stream_metadata import StreamMetadataCommunicator +from frigate.comms.inter_process import InterProcessCommunicator from frigate.config import CameraConfig, FrigateConfig from frigate.const import ( AUDIO_DURATION, @@ -51,7 +51,7 @@ def get_ffmpeg_command(input_args: list[str], input_path: str, pipe: str) -> lis def listen_to_audio( config: FrigateConfig, process_info: dict[str, FeatureMetricsTypes], - stream_metadata_communicator: StreamMetadataCommunicator, + inter_process_communicator: InterProcessCommunicator, ) -> None: stop_event = mp.Event() audio_threads: list[threading.Thread] = [] @@ -76,7 +76,7 @@ def listen_to_audio( for camera in config.cameras.values(): if camera.enabled and camera.audio.enabled_in_config: audio = AudioEventMaintainer( - camera, process_info, stop_event, stream_metadata_communicator + camera, process_info, stop_event, inter_process_communicator ) audio_threads.append(audio) audio.start() @@ -147,13 +147,13 @@ class AudioEventMaintainer(threading.Thread): camera: CameraConfig, feature_metrics: dict[str, FeatureMetricsTypes], stop_event: mp.Event, - stream_metadata_communicator: StreamMetadataCommunicator, + inter_process_communicator: InterProcessCommunicator, ) -> None: threading.Thread.__init__(self) self.name = f"{camera.name}_audio_event_processor" self.config = camera self.feature_metrics = feature_metrics - self.stream_metadata_communicator = stream_metadata_communicator + self.inter_process_communicator = inter_process_communicator self.detections: dict[dict[str, any]] = feature_metrics self.stop_event = stop_event self.detector = AudioTfl(stop_event) @@ -178,16 +178,7 @@ class AudioEventMaintainer(threading.Thread): waveform = audio_as_float / AUDIO_MAX_BIT_RANGE model_detections = self.detector.detect(waveform) - # Calculate RMS (Root-Mean-Square) which represents the average signal amplitude - # Note: np.float32 isn't serializable, we must use np.float64 to publish the message - rms = np.sqrt(np.mean(np.absolute(audio_as_float**2))) - - # Transform RMS to dBFS (decibels relative to full scale) - dBFS = 20 * np.log10(np.abs(rms) / AUDIO_MAX_BIT_RANGE) - - self.stream_metadata_communicator.queue.put( - (self.config.name, {"dBFS": float(dBFS), "rms": float(rms)}) - ) + self.calculate_audio_levels(audio_as_float) for label, score, _ in model_detections: if label not in self.config.audio.listen: @@ -197,6 +188,21 @@ class AudioEventMaintainer(threading.Thread): self.expire_detections() + def calculate_audio_levels(self, audio_as_float: np.float32) -> None: + # Calculate RMS (Root-Mean-Square) which represents the average signal amplitude + # Note: np.float32 isn't serializable, we must use np.float64 to publish the message + rms = np.sqrt(np.mean(np.absolute(audio_as_float**2))) + + # Transform RMS to dBFS (decibels relative to full scale) + dBFS = 20 * np.log10(np.abs(rms) / AUDIO_MAX_BIT_RANGE) + + self.inter_process_communicator.queue.put( + (f"{self.config.name}/metadata/dBFS", float(dBFS)) + ) + self.inter_process_communicator.queue.put( + (f"{self.config.name}/metadata/rms", float(rms)) + ) + def handle_detection(self, label: str, score: float) -> None: if self.detections.get(label): self.detections[label][