Address feedback

This commit is contained in:
JP Verdejo 2023-07-02 17:04:54 -05:00
parent 133a565449
commit 0d3d78a695
3 changed files with 35 additions and 34 deletions

View File

@ -17,8 +17,8 @@ from playhouse.sqlite_ext import SqliteExtDatabase
from playhouse.sqliteq import SqliteQueueDatabase from playhouse.sqliteq import SqliteQueueDatabase
from frigate.comms.dispatcher import Communicator, Dispatcher from frigate.comms.dispatcher import Communicator, Dispatcher
from frigate.comms.inter_process import InterProcessCommunicator
from frigate.comms.mqtt import MqttClient from frigate.comms.mqtt import MqttClient
from frigate.comms.stream_metadata import StreamMetadataCommunicator
from frigate.comms.ws import WebSocketClient from frigate.comms.ws import WebSocketClient
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.const import ( from frigate.const import (
@ -206,8 +206,8 @@ class FrigateApp:
# Queue for timeline events # Queue for timeline events
self.timeline_queue: Queue = mp.Queue() self.timeline_queue: Queue = mp.Queue()
# Queue for streams metadata # Queue for inter process communication
self.stream_metadata_queue: Queue = mp.Queue() self.inter_process_queue: Queue = mp.Queue()
def init_database(self) -> None: def init_database(self) -> None:
def vacuum_db(db: SqliteExtDatabase) -> None: def vacuum_db(db: SqliteExtDatabase) -> None:
@ -297,9 +297,9 @@ class FrigateApp:
self.config, self.event_queue self.config, self.event_queue
) )
def init_stream_metadata_communicator(self) -> None: def init_inter_process_communicator(self) -> None:
self.stream_metadata_communicator = StreamMetadataCommunicator( self.inter_process_communicator = InterProcessCommunicator(
self.stream_metadata_queue self.inter_process_queue
) )
def init_web_server(self) -> None: def init_web_server(self) -> None:
@ -312,7 +312,6 @@ class FrigateApp:
self.onvif_controller, self.onvif_controller,
self.external_event_processor, self.external_event_processor,
self.plus_api, self.plus_api,
self.dispatcher,
) )
def init_onvif(self) -> None: def init_onvif(self) -> None:
@ -325,7 +324,7 @@ class FrigateApp:
comms.append(MqttClient(self.config)) comms.append(MqttClient(self.config))
comms.append(WebSocketClient(self.config)) comms.append(WebSocketClient(self.config))
comms.append(self.stream_metadata_communicator) comms.append(self.inter_process_communicator)
self.dispatcher = Dispatcher( self.dispatcher = Dispatcher(
self.config, self.config,
@ -448,7 +447,7 @@ class FrigateApp:
args=( args=(
self.config, self.config,
self.feature_metrics, self.feature_metrics,
self.stream_metadata_communicator, self.inter_process_communicator,
), ),
) )
audio_process.daemon = True audio_process.daemon = True
@ -541,7 +540,7 @@ class FrigateApp:
self.init_recording_manager() self.init_recording_manager()
self.init_go2rtc() self.init_go2rtc()
self.bind_database() self.bind_database()
self.init_stream_metadata_communicator() self.init_inter_process_communicator()
self.init_dispatcher() self.init_dispatcher()
except Exception as e: except Exception as e:
print(e) print(e)
@ -611,7 +610,7 @@ class FrigateApp:
self.detected_frames_queue, self.detected_frames_queue,
self.recordings_info_queue, self.recordings_info_queue,
self.log_queue, self.log_queue,
self.stream_metadata_queue, self.inter_process_queue,
]: ]:
while not queue.empty(): while not queue.empty():
queue.get_nowait() queue.get_nowait()

View File

@ -9,14 +9,11 @@ from faster_fifo import Queue
from frigate.comms.dispatcher import Communicator from frigate.comms.dispatcher import Communicator
class StreamMetadataCommunicator(Communicator): class InterProcessCommunicator(Communicator):
def __init__(self, queue: Queue) -> None: def __init__(self, queue: Queue) -> None:
self.queue = queue self.queue = queue
self.stop_event: MpEvent = mp.Event() self.stop_event: MpEvent = mp.Event()
def _get_metadata_topic(self, camera: str, metric: str) -> str:
return f"{camera}/metadata/{metric}"
def publish(self, topic: str, payload: str, retain: bool) -> None: def publish(self, topic: str, payload: str, retain: bool) -> None:
pass pass
@ -29,14 +26,13 @@ class StreamMetadataCommunicator(Communicator):
while not self.stop_event.is_set(): while not self.stop_event.is_set():
try: try:
( (
camera, topic,
payload, value,
) = self.queue.get(True, 1) ) = self.queue.get(True, 1)
except queue.Empty: except queue.Empty:
continue continue
for field, value in payload.items(): self._dispatcher(topic, value)
self._dispatcher(self._get_metadata_topic(camera, field), value)
def stop(self) -> None: def stop(self) -> None:
self.stop_event.set() self.stop_event.set()

