From 27bfc81a20b7fcb00f7752e8a7ac4c2c70c852a5 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Mon, 26 May 2025 11:52:18 -0500 Subject: [PATCH] run real-time transcription in its own thread --- .../real_time/audio_transcription.py | 96 ++++++++++++++----- frigate/events/audio.py | 15 +++ 2 files changed, 89 insertions(+), 22 deletions(-) diff --git a/frigate/data_processing/real_time/audio_transcription.py b/frigate/data_processing/real_time/audio_transcription.py index bd95cb407..9dc2783c3 100644 --- a/frigate/data_processing/real_time/audio_transcription.py +++ b/frigate/data_processing/real_time/audio_transcription.py @@ -3,6 +3,8 @@ import json import logging import os +import queue +import threading from typing import Optional import numpy as np @@ -28,6 +30,7 @@ class AudioTranscriptionRealTimeProcessor(RealTimeProcessorApi): camera_config: CameraConfig, requestor: InterProcessRequestor, metrics: DataProcessorMetrics, + stop_event: threading.Event, ): super().__init__(config, metrics) self.config = config @@ -36,6 +39,8 @@ class AudioTranscriptionRealTimeProcessor(RealTimeProcessorApi): self.recognizer = None self.stream = None self.transcription_segments = [] + self.audio_queue = queue.Queue() + self.stop_event = stop_event if self.config.audio_transcription.model_size == "large": self.asr = FasterWhisperASR( @@ -46,7 +51,7 @@ class AudioTranscriptionRealTimeProcessor(RealTimeProcessorApi): lan=config.audio_transcription.language, model_dir=os.path.join(MODEL_CACHE_DIR, "whisper"), ) - # self.asr.use_vad() # Enable Silero VAD for low-RMS audio + self.asr.use_vad() # Enable Silero VAD for low-RMS audio else: # small model as default @@ -113,7 +118,7 @@ class AudioTranscriptionRealTimeProcessor(RealTimeProcessorApi): def __process_audio_stream( self, audio_data: np.ndarray - ) -> Optional[tuple[str, float, bool]]: + ) -> Optional[tuple[str, bool]]: if (not self.recognizer or not self.stream) and not self.online: logger.debug( "Audio transcription (streaming) recognizer or stream not initialized" @@ -174,31 +179,56 @@ class AudioTranscriptionRealTimeProcessor(RealTimeProcessorApi): pass def process_audio(self, obj_data: dict[str, any], audio: np.ndarray) -> bool | None: - camera = obj_data["camera"] - if audio is None or audio.size == 0: logger.debug("No audio data provided for transcription") - return + return None - result = self.__process_audio_stream(audio) + # enqueue audio data for processing in the thread + self.audio_queue.put((obj_data, audio)) + return None - if not result: - return - - text, is_endpoint = result - logger.debug(f"Transcribed audio: '{text}', Endpoint: {is_endpoint}") - - self.requestor.send_data( - "tracked_object_update", - json.dumps( - { - "type": TrackedObjectUpdateTypesEnum.transcription, - "text": text, - "camera": camera, - } - ), + def run(self) -> None: + """Run method for the transcription thread to process queued audio data.""" + logger.debug( + f"Starting audio transcription thread for {self.camera_config.name}" + ) + while not self.stop_event.is_set(): + try: + # Get audio data from queue with a timeout to check stop_event + obj_data, audio = self.audio_queue.get(timeout=0.1) + result = self.__process_audio_stream(audio) + + if not result: + continue + + text, is_endpoint = result + logger.debug(f"Transcribed audio: '{text}', Endpoint: {is_endpoint}") + + self.requestor.send_data( + "tracked_object_update", + json.dumps( + { + "type": TrackedObjectUpdateTypesEnum.transcription, + "text": text, + "camera": obj_data["camera"], + } + ), + ) + + self.audio_queue.task_done() + + if is_endpoint: + self.reset(obj_data["camera"]) + + except queue.Empty: + continue + except Exception as e: + logger.error(f"Error processing audio in thread: {e}") + self.audio_queue.task_done() + + logger.debug( + f"Stopping audio transcription thread for {self.camera_config.name}" ) - return is_endpoint def reset(self, camera: str) -> None: if self.config.audio_transcription.model_size == "large": @@ -223,8 +253,30 @@ class AudioTranscriptionRealTimeProcessor(RealTimeProcessorApi): # reset sherpa self.recognizer.reset(self.stream) + # Clear the audio queue + while not self.audio_queue.empty(): + try: + self.audio_queue.get_nowait() + self.audio_queue.task_done() + except queue.Empty: + break + logger.debug("Stream reset") + def stop(self) -> None: + """Stop the transcription thread and clean up.""" + self.stop_event.set() + # Clear the queue to prevent processing stale data + while not self.audio_queue.empty(): + try: + self.audio_queue.get_nowait() + self.audio_queue.task_done() + except queue.Empty: + break + logger.debug( + f"Transcription thread stop signaled for {self.camera_config.name}" + ) + def handle_request( self, topic: str, request_data: dict[str, any] ) -> dict[str, any] | None: diff --git a/frigate/events/audio.py b/frigate/events/audio.py index 5bf8c63b0..a19746f3d 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -149,6 +149,7 @@ class AudioEventMaintainer(threading.Thread): self.logpipe = LogPipe(f"ffmpeg.{self.camera_config.name}.audio") self.audio_listener = None self.transcription_processor = None + self.transcription_thread = None # create communication for audio detections self.requestor = InterProcessRequestor() @@ -170,8 +171,16 @@ class AudioEventMaintainer(threading.Thread): camera_config=self.camera_config, requestor=self.requestor, metrics=self.camera_metrics[self.camera_config.name], + stop_event=self.stop_event, ) + self.transcription_thread = threading.Thread( + target=self.transcription_processor.run, + name=f"{self.camera_config.name}_transcription_processor", + daemon=True, + ) + self.transcription_thread.start() + self.was_enabled = camera.enabled def detect_audio(self, audio) -> None: @@ -399,6 +408,12 @@ class AudioEventMaintainer(threading.Thread): if self.audio_listener: stop_ffmpeg(self.audio_listener, self.logger) + if self.transcription_thread: + self.transcription_thread.join(timeout=2) + if self.transcription_thread.is_alive(): + self.logger.warning( + f"Audio transcription thread {self.transcription_thread.name} is still alive" + ) self.logpipe.close() self.requestor.stop() self.config_subscriber.stop()