diff --git a/frigate/app.py b/frigate/app.py index 6c36ea98e..d30b6a945 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -18,6 +18,7 @@ from playhouse.sqliteq import SqliteQueueDatabase from frigate.comms.dispatcher import Communicator, Dispatcher from frigate.comms.mqtt import MqttClient +from frigate.comms.stream_metadata import StreamMetadataCommunicator from frigate.comms.ws import WebSocketClient from frigate.config import FrigateConfig from frigate.const import ( @@ -205,6 +206,9 @@ class FrigateApp: # Queue for timeline events self.timeline_queue: Queue = mp.Queue() + # Queue for streams metadata + self.stream_metadata_queue: Queue = mp.Queue() + def init_database(self) -> None: def vacuum_db(db: SqliteExtDatabase) -> None: db.execute_sql("VACUUM;") @@ -293,6 +297,11 @@ class FrigateApp: self.config, self.event_queue ) + def init_stream_metadata_communicator(self) -> None: + self.stream_metadata_communicator = StreamMetadataCommunicator( + self.stream_metadata_queue + ) + def init_web_server(self) -> None: self.flask_app = create_app( self.config, @@ -316,6 +325,8 @@ class FrigateApp: comms.append(MqttClient(self.config)) comms.append(WebSocketClient(self.config)) + comms.append(self.stream_metadata_communicator) + self.dispatcher = Dispatcher( self.config, self.onvif_controller, @@ -434,7 +445,11 @@ class FrigateApp: audio_process = mp.Process( target=listen_to_audio, name="audio_capture", - args=(self.config, self.feature_metrics), + args=( + self.config, + self.feature_metrics, + self.stream_metadata_communicator, + ), ) audio_process.daemon = True audio_process.start() @@ -526,6 +541,7 @@ class FrigateApp: self.init_recording_manager() self.init_go2rtc() self.bind_database() + self.init_stream_metadata_communicator() self.init_dispatcher() except Exception as e: print(e) @@ -595,6 +611,7 @@ class FrigateApp: self.detected_frames_queue, self.recordings_info_queue, self.log_queue, + self.stream_metadata_queue, ]: while not queue.empty(): queue.get_nowait() diff --git a/frigate/comms/dispatcher.py b/frigate/comms/dispatcher.py index 1c9105ce8..08aa0adc9 100644 --- a/frigate/comms/dispatcher.py +++ b/frigate/comms/dispatcher.py @@ -83,6 +83,9 @@ class Dispatcher: return elif topic == "restart": restart_frigate() + elif "metadata" in topic: + # example /cam_name/metadata/dbfs payload=-55.23 + self.publish(topic, payload, retain=True) def publish(self, topic: str, payload: Any, retain: bool = False) -> None: """Handle publishing to communicators.""" diff --git a/frigate/comms/stream_metadata.py b/frigate/comms/stream_metadata.py new file mode 100644 index 000000000..e71b902a6 --- /dev/null +++ b/frigate/comms/stream_metadata.py @@ -0,0 +1,43 @@ +import multiprocessing as mp +import queue +import threading +from multiprocessing.synchronize import Event as MpEvent +from typing import Callable + +from faster_fifo import Queue + +from frigate.comms.dispatcher import Communicator + + +class StreamMetadataCommunicator(Communicator): + def __init__(self, queue: Queue) -> None: + self.queue = queue + self.stop_event: MpEvent = mp.Event() + + def _get_metadata_topic(self, camera: str, metric: str) -> str: + return f"{camera}/metadata/{metric}" + + def publish(self, topic: str, payload: str, retain: bool) -> None: + pass + + def subscribe(self, receiver: Callable) -> None: + self._dispatcher = receiver + self.reader_thread = threading.Thread(target=self.read) + self.reader_thread.start() + + def read(self) -> None: + while not self.stop_event.is_set(): + try: + ( + camera, + payload, + ) = self.queue.get(True, 1) + except queue.Empty: + continue + + for field, value in payload.items(): + self._dispatcher(self._get_metadata_topic(camera, field), value) + + def stop(self) -> None: + self.stop_event.set() + self.reader_thread.join() diff --git a/frigate/events/audio.py b/frigate/events/audio.py index 53ec23690..6d17f10fb 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -13,6 +13,7 @@ import numpy as np import requests from setproctitle import setproctitle +from frigate.comms.stream_metadata import StreamMetadataCommunicator from frigate.config import CameraConfig, FrigateConfig from frigate.const import ( AUDIO_DURATION, @@ -50,6 +51,7 @@ def get_ffmpeg_command(input_args: list[str], input_path: str, pipe: str) -> lis def listen_to_audio( config: FrigateConfig, process_info: dict[str, FeatureMetricsTypes], + stream_metadata_communicator: StreamMetadataCommunicator, ) -> None: stop_event = mp.Event() audio_threads: list[threading.Thread] = [] @@ -73,7 +75,9 @@ def listen_to_audio( for camera in config.cameras.values(): if camera.enabled and camera.audio.enabled_in_config: - audio = AudioEventMaintainer(camera, process_info, stop_event) + audio = AudioEventMaintainer( + camera, process_info, stop_event, stream_metadata_communicator + ) audio_threads.append(audio) audio.start() @@ -143,11 +147,13 @@ class AudioEventMaintainer(threading.Thread): camera: CameraConfig, feature_metrics: dict[str, FeatureMetricsTypes], stop_event: mp.Event, + stream_metadata_communicator: StreamMetadataCommunicator, ) -> None: threading.Thread.__init__(self) self.name = f"{camera.name}_audio_event_processor" self.config = camera self.feature_metrics = feature_metrics + self.stream_metadata_communicator = stream_metadata_communicator self.detections: dict[dict[str, any]] = feature_metrics self.stop_event = stop_event self.detector = AudioTfl(stop_event) @@ -174,14 +180,13 @@ class AudioEventMaintainer(threading.Thread): # Calculate RMS (Root-Mean-Square) which represents the average signal amplitude # Note: np.float32 isn't serializable, we must use np.float64 to publish the message - rms = np.sqrt(np.mean(np.absolute(audio_as_float**2))).astype(np.float64) + rms = np.sqrt(np.mean(np.absolute(audio_as_float**2))) # Transform RMS to dBFS (decibels relative to full scale) dBFS = 20 * np.log10(np.abs(rms) / AUDIO_MAX_BIT_RANGE) - requests.post( - f"http://127.0.0.1:5000/api/{self.config.name}/metadata", - json={"dBFS": dBFS, "rms": rms}, + self.stream_metadata_communicator.queue.put( + (self.config.name, {"dBFS": float(dBFS), "rms": float(rms)}) ) for label, score, _ in model_detections: diff --git a/frigate/http.py b/frigate/http.py index f6859a210..2ab156ae3 100644 --- a/frigate/http.py +++ b/frigate/http.py @@ -1426,38 +1426,6 @@ def recording_clip(camera_name, start_ts, end_ts): return response -@bp.route("//metadata", methods=["POST"]) -def create_metadata_message(camera_name): - if not camera_name or not current_app.frigate_config.cameras.get(camera_name): - return jsonify( - {"success": False, "message": f"{camera_name} is not a valid camera."}, 404 - ) - - request_json = request.get_json(silent=True) - if request_json == {}: - return jsonify( - {"success": False, "message": "Metadata json cannot be empty."}, 404 - ) - - try: - current_app.dispatcher.publish( - "metadata", json.dumps(request_json), retain=False - ) - except Exception as e: - logger.error(f"The error is {e}") - return jsonify( - {"success": False, "message": f"An unknown error occurred: {e}"}, 404 - ) - - return jsonify( - { - "success": True, - "message": "Successfully published metadata message.", - }, - 200, - ) - - @bp.route("/vod//start//end/") @bp.route("/vod//start//end/") def vod_ts(camera_name, start_ts, end_ts):