@router.put("/audio/transcribe")
def transcribe_audio(request: Request, body: AudioTranscriptionBody):
    """Request an audio transcription for the event named in the request body.

    The event is looked up by ``body.event_id`` and handed, as a dict, to the
    embeddings context stored on the application.

    Responses:
        202: the embeddings context answered ``"started"``.
        400: audio transcription is not enabled for the event's camera.
        404: the event does not exist, or its camera has no config entry.
        409: the embeddings context answered ``"in_progress"``.
        500: any other answer from the embeddings context.
    """
    event_id = body.event_id

    try:
        event = Event.get(Event.id == event_id)
    except DoesNotExist:
        message = f"Event {event_id} not found"
        logger.error(message)
        return JSONResponse(
            content=({"success": False, "message": message}), status_code=404
        )

    # Look the camera up defensively: a stored event can reference a camera
    # name that is missing from the loaded config, and an unguarded subscript
    # would surface as an unhandled KeyError instead of a JSON error body.
    try:
        camera_config = request.app.frigate_config.cameras[event.camera]
    except KeyError:
        message = f"Camera {event.camera} not found in config"
        logger.error(message)
        return JSONResponse(
            content=({"success": False, "message": message}), status_code=404
        )

    if not camera_config.audio_transcription.enabled:
        message = f"Audio transcription is not enabled for {event.camera}."
        logger.error(message)
        return JSONResponse(
            content=(
                {
                    "success": False,
                    "message": message,
                }
            ),
            status_code=400,
        )

    context: EmbeddingsContext = request.app.embeddings
    response = context.transcribe_audio(model_to_dict(event))

    if response == "started":
        return JSONResponse(
            content={
                "success": True,
                "message": "Audio transcription has started.",
            },
            status_code=202,  # 202 Accepted
        )
    elif response == "in_progress":
        return JSONResponse(
            content={
                "success": False,
                "message": "Audio transcription for a speech event is currently in progress. Try again later.",
            },
            status_code=409,  # 409 Conflict
        )
    else:
        return JSONResponse(
            content={
                "success": False,
                "message": "Failed to transcribe audio.",
            },
            status_code=500,
        )


# Request model added by the same patch to
# frigate/api/defs/request/classification_body.py.
class AudioTranscriptionBody(BaseModel):
    # ID of the event whose audio should be transcribed.
    event_id: str
