Clean up mypy type errors in the events modules

This commit is contained in:
Nicolas Mowen 2026-03-26 11:58:25 -06:00
parent 3febd2d671
commit 55c995b099
4 changed files with 71 additions and 46 deletions

View File

@ -2,17 +2,19 @@
import datetime
import logging
import subprocess
import threading
import time
from multiprocessing.managers import DictProxy
from multiprocessing.synchronize import Event as MpEvent
from typing import Tuple
from typing import Any, Tuple
import numpy as np
from config.camera.ffmpeg import CameraFfmpegConfig
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import CameraConfig, CameraInput, FfmpegConfig, FrigateConfig
from frigate.config import CameraConfig, CameraInput, FrigateConfig
from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber,
@ -35,8 +37,7 @@ from frigate.data_processing.real_time.audio_transcription import (
)
from frigate.ffmpeg_presets import parse_preset_input
from frigate.log import LogPipe, suppress_stderr_during
from frigate.object_detection.base import load_labels
from frigate.util.builtin import get_ffmpeg_arg_list
from frigate.util.builtin import get_ffmpeg_arg_list, load_labels
from frigate.util.ffmpeg import start_or_restart_ffmpeg, stop_ffmpeg
from frigate.util.process import FrigateProcess
@ -49,7 +50,7 @@ except ModuleNotFoundError:
logger = logging.getLogger(__name__)
def get_ffmpeg_command(ffmpeg: FfmpegConfig) -> list[str]:
def get_ffmpeg_command(ffmpeg: CameraFfmpegConfig) -> list[str]:
ffmpeg_input: CameraInput = [i for i in ffmpeg.inputs if "audio" in i.roles][0]
input_args = get_ffmpeg_arg_list(ffmpeg.global_args) + (
parse_preset_input(ffmpeg_input.input_args, 1)
@ -102,9 +103,11 @@ class AudioProcessor(FrigateProcess):
threading.current_thread().name = "process:audio_manager"
if self.config.audio_transcription.enabled:
self.transcription_model_runner = AudioTranscriptionModelRunner(
self.config.audio_transcription.device,
self.config.audio_transcription.model_size,
self.transcription_model_runner: AudioTranscriptionModelRunner | None = (
AudioTranscriptionModelRunner(
self.config.audio_transcription.device or "AUTO",
self.config.audio_transcription.model_size,
)
)
else:
self.transcription_model_runner = None
@ -118,7 +121,7 @@ class AudioProcessor(FrigateProcess):
self.config,
self.camera_metrics,
self.transcription_model_runner,
self.stop_event,
self.stop_event, # type: ignore[arg-type]
)
audio_threads.append(audio_thread)
audio_thread.start()
@ -162,7 +165,7 @@ class AudioEventMaintainer(threading.Thread):
self.logger = logging.getLogger(f"audio.{self.camera_config.name}")
self.ffmpeg_cmd = get_ffmpeg_command(self.camera_config.ffmpeg)
self.logpipe = LogPipe(f"ffmpeg.{self.camera_config.name}.audio")
self.audio_listener = None
self.audio_listener: subprocess.Popen[Any] | None = None
self.audio_transcription_model_runner = audio_transcription_model_runner
self.transcription_processor = None
self.transcription_thread = None
@ -171,7 +174,7 @@ class AudioEventMaintainer(threading.Thread):
self.requestor = InterProcessRequestor()
self.config_subscriber = CameraConfigUpdateSubscriber(
None,
{self.camera_config.name: self.camera_config},
{str(self.camera_config.name): self.camera_config},
[
CameraConfigUpdateEnum.audio,
CameraConfigUpdateEnum.enabled,
@ -180,7 +183,10 @@ class AudioEventMaintainer(threading.Thread):
)
self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio.value)
if self.config.audio_transcription.enabled:
if (
self.config.audio_transcription.enabled
and self.audio_transcription_model_runner is not None
):
# init the transcription processor for this camera
self.transcription_processor = AudioTranscriptionRealTimeProcessor(
config=self.config,
@ -200,11 +206,11 @@ class AudioEventMaintainer(threading.Thread):
self.was_enabled = camera.enabled
def detect_audio(self, audio) -> None:
def detect_audio(self, audio: np.ndarray) -> None:
if not self.camera_config.audio.enabled or self.stop_event.is_set():
return
audio_as_float = audio.astype(np.float32)
audio_as_float: np.ndarray = audio.astype(np.float32)
rms, dBFS = self.calculate_audio_levels(audio_as_float)
self.camera_metrics[self.camera_config.name].audio_rms.value = rms
@ -261,7 +267,7 @@ class AudioEventMaintainer(threading.Thread):
else:
self.transcription_processor.check_unload_model()
def calculate_audio_levels(self, audio_as_float: np.float32) -> Tuple[float, float]:
def calculate_audio_levels(self, audio_as_float: np.ndarray) -> Tuple[float, float]:
# Calculate RMS (Root-Mean-Square) which represents the average signal amplitude
# Note: np.float32 isn't serializable, we must use np.float64 to publish the message
rms = np.sqrt(np.mean(np.absolute(np.square(audio_as_float))))
@ -296,6 +302,10 @@ class AudioEventMaintainer(threading.Thread):
self.logpipe.dump()
self.start_or_restart_ffmpeg()
if self.audio_listener is None or self.audio_listener.stdout is None:
log_and_restart()
return
try:
chunk = self.audio_listener.stdout.read(self.chunk_size)
@ -341,7 +351,10 @@ class AudioEventMaintainer(threading.Thread):
self.requestor.send_data(
EXPIRE_AUDIO_ACTIVITY, self.camera_config.name
)
stop_ffmpeg(self.audio_listener, self.logger)
if self.audio_listener:
stop_ffmpeg(self.audio_listener, self.logger)
self.audio_listener = None
self.was_enabled = enabled
continue
@ -367,7 +380,7 @@ class AudioEventMaintainer(threading.Thread):
class AudioTfl:
def __init__(self, stop_event: threading.Event, num_threads=2):
def __init__(self, stop_event: threading.Event, num_threads: int = 2) -> None:
self.stop_event = stop_event
self.num_threads = num_threads
self.labels = load_labels("/audio-labelmap.txt", prefill=521)
@ -382,7 +395,7 @@ class AudioTfl:
self.tensor_input_details = self.interpreter.get_input_details()
self.tensor_output_details = self.interpreter.get_output_details()
def _detect_raw(self, tensor_input):
def _detect_raw(self, tensor_input: np.ndarray) -> np.ndarray:
self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input)
self.interpreter.invoke()
detections = np.zeros((20, 6), np.float32)
@ -410,8 +423,10 @@ class AudioTfl:
return detections
def detect(self, tensor_input, threshold=AUDIO_MIN_CONFIDENCE):
detections = []
def detect(
self, tensor_input: np.ndarray, threshold: float = AUDIO_MIN_CONFIDENCE
) -> list[tuple[str, float, tuple[float, float, float, float]]]:
detections: list[tuple[str, float, tuple[float, float, float, float]]] = []
if self.stop_event.is_set():
return detections

View File

@ -29,7 +29,7 @@ class EventCleanup(threading.Thread):
self.stop_event = stop_event
self.db = db
self.camera_keys = list(self.config.cameras.keys())
self.removed_camera_labels: list[str] = None
self.removed_camera_labels: list[Event] | None = None
self.camera_labels: dict[str, dict[str, Any]] = {}
def get_removed_camera_labels(self) -> list[Event]:
@ -37,7 +37,7 @@ class EventCleanup(threading.Thread):
if self.removed_camera_labels is None:
self.removed_camera_labels = list(
Event.select(Event.label)
.where(Event.camera.not_in(self.camera_keys))
.where(Event.camera.not_in(self.camera_keys)) # type: ignore[arg-type,call-arg,misc]
.distinct()
.execute()
)
@ -61,7 +61,7 @@ class EventCleanup(threading.Thread):
),
}
return self.camera_labels[camera]["labels"]
return self.camera_labels[camera]["labels"] # type: ignore[no-any-return]
def expire_snapshots(self) -> list[str]:
## Expire events from unlisted cameras based on the global config
@ -74,7 +74,9 @@ class EventCleanup(threading.Thread):
# loop over object types in db
for event in distinct_labels:
# get expiration time for this label
expire_days = retain_config.objects.get(event.label, retain_config.default)
expire_days = retain_config.objects.get(
str(event.label), retain_config.default
)
expire_after = (
datetime.datetime.now() - datetime.timedelta(days=expire_days)
@ -87,7 +89,7 @@ class EventCleanup(threading.Thread):
Event.thumbnail,
)
.where(
Event.camera.not_in(self.camera_keys),
Event.camera.not_in(self.camera_keys), # type: ignore[arg-type,call-arg,misc]
Event.start_time < expire_after,
Event.label == event.label,
Event.retain_indefinitely == False,
@ -109,16 +111,16 @@ class EventCleanup(threading.Thread):
# update the clips attribute for the db entry
query = Event.select(Event.id).where(
Event.camera.not_in(self.camera_keys),
Event.camera.not_in(self.camera_keys), # type: ignore[arg-type,call-arg,misc]
Event.start_time < expire_after,
Event.label == event.label,
Event.retain_indefinitely == False,
)
events_to_update = []
events_to_update: list[str] = []
for event in query.iterator():
events_to_update.append(event.id)
events_to_update.append(str(event.id))
if len(events_to_update) >= CHUNK_SIZE:
logger.debug(
f"Updating {update_params} for {len(events_to_update)} events"
@ -150,7 +152,7 @@ class EventCleanup(threading.Thread):
for event in distinct_labels:
# get expiration time for this label
expire_days = retain_config.objects.get(
event.label, retain_config.default
str(event.label), retain_config.default
)
expire_after = (
@ -177,7 +179,7 @@ class EventCleanup(threading.Thread):
# only snapshots are stored in /clips
# so no need to delete mp4 files
for event in expired_events:
events_to_update.append(event.id)
events_to_update.append(str(event.id))
deleted = delete_event_snapshot(event)
if not deleted:
@ -214,7 +216,7 @@ class EventCleanup(threading.Thread):
Event.camera,
)
.where(
Event.camera.not_in(self.camera_keys),
Event.camera.not_in(self.camera_keys), # type: ignore[arg-type,call-arg,misc]
Event.start_time < expire_after,
Event.retain_indefinitely == False,
)
@ -245,7 +247,7 @@ class EventCleanup(threading.Thread):
# update the clips attribute for the db entry
query = Event.select(Event.id).where(
Event.camera.not_in(self.camera_keys),
Event.camera.not_in(self.camera_keys), # type: ignore[arg-type,call-arg,misc]
Event.start_time < expire_after,
Event.retain_indefinitely == False,
)
@ -358,7 +360,7 @@ class EventCleanup(threading.Thread):
logger.debug(f"Found {len(events_to_delete)} events that can be expired")
if len(events_to_delete) > 0:
ids_to_delete = [e.id for e in events_to_delete]
ids_to_delete = [str(e.id) for e in events_to_delete]
for i in range(0, len(ids_to_delete), CHUNK_SIZE):
chunk = ids_to_delete[i : i + CHUNK_SIZE]
logger.debug(f"Deleting {len(chunk)} events from the database")

View File

@ -2,7 +2,7 @@ import logging
import threading
from multiprocessing import Queue
from multiprocessing.synchronize import Event as MpEvent
from typing import Dict
from typing import Any, Dict
from frigate.comms.events_updater import EventEndPublisher, EventUpdateSubscriber
from frigate.config import FrigateConfig
@ -15,7 +15,7 @@ from frigate.util.builtin import to_relative_box
logger = logging.getLogger(__name__)
def should_update_db(prev_event: Event, current_event: Event) -> bool:
def should_update_db(prev_event: dict[str, Any], current_event: dict[str, Any]) -> bool:
"""If current_event has updated fields and (clip or snapshot)."""
# If event is ending and was previously saved, always update to set end_time
# This ensures events are properly ended even when alerts/detections are disabled
@ -47,7 +47,9 @@ def should_update_db(prev_event: Event, current_event: Event) -> bool:
return False
def should_update_state(prev_event: Event, current_event: Event) -> bool:
def should_update_state(
prev_event: dict[str, Any], current_event: dict[str, Any]
) -> bool:
"""If current event should update state, but not necessarily update the db."""
if prev_event["stationary"] != current_event["stationary"]:
return True
@ -74,7 +76,7 @@ class EventProcessor(threading.Thread):
super().__init__(name="event_processor")
self.config = config
self.timeline_queue = timeline_queue
self.events_in_process: Dict[str, Event] = {}
self.events_in_process: Dict[str, dict[str, Any]] = {}
self.stop_event = stop_event
self.event_receiver = EventUpdateSubscriber()
@ -92,7 +94,7 @@ class EventProcessor(threading.Thread):
if update == None:
continue
source_type, event_type, camera, _, event_data = update
source_type, event_type, camera, _, event_data = update # type: ignore[misc]
logger.debug(
f"Event received: {source_type} {event_type} {camera} {event_data['id']}"
@ -140,7 +142,7 @@ class EventProcessor(threading.Thread):
self,
event_type: str,
camera: str,
event_data: Event,
event_data: dict[str, Any],
) -> None:
"""handle tracked object event updates."""
updated_db = False
@ -150,8 +152,13 @@ class EventProcessor(threading.Thread):
camera_config = self.config.cameras.get(camera)
if camera_config is None:
return
width = camera_config.detect.width
height = camera_config.detect.height
if width is None or height is None:
return
first_detector = list(self.config.detectors.values())[0]
start_time = event_data["start_time"]
@ -222,8 +229,12 @@ class EventProcessor(threading.Thread):
Event.thumbnail: event_data.get("thumbnail"),
Event.has_clip: event_data["has_clip"],
Event.has_snapshot: event_data["has_snapshot"],
Event.model_hash: first_detector.model.model_hash,
Event.model_type: first_detector.model.model_type,
Event.model_hash: first_detector.model.model_hash
if first_detector.model
else None,
Event.model_type: first_detector.model.model_type
if first_detector.model
else None,
Event.detector_type: first_detector.type,
Event.data: {
"box": box,
@ -287,10 +298,10 @@ class EventProcessor(threading.Thread):
if event_type == EventStateEnum.end:
del self.events_in_process[event_data["id"]]
self.event_end_publisher.publish((event_data["id"], camera, updated_db))
self.event_end_publisher.publish((event_data["id"], camera, updated_db)) # type: ignore[arg-type]
def handle_external_detection(
self, event_type: EventStateEnum, event_data: Event
self, event_type: EventStateEnum, event_data: dict[str, Any]
) -> None:
# Skip replay cameras
if event_data.get("camera", "").startswith(REPLAY_CAMERA_PREFIX):

View File

@ -45,9 +45,6 @@ ignore_errors = true
[mypy-frigate.embeddings.*]
ignore_errors = true
[mypy-frigate.events.*]
ignore_errors = true
[mypy-frigate.http]
ignore_errors = true