dispatcher changes

This commit is contained in:
Josh Hawkins 2025-08-25 13:04:40 -05:00
parent 12e4f4b9cd
commit 5631d78adc
3 changed files with 32 additions and 86 deletions

View File

@@ -6,7 +6,7 @@ import logging
 from typing import Any, Callable, Optional, cast
 from frigate.camera import PTZMetrics
-from frigate.camera.activity_manager import CameraActivityManager
+from frigate.camera.activity_manager import AudioActivityManager, CameraActivityManager
 from frigate.comms.base_communicator import Communicator
 from frigate.comms.webpush import WebPushClient
 from frigate.config import BirdseyeModeEnum, FrigateConfig
@ -17,10 +17,12 @@ from frigate.config.camera.updater import (
) )
from frigate.const import ( from frigate.const import (
CLEAR_ONGOING_REVIEW_SEGMENTS, CLEAR_ONGOING_REVIEW_SEGMENTS,
EXPIRE_AUDIO_ACTIVITY,
INSERT_MANY_RECORDINGS, INSERT_MANY_RECORDINGS,
INSERT_PREVIEW, INSERT_PREVIEW,
NOTIFICATION_TEST, NOTIFICATION_TEST,
REQUEST_REGION_GRID, REQUEST_REGION_GRID,
UPDATE_AUDIO_ACTIVITY,
UPDATE_BIRDSEYE_LAYOUT, UPDATE_BIRDSEYE_LAYOUT,
UPDATE_CAMERA_ACTIVITY, UPDATE_CAMERA_ACTIVITY,
UPDATE_EMBEDDINGS_REINDEX_PROGRESS, UPDATE_EMBEDDINGS_REINDEX_PROGRESS,
@@ -55,6 +57,7 @@ class Dispatcher:
         self.ptz_metrics = ptz_metrics
         self.comms = communicators
         self.camera_activity = CameraActivityManager(config, self.publish)
+        self.audio_activity = AudioActivityManager(config, self.publish)
         self.model_state: dict[str, ModelStatusTypesEnum] = {}
         self.embeddings_reindex: dict[str, Any] = {}
         self.birdseye_layout: dict[str, Any] = {}
@@ -135,6 +138,12 @@ class Dispatcher:
         def handle_update_camera_activity() -> None:
             self.camera_activity.update_activity(payload)

+        def handle_update_audio_activity() -> None:
+            self.audio_activity.update_activity(payload)
+
+        def handle_expire_audio_activity() -> None:
+            self.audio_activity.expire_all(payload)
+
         def handle_update_event_description() -> None:
             event: Event = Event.get(Event.id == payload["id"])
             cast(dict, event.data)["description"] = payload["description"]
@@ -192,6 +201,7 @@ class Dispatcher:
         def handle_on_connect() -> None:
             camera_status = self.camera_activity.last_camera_activity.copy()
+            audio_detections = self.audio_activity.current_audio_detections.copy()
             cameras_with_status = camera_status.keys()

             for camera in self.config.cameras.keys():
@@ -234,6 +244,7 @@ class Dispatcher:
                     json.dumps(self.embeddings_reindex.copy()),
                 )
                 self.publish("birdseye_layout", json.dumps(self.birdseye_layout.copy()))
+                self.publish("audio_detections", json.dumps(audio_detections[camera]))

         def handle_notification_test() -> None:
             self.publish("notification_test", "Test notification")
@@ -246,6 +257,8 @@ class Dispatcher:
             UPSERT_REVIEW_SEGMENT: handle_upsert_review_segment,
             CLEAR_ONGOING_REVIEW_SEGMENTS: handle_clear_ongoing_review_segments,
             UPDATE_CAMERA_ACTIVITY: handle_update_camera_activity,
+            UPDATE_AUDIO_ACTIVITY: handle_update_audio_activity,
+            EXPIRE_AUDIO_ACTIVITY: handle_expire_audio_activity,
             UPDATE_EVENT_DESCRIPTION: handle_update_event_description,
             UPDATE_REVIEW_DESCRIPTION: handle_update_review_description,
             UPDATE_MODEL_STATE: handle_update_model_state,

View File

@@ -110,6 +110,8 @@ REQUEST_REGION_GRID = "request_region_grid"
 UPSERT_REVIEW_SEGMENT = "upsert_review_segment"
 CLEAR_ONGOING_REVIEW_SEGMENTS = "clear_ongoing_review_segments"
 UPDATE_CAMERA_ACTIVITY = "update_camera_activity"
+UPDATE_AUDIO_ACTIVITY = "update_audio_activity"
+EXPIRE_AUDIO_ACTIVITY = "expire_audio_activity"
 UPDATE_EVENT_DESCRIPTION = "update_event_description"
 UPDATE_REVIEW_DESCRIPTION = "update_review_description"
 UPDATE_MODEL_STATE = "update_model_state"

View File

@@ -2,21 +2,15 @@

 import datetime
 import logging
-import random
-import string
 import threading
 import time
 from multiprocessing.managers import DictProxy
 from multiprocessing.synchronize import Event as MpEvent
-from typing import Any, Tuple
+from typing import Tuple

 import numpy as np

 from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
-from frigate.comms.event_metadata_updater import (
-    EventMetadataPublisher,
-    EventMetadataTypeEnum,
-)
 from frigate.comms.inter_process import InterProcessRequestor
 from frigate.config import CameraConfig, CameraInput, FfmpegConfig, FrigateConfig
 from frigate.config.camera.updater import (
@@ -29,7 +23,9 @@ from frigate.const import (
     AUDIO_MAX_BIT_RANGE,
     AUDIO_MIN_CONFIDENCE,
     AUDIO_SAMPLE_RATE,
+    EXPIRE_AUDIO_ACTIVITY,
     PROCESS_PRIORITY_HIGH,
+    UPDATE_AUDIO_ACTIVITY,
 )
 from frigate.data_processing.common.audio_transcription.model import (
     AudioTranscriptionModelRunner,
@@ -159,7 +155,6 @@ class AudioEventMaintainer(threading.Thread):
         self.config = config
         self.camera_config = camera
         self.camera_metrics = camera_metrics
-        self.detections: dict[dict[str, Any]] = {}
         self.stop_event = stop_event
         self.detector = AudioTfl(stop_event, self.camera_config.audio.num_threads)
         self.shape = (int(round(AUDIO_DURATION * AUDIO_SAMPLE_RATE)),)
@@ -184,7 +179,6 @@ class AudioEventMaintainer(threading.Thread):
             ],
         )
         self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio.value)