class AudioTranscriptionPostProcessor(PostProcessorApi):
    """Transcribe speech from recorded audio with faster-whisper.

    A finished transcription is sent through the requestor as an
    ``UPDATE_EVENT_DESCRIPTION`` message and, when semantic search is enabled,
    embedded as the event description. Requests arriving through
    ``handle_request`` run on a background thread, one at a time.
    """

    def __init__(
        self,
        config: FrigateConfig,
        requestor: InterProcessRequestor,
        metrics: DataProcessorMetrics,
        embeddings: Embeddings,
    ):
        """Store collaborators, prepare the model cache dir and load the model."""
        super().__init__(config, metrics, None)
        self.config = config
        self.requestor = requestor
        self.embeddings = embeddings
        self.recognizer = None
        # Guards transcription_running / transcription_thread.
        self.transcription_lock = threading.Lock()
        self.transcription_thread = None
        self.transcription_running = False

        # faster-whisper handles model downloading automatically
        self.model_path = os.path.join(MODEL_CACHE_DIR, "whisper")
        os.makedirs(self.model_path, exist_ok=True)

        self.__build_recognizer()

    def __build_recognizer(self) -> None:
        """Create the WhisperModel; on failure log and leave recognizer None."""
        try:
            self.recognizer = WhisperModel(
                model_size_or_path="small",
                device="cuda"
                if self.config.audio_transcription.device == "GPU"
                else "cpu",
                download_root=self.model_path,
                local_files_only=False,  # Allow downloading if not cached
                compute_type="int8",
            )
            logger.debug("Audio transcription (recordings) initialized")
        except Exception as e:
            logger.error(f"Failed to initialize recordings audio transcription: {e}")
            self.recognizer = None

    def process_data(
        self, data: dict[str, any], data_type: PostProcessDataEnum
    ) -> None:
        """Transcribe audio from a recording.

        Args:
            data (dict): Contains data about the input (event_id, camera, etc.).
            data_type (enum): Describes the data being processed (recording or tracked_object).

        Returns:
            None
        """
        event_id = data["event_id"]
        camera_name = data["camera"]

        if data_type == PostProcessDataEnum.recording:
            start_ts = data["frame_time"]
            recordings_available_through = data["recordings_available"]
            end_ts = min(recordings_available_through, start_ts + 60)  # Default 60s

        elif data_type == PostProcessDataEnum.tracked_object:
            event = data["event"]
            start_ts = event["start_time"]
            # Use end_time if available. The key may be present with a None
            # value (e.g. an event dict built from a row that has no end time
            # yet), which `.get(key, default)` would pass through unchanged,
            # so fall back to a 60s window for any falsy value.
            end_ts = event.get("end_time") or start_ts + 60

        else:
            logger.error("No data type passed to audio transcription post-processing")
            return

        try:
            audio_data = get_audio_from_recording(
                self.config.cameras[camera_name].ffmpeg,
                camera_name,
                start_ts,
                end_ts,
                sample_rate=16000,
            )

            if not audio_data:
                logger.debug(f"No audio data extracted for {event_id}")
                return

            transcription = self.__transcribe_audio(audio_data)
            if not transcription:
                logger.debug("No transcription generated from audio")
                return

            logger.debug(f"Transcribed audio for {event_id}: '{transcription}'")

            self.requestor.send_data(
                UPDATE_EVENT_DESCRIPTION,
                {
                    "type": TrackedObjectUpdateTypesEnum.description,
                    "id": event_id,
                    "description": transcription,
                    "camera": camera_name,
                },
            )

            # Embed the description
            if self.config.semantic_search.enabled:
                self.embeddings.embed_description(event_id, transcription)

        except DoesNotExist:
            logger.debug("No recording found for audio transcription post-processing")
            return
        except Exception as e:
            logger.error(f"Error in audio transcription post-processing: {e}")

    def __transcribe_audio(self, audio_data: bytes) -> Optional[str]:
        """Transcribe WAV audio data using faster-whisper.

        Args:
            audio_data: Raw bytes of a WAV file.

        Returns:
            The segment texts joined by single spaces, or None when the
            recognizer is not initialized, no text was produced, or an error
            occurred (errors are logged, not raised).
        """
        # Function-scope import: only this method needs it.
        import tempfile

        if not self.recognizer:
            logger.debug("Recognizer not initialized")
            return None

        temp_wav = None
        try:
            # Save audio data to a uniquely named temporary wav so that
            # transcriptions started within the same second cannot overwrite
            # or delete each other's file.
            with tempfile.NamedTemporaryFile(
                mode="wb",
                prefix="temp_audio_",
                suffix=".wav",
                dir=CACHE_DIR,
                delete=False,
            ) as f:
                temp_wav = f.name
                f.write(audio_data)

            segments, info = self.recognizer.transcribe(
                temp_wav,
                language=self.config.audio_transcription.language,
                beam_size=5,
            )

            # Combine all segment texts, dropping segments that are empty
            # after stripping so the result is never whitespace-only.
            stripped = (segment.text.strip() for segment in segments)
            text = " ".join(piece for piece in stripped if piece)
            if not text:
                return None

            logger.debug(
                "Detected language '%s' with probability %f",
                info.language,
                info.language_probability,
            )

            return text
        except Exception as e:
            logger.error(f"Error transcribing audio: {e}")
            return None
        finally:
            # Always clean up, including when transcription raised.
            if temp_wav is not None:
                try:
                    os.remove(temp_wav)
                except OSError as e:
                    logger.debug(
                        f"Could not remove temporary audio file {temp_wav}: {e}"
                    )

    def _transcription_wrapper(self, event: dict[str, any]) -> None:
        """Wrapper to run transcription and reset running flag when done."""
        try:
            self.process_data(
                {
                    "event_id": event["id"],
                    "camera": event["camera"],
                    "event": event,
                },
                PostProcessDataEnum.tracked_object,
            )
        finally:
            with self.transcription_lock:
                self.transcription_running = False
                self.transcription_thread = None

    def handle_request(self, topic: str, request_data: dict[str, any]) -> bool | None:
        """Start a background transcription for a ``transcribe_audio`` request.

        Returns:
            True when a transcription thread was started, False when one is
            already running, and None for any other topic.
        """
        if topic == "transcribe_audio":
            event = request_data["event"]

            with self.transcription_lock:
                if self.transcription_running:
                    logger.warning(
                        "Audio transcription for a speech event is already running."
                    )
                    return False

                # Mark as running and start the thread
                self.transcription_running = True
                self.transcription_thread = threading.Thread(
                    target=self._transcription_wrapper, args=(event,), daemon=True
                )
                self.transcription_thread.start()
                return True

        return None