mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-02-05 18:55:23 +03:00
Implement metadata communicator
This commit is contained in:
parent
38782fe718
commit
afa675c19b
@ -18,6 +18,7 @@ from playhouse.sqliteq import SqliteQueueDatabase
|
||||
|
||||
from frigate.comms.dispatcher import Communicator, Dispatcher
|
||||
from frigate.comms.mqtt import MqttClient
|
||||
from frigate.comms.stream_metadata import StreamMetadataCommunicator
|
||||
from frigate.comms.ws import WebSocketClient
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.const import (
|
||||
@ -205,6 +206,9 @@ class FrigateApp:
|
||||
# Queue for timeline events
|
||||
self.timeline_queue: Queue = mp.Queue()
|
||||
|
||||
# Queue for streams metadata
|
||||
self.stream_metadata_queue: Queue = mp.Queue()
|
||||
|
||||
def init_database(self) -> None:
|
||||
def vacuum_db(db: SqliteExtDatabase) -> None:
|
||||
db.execute_sql("VACUUM;")
|
||||
@ -293,6 +297,11 @@ class FrigateApp:
|
||||
self.config, self.event_queue
|
||||
)
|
||||
|
||||
def init_stream_metadata_communicator(self) -> None:
|
||||
self.stream_metadata_communicator = StreamMetadataCommunicator(
|
||||
self.stream_metadata_queue
|
||||
)
|
||||
|
||||
def init_web_server(self) -> None:
|
||||
self.flask_app = create_app(
|
||||
self.config,
|
||||
@ -316,6 +325,8 @@ class FrigateApp:
|
||||
comms.append(MqttClient(self.config))
|
||||
|
||||
comms.append(WebSocketClient(self.config))
|
||||
comms.append(self.stream_metadata_communicator)
|
||||
|
||||
self.dispatcher = Dispatcher(
|
||||
self.config,
|
||||
self.onvif_controller,
|
||||
@ -434,7 +445,11 @@ class FrigateApp:
|
||||
audio_process = mp.Process(
|
||||
target=listen_to_audio,
|
||||
name="audio_capture",
|
||||
args=(self.config, self.feature_metrics),
|
||||
args=(
|
||||
self.config,
|
||||
self.feature_metrics,
|
||||
self.stream_metadata_communicator,
|
||||
),
|
||||
)
|
||||
audio_process.daemon = True
|
||||
audio_process.start()
|
||||
@ -526,6 +541,7 @@ class FrigateApp:
|
||||
self.init_recording_manager()
|
||||
self.init_go2rtc()
|
||||
self.bind_database()
|
||||
self.init_stream_metadata_communicator()
|
||||
self.init_dispatcher()
|
||||
except Exception as e:
|
||||
print(e)
|
||||
@ -595,6 +611,7 @@ class FrigateApp:
|
||||
self.detected_frames_queue,
|
||||
self.recordings_info_queue,
|
||||
self.log_queue,
|
||||
self.stream_metadata_queue,
|
||||
]:
|
||||
while not queue.empty():
|
||||
queue.get_nowait()
|
||||
|
||||
@ -83,6 +83,9 @@ class Dispatcher:
|
||||
return
|
||||
elif topic == "restart":
|
||||
restart_frigate()
|
||||
elif "metadata" in topic:
|
||||
# example /cam_name/metadata/dbfs payload=-55.23
|
||||
self.publish(topic, payload, retain=True)
|
||||
|
||||
def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
|
||||
"""Handle publishing to communicators."""
|
||||
|
||||
43
frigate/comms/stream_metadata.py
Normal file
43
frigate/comms/stream_metadata.py
Normal file
@ -0,0 +1,43 @@
|
||||
import multiprocessing as mp
|
||||
import queue
|
||||
import threading
|
||||
from multiprocessing.synchronize import Event as MpEvent
|
||||
from typing import Callable
|
||||
|
||||
from faster_fifo import Queue
|
||||
|
||||
from frigate.comms.dispatcher import Communicator
|
||||
|
||||
|
||||
class StreamMetadataCommunicator(Communicator):
|
||||
def __init__(self, queue: Queue) -> None:
|
||||
self.queue = queue
|
||||
self.stop_event: MpEvent = mp.Event()
|
||||
|
||||
def _get_metadata_topic(self, camera: str, metric: str) -> str:
|
||||
return f"{camera}/metadata/{metric}"
|
||||
|
||||
def publish(self, topic: str, payload: str, retain: bool) -> None:
|
||||
pass
|
||||
|
||||
def subscribe(self, receiver: Callable) -> None:
|
||||
self._dispatcher = receiver
|
||||
self.reader_thread = threading.Thread(target=self.read)
|
||||
self.reader_thread.start()
|
||||
|
||||
def read(self) -> None:
|
||||
while not self.stop_event.is_set():
|
||||
try:
|
||||
(
|
||||
camera,
|
||||
payload,
|
||||
) = self.queue.get(True, 1)
|
||||
except queue.Empty:
|
||||
continue
|
||||
|
||||
for field, value in payload.items():
|
||||
self._dispatcher(self._get_metadata_topic(camera, field), value)
|
||||
|
||||
def stop(self) -> None:
|
||||
self.stop_event.set()
|
||||
self.reader_thread.join()
|
||||
@ -13,6 +13,7 @@ import numpy as np
|
||||
import requests
|
||||
from setproctitle import setproctitle
|
||||
|
||||
from frigate.comms.stream_metadata import StreamMetadataCommunicator
|
||||
from frigate.config import CameraConfig, FrigateConfig
|
||||
from frigate.const import (
|
||||
AUDIO_DURATION,
|
||||
@ -50,6 +51,7 @@ def get_ffmpeg_command(input_args: list[str], input_path: str, pipe: str) -> lis
|
||||
def listen_to_audio(
|
||||
config: FrigateConfig,
|
||||
process_info: dict[str, FeatureMetricsTypes],
|
||||
stream_metadata_communicator: StreamMetadataCommunicator,
|
||||
) -> None:
|
||||
stop_event = mp.Event()
|
||||
audio_threads: list[threading.Thread] = []
|
||||
@ -73,7 +75,9 @@ def listen_to_audio(
|
||||
|
||||
for camera in config.cameras.values():
|
||||
if camera.enabled and camera.audio.enabled_in_config:
|
||||
audio = AudioEventMaintainer(camera, process_info, stop_event)
|
||||
audio = AudioEventMaintainer(
|
||||
camera, process_info, stop_event, stream_metadata_communicator
|
||||
)
|
||||
audio_threads.append(audio)
|
||||
audio.start()
|
||||
|
||||
@ -143,11 +147,13 @@ class AudioEventMaintainer(threading.Thread):
|
||||
camera: CameraConfig,
|
||||
feature_metrics: dict[str, FeatureMetricsTypes],
|
||||
stop_event: mp.Event,
|
||||
stream_metadata_communicator: StreamMetadataCommunicator,
|
||||
) -> None:
|
||||
threading.Thread.__init__(self)
|
||||
self.name = f"{camera.name}_audio_event_processor"
|
||||
self.config = camera
|
||||
self.feature_metrics = feature_metrics
|
||||
self.stream_metadata_communicator = stream_metadata_communicator
|
||||
self.detections: dict[dict[str, any]] = feature_metrics
|
||||
self.stop_event = stop_event
|
||||
self.detector = AudioTfl(stop_event)
|
||||
@ -174,14 +180,13 @@ class AudioEventMaintainer(threading.Thread):
|
||||
|
||||
# Calculate RMS (Root-Mean-Square) which represents the average signal amplitude
|
||||
# Note: np.float32 isn't serializable, we must use np.float64 to publish the message
|
||||
rms = np.sqrt(np.mean(np.absolute(audio_as_float**2))).astype(np.float64)
|
||||
rms = np.sqrt(np.mean(np.absolute(audio_as_float**2)))
|
||||
|
||||
# Transform RMS to dBFS (decibels relative to full scale)
|
||||
dBFS = 20 * np.log10(np.abs(rms) / AUDIO_MAX_BIT_RANGE)
|
||||
|
||||
requests.post(
|
||||
f"http://127.0.0.1:5000/api/{self.config.name}/metadata",
|
||||
json={"dBFS": dBFS, "rms": rms},
|
||||
self.stream_metadata_communicator.queue.put(
|
||||
(self.config.name, {"dBFS": float(dBFS), "rms": float(rms)})
|
||||
)
|
||||
|
||||
for label, score, _ in model_detections:
|
||||
|
||||
@ -1426,38 +1426,6 @@ def recording_clip(camera_name, start_ts, end_ts):
|
||||
return response
|
||||
|
||||
|
||||
@bp.route("/<camera_name>/metadata", methods=["POST"])
|
||||
def create_metadata_message(camera_name):
|
||||
if not camera_name or not current_app.frigate_config.cameras.get(camera_name):
|
||||
return jsonify(
|
||||
{"success": False, "message": f"{camera_name} is not a valid camera."}, 404
|
||||
)
|
||||
|
||||
request_json = request.get_json(silent=True)
|
||||
if request_json == {}:
|
||||
return jsonify(
|
||||
{"success": False, "message": "Metadata json cannot be empty."}, 404
|
||||
)
|
||||
|
||||
try:
|
||||
current_app.dispatcher.publish(
|
||||
"metadata", json.dumps(request_json), retain=False
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"The error is {e}")
|
||||
return jsonify(
|
||||
{"success": False, "message": f"An unknown error occurred: {e}"}, 404
|
||||
)
|
||||
|
||||
return jsonify(
|
||||
{
|
||||
"success": True,
|
||||
"message": "Successfully published metadata message.",
|
||||
},
|
||||
200,
|
||||
)
|
||||
|
||||
|
||||
@bp.route("/vod/<camera_name>/start/<int:start_ts>/end/<int:end_ts>")
|
||||
@bp.route("/vod/<camera_name>/start/<float:start_ts>/end/<float:end_ts>")
|
||||
def vod_ts(camera_name, start_ts, end_ts):
|
||||
|
||||
Loading…
Reference in New Issue
Block a user