View File

@ -13,7 +13,7 @@ import numpy as np
import requests import requests
from setproctitle import setproctitle from setproctitle import setproctitle
from frigate.comms.stream_metadata import StreamMetadataCommunicator from frigate.comms.inter_process import InterProcessCommunicator
from frigate.config import CameraConfig, FrigateConfig from frigate.config import CameraConfig, FrigateConfig
from frigate.const import ( from frigate.const import (
AUDIO_DURATION, AUDIO_DURATION,
@ -51,7 +51,7 @@ def get_ffmpeg_command(input_args: list[str], input_path: str, pipe: str) -> lis
def listen_to_audio( def listen_to_audio(
config: FrigateConfig, config: FrigateConfig,
process_info: dict[str, FeatureMetricsTypes], process_info: dict[str, FeatureMetricsTypes],
stream_metadata_communicator: StreamMetadataCommunicator, inter_process_communicator: InterProcessCommunicator,
) -> None: ) -> None:
stop_event = mp.Event() stop_event = mp.Event()
audio_threads: list[threading.Thread] = [] audio_threads: list[threading.Thread] = []
@ -76,7 +76,7 @@ def listen_to_audio(
for camera in config.cameras.values(): for camera in config.cameras.values():
if camera.enabled and camera.audio.enabled_in_config: if camera.enabled and camera.audio.enabled_in_config:
audio = AudioEventMaintainer( audio = AudioEventMaintainer(
camera, process_info, stop_event, stream_metadata_communicator camera, process_info, stop_event, inter_process_communicator
) )
audio_threads.append(audio) audio_threads.append(audio)
audio.start() audio.start()
@ -147,13 +147,13 @@ class AudioEventMaintainer(threading.Thread):
camera: CameraConfig, camera: CameraConfig,
feature_metrics: dict[str, FeatureMetricsTypes], feature_metrics: dict[str, FeatureMetricsTypes],
stop_event: mp.Event, stop_event: mp.Event,
stream_metadata_communicator: StreamMetadataCommunicator, inter_process_communicator: InterProcessCommunicator,
) -> None: ) -> None:
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.name = f"{camera.name}_audio_event_processor" self.name = f"{camera.name}_audio_event_processor"
self.config = camera self.config = camera
self.feature_metrics = feature_metrics self.feature_metrics = feature_metrics
self.stream_metadata_communicator = stream_metadata_communicator self.inter_process_communicator = inter_process_communicator
self.detections: dict[dict[str, any]] = feature_metrics self.detections: dict[dict[str, any]] = feature_metrics
self.stop_event = stop_event self.stop_event = stop_event
self.detector = AudioTfl(stop_event) self.detector = AudioTfl(stop_event)
@ -178,16 +178,7 @@ class AudioEventMaintainer(threading.Thread):
waveform = audio_as_float / AUDIO_MAX_BIT_RANGE waveform = audio_as_float / AUDIO_MAX_BIT_RANGE
model_detections = self.detector.detect(waveform) model_detections = self.detector.detect(waveform)
# Calculate RMS (Root-Mean-Square) which represents the average signal amplitude self.calculate_audio_levels(audio_as_float)
# Note: np.float32 isn't serializable, we must use np.float64 to publish the message
rms = np.sqrt(np.mean(np.absolute(audio_as_float**2)))
# Transform RMS to dBFS (decibels relative to full scale)
dBFS = 20 * np.log10(np.abs(rms) / AUDIO_MAX_BIT_RANGE)
self.stream_metadata_communicator.queue.put(
(self.config.name, {"dBFS": float(dBFS), "rms": float(rms)})
)
for label, score, _ in model_detections: for label, score, _ in model_detections:
if label not in self.config.audio.listen: if label not in self.config.audio.listen:
@ -197,6 +188,21 @@ class AudioEventMaintainer(threading.Thread):
self.expire_detections() self.expire_detections()
def calculate_audio_levels(self, audio_as_float: np.float32) -> None:
# Calculate RMS (Root-Mean-Square) which represents the average signal amplitude
# Note: np.float32 isn't serializable, we must use np.float64 to publish the message
rms = np.sqrt(np.mean(np.absolute(audio_as_float**2)))
# Transform RMS to dBFS (decibels relative to full scale)
dBFS = 20 * np.log10(np.abs(rms) / AUDIO_MAX_BIT_RANGE)
self.inter_process_communicator.queue.put(
(f"{self.config.name}/metadata/dBFS", float(dBFS))
)
self.inter_process_communicator.queue.put(
(f"{self.config.name}/metadata/rms", float(rms))
)
def handle_detection(self, label: str, score: float) -> None: def handle_detection(self, label: str, score: float) -> None:
if self.detections.get(label): if self.detections.get(label):
self.detections[label][ self.detections[label][