-        self.event_metadata_publisher = EventMetadataPublisher()

         if self.camera_config.audio_transcription.enabled_in_config:
             # init the transcription processor for this camera
@@ -216,12 +210,13 @@ class AudioEventMaintainer(threading.Thread):
         self.camera_metrics[self.camera_config.name].audio_rms.value = rms
         self.camera_metrics[self.camera_config.name].audio_dBFS.value = dBFS

+        audio_detections: list[Tuple[str, float]] = []
+
         # only run audio detection when volume is above min_volume
         if rms >= self.camera_config.audio.min_volume:
             # create waveform relative to max range and look for detections
             waveform = (audio / AUDIO_MAX_BIT_RANGE).astype(np.float32)
             model_detections = self.detector.detect(waveform)
-            audio_detections = []

             for label, score, _ in model_detections:
                 self.logger.debug(
@@ -234,8 +229,7 @@ class AudioEventMaintainer(threading.Thread):
                 if score > dict(
                     (self.camera_config.audio.filters or {}).get(label, {})
                 ).get("threshold", 0.8):
-                    self.handle_detection(label, score)
-                    audio_detections.append(label)
+                    audio_detections.append((label, score))

         # send audio detection data
         self.detection_publisher.publish(
@@ -243,10 +237,16 @@ class AudioEventMaintainer(threading.Thread):
                 self.camera_config.name,
                 datetime.datetime.now().timestamp(),
                 dBFS,
-                audio_detections,
+                [label for label, _ in audio_detections],
             )
         )

