From 55c995b099d0015980f34c91a984e24be78d2b17 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Thu, 26 Mar 2026 11:58:25 -0600 Subject: [PATCH] Cleanup events mypy --- frigate/events/audio.py | 55 +++++++++++++++++++++++------------- frigate/events/cleanup.py | 28 +++++++++--------- frigate/events/maintainer.py | 31 +++++++++++++------- frigate/mypy.ini | 3 -- 4 files changed, 71 insertions(+), 46 deletions(-) diff --git a/frigate/events/audio.py b/frigate/events/audio.py index 505874469..492c6adca 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -2,17 +2,19 @@ import datetime import logging +import subprocess import threading import time from multiprocessing.managers import DictProxy from multiprocessing.synchronize import Event as MpEvent -from typing import Tuple +from typing import Any, Tuple import numpy as np +from config.camera.ffmpeg import CameraFfmpegConfig from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.inter_process import InterProcessRequestor -from frigate.config import CameraConfig, CameraInput, FfmpegConfig, FrigateConfig +from frigate.config import CameraConfig, CameraInput, FrigateConfig from frigate.config.camera.updater import ( CameraConfigUpdateEnum, CameraConfigUpdateSubscriber, @@ -35,8 +37,7 @@ from frigate.data_processing.real_time.audio_transcription import ( ) from frigate.ffmpeg_presets import parse_preset_input from frigate.log import LogPipe, suppress_stderr_during -from frigate.object_detection.base import load_labels -from frigate.util.builtin import get_ffmpeg_arg_list +from frigate.util.builtin import get_ffmpeg_arg_list, load_labels from frigate.util.ffmpeg import start_or_restart_ffmpeg, stop_ffmpeg from frigate.util.process import FrigateProcess @@ -49,7 +50,7 @@ except ModuleNotFoundError: logger = logging.getLogger(__name__) -def get_ffmpeg_command(ffmpeg: FfmpegConfig) -> list[str]: +def get_ffmpeg_command(ffmpeg: CameraFfmpegConfig) -> list[str]: ffmpeg_input: CameraInput = [i for i in ffmpeg.inputs if "audio" in i.roles][0] input_args = get_ffmpeg_arg_list(ffmpeg.global_args) + ( parse_preset_input(ffmpeg_input.input_args, 1) @@ -102,9 +103,11 @@ class AudioProcessor(FrigateProcess): threading.current_thread().name = "process:audio_manager" if self.config.audio_transcription.enabled: - self.transcription_model_runner = AudioTranscriptionModelRunner( - self.config.audio_transcription.device, - self.config.audio_transcription.model_size, + self.transcription_model_runner: AudioTranscriptionModelRunner | None = ( + AudioTranscriptionModelRunner( + self.config.audio_transcription.device or "AUTO", + self.config.audio_transcription.model_size, + ) ) else: self.transcription_model_runner = None @@ -118,7 +121,7 @@ class AudioProcessor(FrigateProcess): self.config, self.camera_metrics, self.transcription_model_runner, - self.stop_event, + self.stop_event, # type: ignore[arg-type] ) audio_threads.append(audio_thread) audio_thread.start() @@ -162,7 +165,7 @@ class AudioEventMaintainer(threading.Thread): self.logger = logging.getLogger(f"audio.{self.camera_config.name}") self.ffmpeg_cmd = get_ffmpeg_command(self.camera_config.ffmpeg) self.logpipe = LogPipe(f"ffmpeg.{self.camera_config.name}.audio") - self.audio_listener = None + self.audio_listener: subprocess.Popen[Any] | None = None self.audio_transcription_model_runner = audio_transcription_model_runner self.transcription_processor = None self.transcription_thread = None @@ -171,7 +174,7 @@ class AudioEventMaintainer(threading.Thread): self.requestor = InterProcessRequestor() self.config_subscriber = CameraConfigUpdateSubscriber( None, - {self.camera_config.name: self.camera_config}, + {str(self.camera_config.name): self.camera_config}, [ CameraConfigUpdateEnum.audio, CameraConfigUpdateEnum.enabled, @@ -180,7 +183,10 @@ class AudioEventMaintainer(threading.Thread): ) self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio.value) - if self.config.audio_transcription.enabled: + if ( + self.config.audio_transcription.enabled + and self.audio_transcription_model_runner is not None + ): # init the transcription processor for this camera self.transcription_processor = AudioTranscriptionRealTimeProcessor( config=self.config, @@ -200,11 +206,11 @@ class AudioEventMaintainer(threading.Thread): self.was_enabled = camera.enabled - def detect_audio(self, audio) -> None: + def detect_audio(self, audio: np.ndarray) -> None: if not self.camera_config.audio.enabled or self.stop_event.is_set(): return - audio_as_float = audio.astype(np.float32) + audio_as_float: np.ndarray = audio.astype(np.float32) rms, dBFS = self.calculate_audio_levels(audio_as_float) self.camera_metrics[self.camera_config.name].audio_rms.value = rms @@ -261,7 +267,7 @@ class AudioEventMaintainer(threading.Thread): else: self.transcription_processor.check_unload_model() - def calculate_audio_levels(self, audio_as_float: np.float32) -> Tuple[float, float]: + def calculate_audio_levels(self, audio_as_float: np.ndarray) -> Tuple[float, float]: # Calculate RMS (Root-Mean-Square) which represents the average signal amplitude # Note: np.float32 isn't serializable, we must use np.float64 to publish the message rms = np.sqrt(np.mean(np.absolute(np.square(audio_as_float)))) @@ -296,6 +302,10 @@ class AudioEventMaintainer(threading.Thread): self.logpipe.dump() self.start_or_restart_ffmpeg() + if self.audio_listener is None or self.audio_listener.stdout is None: + log_and_restart() + return + try: chunk = self.audio_listener.stdout.read(self.chunk_size) @@ -341,7 +351,10 @@ class AudioEventMaintainer(threading.Thread): self.requestor.send_data( EXPIRE_AUDIO_ACTIVITY, self.camera_config.name ) - stop_ffmpeg(self.audio_listener, self.logger) + + if self.audio_listener: + stop_ffmpeg(self.audio_listener, self.logger) + self.audio_listener = None self.was_enabled = enabled continue @@ -367,7 +380,7 @@ class AudioEventMaintainer(threading.Thread): class AudioTfl: - def __init__(self, stop_event: threading.Event, num_threads=2): + def __init__(self, stop_event: threading.Event, num_threads: int = 2) -> None: self.stop_event = stop_event self.num_threads = num_threads self.labels = load_labels("/audio-labelmap.txt", prefill=521) @@ -382,7 +395,7 @@ class AudioTfl: self.tensor_input_details = self.interpreter.get_input_details() self.tensor_output_details = self.interpreter.get_output_details() - def _detect_raw(self, tensor_input): + def _detect_raw(self, tensor_input: np.ndarray) -> np.ndarray: self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input) self.interpreter.invoke() detections = np.zeros((20, 6), np.float32) @@ -410,8 +423,10 @@ class AudioTfl: return detections - def detect(self, tensor_input, threshold=AUDIO_MIN_CONFIDENCE): - detections = [] + def detect( + self, tensor_input: np.ndarray, threshold: float = AUDIO_MIN_CONFIDENCE + ) -> list[tuple[str, float, tuple[float, float, float, float]]]: + detections: list[tuple[str, float, tuple[float, float, float, float]]] = [] if self.stop_event.is_set(): return detections diff --git a/frigate/events/cleanup.py b/frigate/events/cleanup.py index 263c5f18e..b867bf947 100644 --- a/frigate/events/cleanup.py +++ b/frigate/events/cleanup.py @@ -29,7 +29,7 @@ class EventCleanup(threading.Thread): self.stop_event = stop_event self.db = db self.camera_keys = list(self.config.cameras.keys()) - self.removed_camera_labels: list[str] = None + self.removed_camera_labels: list[Event] | None = None self.camera_labels: dict[str, dict[str, Any]] = {} def get_removed_camera_labels(self) -> list[Event]: @@ -37,7 +37,7 @@ class EventCleanup(threading.Thread): if self.removed_camera_labels is None: self.removed_camera_labels = list( Event.select(Event.label) - .where(Event.camera.not_in(self.camera_keys)) + .where(Event.camera.not_in(self.camera_keys)) # type: ignore[arg-type,call-arg,misc] .distinct() .execute() ) @@ -61,7 +61,7 @@ class EventCleanup(threading.Thread): ), } - return self.camera_labels[camera]["labels"] + return self.camera_labels[camera]["labels"] # type: ignore[no-any-return] def expire_snapshots(self) -> list[str]: ## Expire events from unlisted cameras based on the global config @@ -74,7 +74,9 @@ class EventCleanup(threading.Thread): # loop over object types in db for event in distinct_labels: # get expiration time for this label - expire_days = retain_config.objects.get(event.label, retain_config.default) + expire_days = retain_config.objects.get( + str(event.label), retain_config.default + ) expire_after = ( datetime.datetime.now() - datetime.timedelta(days=expire_days) @@ -87,7 +89,7 @@ class EventCleanup(threading.Thread): Event.thumbnail, ) .where( - Event.camera.not_in(self.camera_keys), + Event.camera.not_in(self.camera_keys), # type: ignore[arg-type,call-arg,misc] Event.start_time < expire_after, Event.label == event.label, Event.retain_indefinitely == False, @@ -109,16 +111,16 @@ class EventCleanup(threading.Thread): # update the clips attribute for the db entry query = Event.select(Event.id).where( - Event.camera.not_in(self.camera_keys), + Event.camera.not_in(self.camera_keys), # type: ignore[arg-type,call-arg,misc] Event.start_time < expire_after, Event.label == event.label, Event.retain_indefinitely == False, ) - events_to_update = [] + events_to_update: list[str] = [] for event in query.iterator(): - events_to_update.append(event.id) + events_to_update.append(str(event.id)) if len(events_to_update) >= CHUNK_SIZE: logger.debug( f"Updating {update_params} for {len(events_to_update)} events" @@ -150,7 +152,7 @@ class EventCleanup(threading.Thread): for event in distinct_labels: # get expiration time for this label expire_days = retain_config.objects.get( - event.label, retain_config.default + str(event.label), retain_config.default ) expire_after = ( @@ -177,7 +179,7 @@ class EventCleanup(threading.Thread): # only snapshots are stored in /clips # so no need to delete mp4 files for event in expired_events: - events_to_update.append(event.id) + events_to_update.append(str(event.id)) deleted = delete_event_snapshot(event) if not deleted: @@ -214,7 +216,7 @@ class EventCleanup(threading.Thread): Event.camera, ) .where( - Event.camera.not_in(self.camera_keys), + Event.camera.not_in(self.camera_keys), # type: ignore[arg-type,call-arg,misc] Event.start_time < expire_after, Event.retain_indefinitely == False, ) @@ -245,7 +247,7 @@ class EventCleanup(threading.Thread): # update the clips attribute for the db entry query = Event.select(Event.id).where( - Event.camera.not_in(self.camera_keys), + Event.camera.not_in(self.camera_keys), # type: ignore[arg-type,call-arg,misc] Event.start_time < expire_after, Event.retain_indefinitely == False, ) @@ -358,7 +360,7 @@ class EventCleanup(threading.Thread): logger.debug(f"Found {len(events_to_delete)} events that can be expired") if len(events_to_delete) > 0: - ids_to_delete = [e.id for e in events_to_delete] + ids_to_delete = [str(e.id) for e in events_to_delete] for i in range(0, len(ids_to_delete), CHUNK_SIZE): chunk = ids_to_delete[i : i + CHUNK_SIZE] logger.debug(f"Deleting {len(chunk)} events from the database") diff --git a/frigate/events/maintainer.py b/frigate/events/maintainer.py index 6a8da45b2..80bdaccd3 100644 --- a/frigate/events/maintainer.py +++ b/frigate/events/maintainer.py @@ -2,7 +2,7 @@ import logging import threading from multiprocessing import Queue from multiprocessing.synchronize import Event as MpEvent -from typing import Dict +from typing import Any, Dict from frigate.comms.events_updater import EventEndPublisher, EventUpdateSubscriber from frigate.config import FrigateConfig @@ -15,7 +15,7 @@ from frigate.util.builtin import to_relative_box logger = logging.getLogger(__name__) -def should_update_db(prev_event: Event, current_event: Event) -> bool: +def should_update_db(prev_event: dict[str, Any], current_event: dict[str, Any]) -> bool: """If current_event has updated fields and (clip or snapshot).""" # If event is ending and was previously saved, always update to set end_time # This ensures events are properly ended even when alerts/detections are disabled @@ -47,7 +47,9 @@ def should_update_db(prev_event: Event, current_event: Event) -> bool: return False -def should_update_state(prev_event: Event, current_event: Event) -> bool: +def should_update_state( + prev_event: dict[str, Any], current_event: dict[str, Any] +) -> bool: """If current event should update state, but not necessarily update the db.""" if prev_event["stationary"] != current_event["stationary"]: return True @@ -74,7 +76,7 @@ class EventProcessor(threading.Thread): super().__init__(name="event_processor") self.config = config self.timeline_queue = timeline_queue - self.events_in_process: Dict[str, Event] = {} + self.events_in_process: Dict[str, dict[str, Any]] = {} self.stop_event = stop_event self.event_receiver = EventUpdateSubscriber() @@ -92,7 +94,7 @@ class EventProcessor(threading.Thread): if update == None: continue - source_type, event_type, camera, _, event_data = update + source_type, event_type, camera, _, event_data = update # type: ignore[misc] logger.debug( f"Event received: {source_type} {event_type} {camera} {event_data['id']}" @@ -140,7 +142,7 @@ class EventProcessor(threading.Thread): self, event_type: str, camera: str, - event_data: Event, + event_data: dict[str, Any], ) -> None: """handle tracked object event updates.""" updated_db = False @@ -150,8 +152,13 @@ class EventProcessor(threading.Thread): camera_config = self.config.cameras.get(camera) if camera_config is None: return + width = camera_config.detect.width height = camera_config.detect.height + + if width is None or height is None: + return + first_detector = list(self.config.detectors.values())[0] start_time = event_data["start_time"] @@ -222,8 +229,12 @@ class EventProcessor(threading.Thread): Event.thumbnail: event_data.get("thumbnail"), Event.has_clip: event_data["has_clip"], Event.has_snapshot: event_data["has_snapshot"], - Event.model_hash: first_detector.model.model_hash, - Event.model_type: first_detector.model.model_type, + Event.model_hash: first_detector.model.model_hash + if first_detector.model + else None, + Event.model_type: first_detector.model.model_type + if first_detector.model + else None, Event.detector_type: first_detector.type, Event.data: { "box": box, @@ -287,10 +298,10 @@ class EventProcessor(threading.Thread): if event_type == EventStateEnum.end: del self.events_in_process[event_data["id"]] - self.event_end_publisher.publish((event_data["id"], camera, updated_db)) + self.event_end_publisher.publish((event_data["id"], camera, updated_db)) # type: ignore[arg-type] def handle_external_detection( - self, event_type: EventStateEnum, event_data: Event + self, event_type: EventStateEnum, event_data: dict[str, Any] ) -> None: # Skip replay cameras if event_data.get("camera", "").startswith(REPLAY_CAMERA_PREFIX): diff --git a/frigate/mypy.ini b/frigate/mypy.ini index 78c156c38..3c643236f 100644 --- a/frigate/mypy.ini +++ b/frigate/mypy.ini @@ -45,9 +45,6 @@ ignore_errors = true [mypy-frigate.embeddings.*] ignore_errors = true -[mypy-frigate.events.*] -ignore_errors = true - [mypy-frigate.http] ignore_errors = true