run real-time transcription in its own thread

This commit is contained in:
Josh Hawkins 2025-05-26 11:52:18 -05:00
parent 8cf1e1cdf1
commit 27bfc81a20
2 changed files with 89 additions and 22 deletions

View File

@ -3,6 +3,8 @@
import json import json
import logging import logging
import os import os
import queue
import threading
from typing import Optional from typing import Optional
import numpy as np import numpy as np
@ -28,6 +30,7 @@ class AudioTranscriptionRealTimeProcessor(RealTimeProcessorApi):
camera_config: CameraConfig, camera_config: CameraConfig,
requestor: InterProcessRequestor, requestor: InterProcessRequestor,
metrics: DataProcessorMetrics, metrics: DataProcessorMetrics,
stop_event: threading.Event,
): ):
super().__init__(config, metrics) super().__init__(config, metrics)
self.config = config self.config = config
@ -36,6 +39,8 @@ class AudioTranscriptionRealTimeProcessor(RealTimeProcessorApi):
self.recognizer = None self.recognizer = None
self.stream = None self.stream = None
self.transcription_segments = [] self.transcription_segments = []
self.audio_queue = queue.Queue()
self.stop_event = stop_event
if self.config.audio_transcription.model_size == "large": if self.config.audio_transcription.model_size == "large":
self.asr = FasterWhisperASR( self.asr = FasterWhisperASR(
@ -46,7 +51,7 @@ class AudioTranscriptionRealTimeProcessor(RealTimeProcessorApi):
lan=config.audio_transcription.language, lan=config.audio_transcription.language,
model_dir=os.path.join(MODEL_CACHE_DIR, "whisper"), model_dir=os.path.join(MODEL_CACHE_DIR, "whisper"),
) )
# self.asr.use_vad() # Enable Silero VAD for low-RMS audio self.asr.use_vad() # Enable Silero VAD for low-RMS audio
else: else:
# small model as default # small model as default
@ -113,7 +118,7 @@ class AudioTranscriptionRealTimeProcessor(RealTimeProcessorApi):
def __process_audio_stream( def __process_audio_stream(
self, audio_data: np.ndarray self, audio_data: np.ndarray
) -> Optional[tuple[str, float, bool]]: ) -> Optional[tuple[str, bool]]:
if (not self.recognizer or not self.stream) and not self.online: if (not self.recognizer or not self.stream) and not self.online:
logger.debug( logger.debug(
"Audio transcription (streaming) recognizer or stream not initialized" "Audio transcription (streaming) recognizer or stream not initialized"
@ -174,31 +179,56 @@ class AudioTranscriptionRealTimeProcessor(RealTimeProcessorApi):
pass pass
def process_audio(self, obj_data: dict[str, any], audio: np.ndarray) -> bool | None: def process_audio(self, obj_data: dict[str, any], audio: np.ndarray) -> bool | None:
camera = obj_data["camera"]
if audio is None or audio.size == 0: if audio is None or audio.size == 0:
logger.debug("No audio data provided for transcription") logger.debug("No audio data provided for transcription")
return return None
result = self.__process_audio_stream(audio) # enqueue audio data for processing in the thread
self.audio_queue.put((obj_data, audio))
return None
if not result: def run(self) -> None:
return """Run method for the transcription thread to process queued audio data."""
logger.debug(
text, is_endpoint = result f"Starting audio transcription thread for {self.camera_config.name}"
logger.debug(f"Transcribed audio: '{text}', Endpoint: {is_endpoint}") )
while not self.stop_event.is_set():
self.requestor.send_data( try:
"tracked_object_update", # Get audio data from queue with a timeout to check stop_event
json.dumps( obj_data, audio = self.audio_queue.get(timeout=0.1)
{ result = self.__process_audio_stream(audio)
"type": TrackedObjectUpdateTypesEnum.transcription,
"text": text, if not result:
"camera": camera, continue
}
), text, is_endpoint = result
logger.debug(f"Transcribed audio: '{text}', Endpoint: {is_endpoint}")
self.requestor.send_data(
"tracked_object_update",
json.dumps(
{
"type": TrackedObjectUpdateTypesEnum.transcription,
"text": text,
"camera": obj_data["camera"],
}
),
)
self.audio_queue.task_done()
if is_endpoint:
self.reset(obj_data["camera"])
except queue.Empty:
continue
except Exception as e:
logger.error(f"Error processing audio in thread: {e}")
self.audio_queue.task_done()
logger.debug(
f"Stopping audio transcription thread for {self.camera_config.name}"
) )
return is_endpoint
def reset(self, camera: str) -> None: def reset(self, camera: str) -> None:
if self.config.audio_transcription.model_size == "large": if self.config.audio_transcription.model_size == "large":
@ -223,8 +253,30 @@ class AudioTranscriptionRealTimeProcessor(RealTimeProcessorApi):
# reset sherpa # reset sherpa
self.recognizer.reset(self.stream) self.recognizer.reset(self.stream)
# Clear the audio queue
while not self.audio_queue.empty():
try:
self.audio_queue.get_nowait()
self.audio_queue.task_done()
except queue.Empty:
break
logger.debug("Stream reset") logger.debug("Stream reset")
def stop(self) -> None:
    """Stop the transcription thread and clean up.

    Sets ``self.stop_event`` so the ``run`` loop exits on its next check,
    then discards any audio still waiting in ``self.audio_queue``. Each
    discarded item is acknowledged with ``task_done()``.

    NOTE(review): ``stop_event`` is the event passed to ``__init__``. If the
    caller shares that event with other threads, setting it here signals
    them as well -- confirm this is intended.
    """
    self.stop_event.set()

    # Clear the queue to prevent processing stale data. get_nowait() is the
    # only guard needed: a separate empty() check can go stale while the
    # worker thread is still consuming.
    try:
        while True:
            self.audio_queue.get_nowait()
            self.audio_queue.task_done()
    except queue.Empty:
        pass  # queue fully drained

    logger.debug(
        f"Transcription thread stop signaled for {self.camera_config.name}"
    )
def handle_request( def handle_request(
self, topic: str, request_data: dict[str, any] self, topic: str, request_data: dict[str, any]
) -> dict[str, any] | None: ) -> dict[str, any] | None:

View File

@ -149,6 +149,7 @@ class AudioEventMaintainer(threading.Thread):
self.logpipe = LogPipe(f"ffmpeg.{self.camera_config.name}.audio") self.logpipe = LogPipe(f"ffmpeg.{self.camera_config.name}.audio")
self.audio_listener = None self.audio_listener = None
self.transcription_processor = None self.transcription_processor = None
self.transcription_thread = None
# create communication for audio detections # create communication for audio detections
self.requestor = InterProcessRequestor() self.requestor = InterProcessRequestor()
@ -170,8 +171,16 @@ class AudioEventMaintainer(threading.Thread):
camera_config=self.camera_config, camera_config=self.camera_config,
requestor=self.requestor, requestor=self.requestor,
metrics=self.camera_metrics[self.camera_config.name], metrics=self.camera_metrics[self.camera_config.name],
stop_event=self.stop_event,
) )
self.transcription_thread = threading.Thread(
target=self.transcription_processor.run,
name=f"{self.camera_config.name}_transcription_processor",
daemon=True,
)
self.transcription_thread.start()
self.was_enabled = camera.enabled self.was_enabled = camera.enabled
def detect_audio(self, audio) -> None: def detect_audio(self, audio) -> None:
@ -399,6 +408,12 @@ class AudioEventMaintainer(threading.Thread):
if self.audio_listener: if self.audio_listener:
stop_ffmpeg(self.audio_listener, self.logger) stop_ffmpeg(self.audio_listener, self.logger)
if self.transcription_thread:
self.transcription_thread.join(timeout=2)
if self.transcription_thread.is_alive():
self.logger.warning(
f"Audio transcription thread {self.transcription_thread.name} is still alive"
)
self.logpipe.close() self.logpipe.close()
self.requestor.stop() self.requestor.stop()
self.config_subscriber.stop() self.config_subscriber.stop()