+        # send audio activity update
+        self.requestor.send_data(
+            UPDATE_AUDIO_ACTIVITY,
+            {self.camera_config.name: {"detections": audio_detections}},
+        )
+
         # run audio transcription
         if self.transcription_processor is not None:
             if self.camera_config.audio_transcription.live_enabled:
@@ -261,8 +261,6 @@ class AudioEventMaintainer(threading.Thread):
             else:
                 self.transcription_processor.check_unload_model()

-        self.expire_detections()
-
     def calculate_audio_levels(self, audio_as_float: np.float32) -> Tuple[float, float]:
         # Calculate RMS (Root-Mean-Square) which represents the average signal amplitude
         # Note: np.float32 isn't serializable, we must use np.float64 to publish the message
@@ -279,75 +277,6 @@ class AudioEventMaintainer(threading.Thread):

         return float(rms), float(dBFS)

-    def handle_detection(self, label: str, score: float) -> None:
-        if self.detections.get(label):
-            self.detections[label]["last_detection"] = (
-                datetime.datetime.now().timestamp()
-            )
-        else:
-            now = datetime.datetime.now().timestamp()
-            rand_id = "".join(
-                random.choices(string.ascii_lowercase + string.digits, k=6)
-            )
-            event_id = f"{now}-{rand_id}"
-
-            self.requestor.send_data(f"{self.camera_config.name}/audio/{label}", "ON")
-            self.event_metadata_publisher.publish(
-                (
-                    now,
-                    self.camera_config.name,
-                    label,
-                    event_id,
-                    True,
-                    score,
-                    None,
-                    None,
-                    "audio",
-                    {},
-                ),
-                EventMetadataTypeEnum.manual_event_create.value,
-            )
-            self.detections[label] = {
-                "id": event_id,
-                "label": label,
-                "last_detection": now,
-            }
-
-    def expire_detections(self) -> None:
-        now = datetime.datetime.now().timestamp()
-
-        for detection in self.detections.values():
-            if not detection:
-                continue
-
-            if (
-                now - detection.get("last_detection", now)
-                > self.camera_config.audio.max_not_heard
-            ):
-                self.requestor.send_data(
-                    f"{self.camera_config.name}/audio/{detection['label']}", "OFF"
-                )
-                self.event_metadata_publisher.publish(
-                    (detection["id"], detection["last_detection"]),
-                    EventMetadataTypeEnum.manual_event_end.value,
-                )
-                self.detections[detection["label"]] = None
-
-    def expire_all_detections(self) -> None:
-        """Immediately end all current detections"""
-        now = datetime.datetime.now().timestamp()
-        for label, detection in list(self.detections.items()):
-            if detection:
-                self.requestor.send_data(
-                    f"{self.camera_config.name}/audio/{label}", "OFF"
-                )
-                self.event_metadata_publisher.publish(
-                    (detection["id"], now),
-                    EventMetadataTypeEnum.manual_event_end.value,
-                )
-                self.detections[label] = None
-
     def start_or_restart_ffmpeg(self) -> None:
         self.audio_listener = start_or_restart_ffmpeg(
             self.ffmpeg_cmd,
@@ -406,7 +335,9 @@ class AudioEventMaintainer(threading.Thread):
                 self.logger.debug(
                     f"Disabling audio detections for {self.camera_config.name}, ending events"
                 )
-                self.expire_all_detections()
+                self.requestor.send_data(
+                    EXPIRE_AUDIO_ACTIVITY, self.camera_config.name
+                )
                 stop_ffmpeg(self.audio_listener, self.logger)
                 self.audio_listener = None
         self.was_enabled = enabled