From 5c421375c053ddca1ee449ba99e08b3037bb5c6c Mon Sep 17 00:00:00 2001 From: George Tsiamasiotis Date: Tue, 24 Sep 2024 10:29:05 +0300 Subject: [PATCH] Subclass Process for audio_process --- frigate/app.py | 24 ++--- frigate/events/audio.py | 205 ++++++++++++++++++++++------------------ 2 files changed, 117 insertions(+), 112 deletions(-) diff --git a/frigate/app.py b/frigate/app.py index 00add1acc..e233820ee 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -37,7 +37,7 @@ from frigate.const import ( RECORD_DIR, ) from frigate.embeddings import EmbeddingsContext, manage_embeddings -from frigate.events.audio import listen_to_audio +from frigate.events.audio import AudioProcessor from frigate.events.cleanup import EventCleanup from frigate.events.external import ExternalEventProcessor from frigate.events.maintainer import EventProcessor @@ -489,20 +489,9 @@ class FrigateApp: logger.info(f"Capture process started for {name}: {capture_process.pid}") def start_audio_processors(self) -> None: - self.audio_process = None - if len([c for c in self.config.cameras.values() if c.audio.enabled]) > 0: - self.audio_process = mp.Process( - target=listen_to_audio, - name="audio_capture", - args=( - self.config, - self.camera_metrics, - ), - ) - self.audio_process.daemon = True - self.audio_process.start() - self.processes["audio_detector"] = self.audio_process.pid or 0 - logger.info(f"Audio process started: {self.audio_process.pid}") + self.audio_process = AudioProcessor(self.config, self.camera_metrics) + self.audio_process.start() + self.processes["audio_detector"] = self.audio_process.pid or 0 def start_timeline_processor(self) -> None: self.timeline_processor = TimelineProcessor( @@ -686,9 +675,8 @@ class FrigateApp: ).execute() # stop the audio process - if self.audio_process is not None: - self.audio_process.terminate() - self.audio_process.join() + self.audio_process.terminate() + self.audio_process.join() # ensure the capture processes are done for camera in self.camera_metrics.keys(): diff --git a/frigate/events/audio.py b/frigate/events/audio.py index 5875dca84..e617d29c6 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -4,14 +4,13 @@ import datetime import logging import multiprocessing as mp import signal +import sys import threading import time -from types import FrameType -from typing import Optional, Tuple +from typing import Tuple import numpy as np import requests -from setproctitle import setproctitle from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum @@ -30,7 +29,6 @@ from frigate.log import LogPipe from frigate.object_detection import load_labels from frigate.types import CameraMetricsTypes from frigate.util.builtin import get_ffmpeg_arg_list -from frigate.util.services import listen from frigate.video import start_or_restart_ffmpeg, stop_ffmpeg try: @@ -38,8 +36,6 @@ try: except ModuleNotFoundError: from tensorflow.lite.python.interpreter import Interpreter -logger = logging.getLogger(__name__) - def get_ffmpeg_command(ffmpeg: FfmpegConfig) -> list[str]: ffmpeg_input: CameraInput = [i for i in ffmpeg.inputs if "audio" in i.roles][0] @@ -69,97 +65,58 @@ def get_ffmpeg_command(ffmpeg: FfmpegConfig) -> list[str]: ) -def listen_to_audio( - config: FrigateConfig, - camera_metrics: dict[str, CameraMetricsTypes], -) -> None: - stop_event = mp.Event() - audio_threads: list[threading.Thread] = [] +class AudioProcessor(mp.Process): + def __init__( + self, + config: FrigateConfig, + camera_metrics: dict[str, CameraMetricsTypes], + ): + super().__init__(name="frigate.audio_manager", daemon=True) - def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: - stop_event.set() + self.logger = logging.getLogger(self.name) + self.camera_metrics = camera_metrics + self.audio_cameras = [ + c + for c in config.cameras.values() + if c.enabled and c.audio.enabled_in_config + ] - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) + def run(self) -> None: + stop_event = threading.Event() + audio_threads: list[AudioEventMaintainer] = [] - threading.current_thread().name = "process:audio_manager" - setproctitle("frigate.audio_manager") - listen() + threading.current_thread().name = "process:audio_manager" + signal.signal(signal.SIGTERM, lambda sig, frame: sys.exit()) - for camera in config.cameras.values(): - if camera.enabled and camera.audio.enabled_in_config: - audio = AudioEventMaintainer( - camera, - camera_metrics, - stop_event, - ) - audio_threads.append(audio) - audio.start() + if len(self.audio_cameras) == 0: + return - for thread in audio_threads: - thread.join() + try: + for camera in self.audio_cameras: + audio_thread = AudioEventMaintainer( + camera, + self.camera_metrics, + stop_event, + ) + audio_threads.append(audio_thread) + audio_thread.start() - logger.info("Exiting audio detector...") + self.logger.info(f"Audio process started (pid: {self.pid})") + while True: + signal.pause() + finally: + stop_event.set() + for thread in audio_threads: + thread.join(1) + if thread.is_alive(): + self.logger.info(f"Waiting for thread {thread.name:s} to exit") + thread.join(10) -class AudioTfl: - def __init__(self, stop_event: mp.Event, num_threads=2): - self.stop_event = stop_event - self.num_threads = num_threads - self.labels = load_labels("/audio-labelmap.txt", prefill=521) - self.interpreter = Interpreter( - model_path="/cpu_audio_model.tflite", - num_threads=self.num_threads, - ) - - self.interpreter.allocate_tensors() - - self.tensor_input_details = self.interpreter.get_input_details() - self.tensor_output_details = self.interpreter.get_output_details() - - def _detect_raw(self, tensor_input): - self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input) - self.interpreter.invoke() - detections = np.zeros((20, 6), np.float32) - - res = self.interpreter.get_tensor(self.tensor_output_details[0]["index"])[0] - non_zero_indices = res > 0 - class_ids = np.argpartition(-res, 20)[:20] - class_ids = class_ids[np.argsort(-res[class_ids])] - class_ids = class_ids[non_zero_indices[class_ids]] - scores = res[class_ids] - boxes = np.full((scores.shape[0], 4), -1, np.float32) - count = len(scores) - - for i in range(count): - if scores[i] < AUDIO_MIN_CONFIDENCE or i == 20: - break - detections[i] = [ - class_ids[i], - float(scores[i]), - boxes[i][0], - boxes[i][1], - boxes[i][2], - boxes[i][3], - ] - - return detections - - def detect(self, tensor_input, threshold=AUDIO_MIN_CONFIDENCE): - detections = [] - - if self.stop_event.is_set(): - return detections - - raw_detections = self._detect_raw(tensor_input) - - for d in raw_detections: - if d[1] < threshold: - break - detections.append( - (self.labels[int(d[0])], float(d[1]), (d[2], d[3], d[4], d[5])) - ) - return detections + for thread in audio_threads: + if thread.is_alive(): + self.logger.warning(f"Thread {thread.name} is still alive") + self.logger.info("Exiting audio process") class AudioEventMaintainer(threading.Thread): @@ -167,10 +124,10 @@ class AudioEventMaintainer(threading.Thread): self, camera: CameraConfig, camera_metrics: dict[str, CameraMetricsTypes], - stop_event: mp.Event, + stop_event: threading.Event, ) -> None: - threading.Thread.__init__(self) - self.name = f"{camera.name}_audio_event_processor" + super().__init__(name=f"{camera.name}_audio_event_processor") + self.config = camera self.camera_metrics = camera_metrics self.detections: dict[dict[str, any]] = {} @@ -206,7 +163,7 @@ class AudioEventMaintainer(threading.Thread): audio_detections = [] for label, score, _ in model_detections: - logger.debug( + self.logger.debug( f"{self.config.name} heard {label} with a score of {score}" ) @@ -291,7 +248,7 @@ class AudioEventMaintainer(threading.Thread): if resp.status_code == 200: self.detections[detection["label"]] = None else: - self.logger.warn( + self.logger.warning( f"Failed to end audio event {detection['id']} with status code {resp.status_code}" ) @@ -350,3 +307,63 @@ class AudioEventMaintainer(threading.Thread): self.requestor.stop() self.config_subscriber.stop() self.detection_publisher.stop() + + +class AudioTfl: + def __init__(self, stop_event: threading.Event, num_threads=2): + self.stop_event = stop_event + self.num_threads = num_threads + self.labels = load_labels("/audio-labelmap.txt", prefill=521) + self.interpreter = Interpreter( + model_path="/cpu_audio_model.tflite", + num_threads=self.num_threads, + ) + + self.interpreter.allocate_tensors() + + self.tensor_input_details = self.interpreter.get_input_details() + self.tensor_output_details = self.interpreter.get_output_details() + + def _detect_raw(self, tensor_input): + self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input) + self.interpreter.invoke() + detections = np.zeros((20, 6), np.float32) + + res = self.interpreter.get_tensor(self.tensor_output_details[0]["index"])[0] + non_zero_indices = res > 0 + class_ids = np.argpartition(-res, 20)[:20] + class_ids = class_ids[np.argsort(-res[class_ids])] + class_ids = class_ids[non_zero_indices[class_ids]] + scores = res[class_ids] + boxes = np.full((scores.shape[0], 4), -1, np.float32) + count = len(scores) + + for i in range(count): + if scores[i] < AUDIO_MIN_CONFIDENCE or i == 20: + break + detections[i] = [ + class_ids[i], + float(scores[i]), + boxes[i][0], + boxes[i][1], + boxes[i][2], + boxes[i][3], + ] + + return detections + + def detect(self, tensor_input, threshold=AUDIO_MIN_CONFIDENCE): + detections = [] + + if self.stop_event.is_set(): + return detections + + raw_detections = self._detect_raw(tensor_input) + + for d in raw_detections: + if d[1] < threshold: + break + detections.append( + (self.labels[int(d[0])], float(d[1]), (d[2], d[3], d[4], d[5])) + ) + return detections