From 0cf9d7d5b1c9bb477c253643e0693d4965a68ae2 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Wed, 25 Mar 2026 18:30:59 -0600 Subject: [PATCH] Inverse mypy and more mypy fixes (#22645) * organize * Improve storage mypy * Cleanup timeline mypy * Cleanup recording mypy * Improve review mypy * Add review mypy * Inverse mypy * Fix ffmpeg presets * fix template thing * Cleanup camera --- .github/workflows/pr_template_check.yml | 2 +- frigate/camera/__init__.py | 45 +++---- frigate/camera/activity_manager.py | 10 +- frigate/camera/maintainer.py | 10 +- frigate/camera/state.py | 67 ++++++----- frigate/ffmpeg_presets.py | 16 +-- frigate/mypy.ini | 80 +++++-------- frigate/record/cleanup.py | 22 ++-- frigate/record/export.py | 18 ++- frigate/record/maintainer.py | 30 +++-- frigate/review/maintainer.py | 149 ++++++++++++------------ frigate/storage.py | 11 +- frigate/timeline.py | 16 ++- frigate/track/tracked_object.py | 4 +- 14 files changed, 245 insertions(+), 235 deletions(-) diff --git a/.github/workflows/pr_template_check.yml b/.github/workflows/pr_template_check.yml index 7c4bb52a4..6b7d950e3 100644 --- a/.github/workflows/pr_template_check.yml +++ b/.github/workflows/pr_template_check.yml @@ -16,7 +16,7 @@ jobs: uses: actions/github-script@v7 with: script: | - const maintainers = ['blakeblackshear', 'NickM-27', 'hawkeye217']; + const maintainers = ['blakeblackshear', 'NickM-27', 'hawkeye217', 'dependabot[bot]']; const author = context.payload.pull_request.user.login; if (maintainers.includes(author)) { diff --git a/frigate/camera/__init__.py b/frigate/camera/__init__.py index 0461c98cb..85831653e 100644 --- a/frigate/camera/__init__.py +++ b/frigate/camera/__init__.py @@ -1,26 +1,27 @@ import multiprocessing as mp -from multiprocessing.managers import SyncManager +import queue +from multiprocessing.managers import SyncManager, ValueProxy from multiprocessing.sharedctypes import Synchronized from multiprocessing.synchronize import Event class CameraMetrics: - camera_fps: Synchronized - detection_fps: Synchronized - detection_frame: Synchronized - process_fps: Synchronized - skipped_fps: Synchronized - read_start: Synchronized - audio_rms: Synchronized - audio_dBFS: Synchronized + camera_fps: ValueProxy[float] + detection_fps: ValueProxy[float] + detection_frame: ValueProxy[float] + process_fps: ValueProxy[float] + skipped_fps: ValueProxy[float] + read_start: ValueProxy[float] + audio_rms: ValueProxy[float] + audio_dBFS: ValueProxy[float] - frame_queue: mp.Queue + frame_queue: queue.Queue - process_pid: Synchronized - capture_process_pid: Synchronized - ffmpeg_pid: Synchronized - reconnects_last_hour: Synchronized - stalls_last_hour: Synchronized + process_pid: ValueProxy[int] + capture_process_pid: ValueProxy[int] + ffmpeg_pid: ValueProxy[int] + reconnects_last_hour: ValueProxy[int] + stalls_last_hour: ValueProxy[int] def __init__(self, manager: SyncManager): self.camera_fps = manager.Value("d", 0) @@ -56,14 +57,14 @@ class PTZMetrics: reset: Event def __init__(self, *, autotracker_enabled: bool): - self.autotracker_enabled = mp.Value("i", autotracker_enabled) + self.autotracker_enabled = mp.Value("i", autotracker_enabled) # type: ignore[assignment] - self.start_time = mp.Value("d", 0) - self.stop_time = mp.Value("d", 0) - self.frame_time = mp.Value("d", 0) - self.zoom_level = mp.Value("d", 0) - self.max_zoom = mp.Value("d", 0) - self.min_zoom = mp.Value("d", 0) + self.start_time = mp.Value("d", 0) # type: ignore[assignment] + self.stop_time = mp.Value("d", 0) # type: ignore[assignment] + self.frame_time = mp.Value("d", 0) # type: ignore[assignment] + self.zoom_level = mp.Value("d", 0) # type: ignore[assignment] + self.max_zoom = mp.Value("d", 0) # type: ignore[assignment] + self.min_zoom = mp.Value("d", 0) # type: ignore[assignment] self.tracking_active = mp.Event() self.motor_stopped = mp.Event() diff --git a/frigate/camera/activity_manager.py b/frigate/camera/activity_manager.py index 3f229e490..38425add9 100644 --- a/frigate/camera/activity_manager.py +++ b/frigate/camera/activity_manager.py @@ -37,6 +37,9 @@ class CameraActivityManager: self.__init_camera(camera_config) def __init_camera(self, camera_config: CameraConfig) -> None: + if camera_config.name is None: + return + self.last_camera_activity[camera_config.name] = {} self.camera_all_object_counts[camera_config.name] = Counter() self.camera_active_object_counts[camera_config.name] = Counter() @@ -114,7 +117,7 @@ class CameraActivityManager: self.last_camera_activity = new_activity def compare_camera_activity( - self, camera: str, new_activity: dict[str, Any] + self, camera: str, new_activity: list[dict[str, Any]] ) -> None: all_objects = Counter( obj["label"].replace("-verified", "") for obj in new_activity @@ -175,6 +178,9 @@ class AudioActivityManager: self.__init_camera(camera_config) def __init_camera(self, camera_config: CameraConfig) -> None: + if camera_config.name is None: + return + self.current_audio_detections[camera_config.name] = {} def update_activity(self, new_activity: dict[str, dict[str, Any]]) -> None: @@ -202,7 +208,7 @@ class AudioActivityManager: def compare_audio_activity( self, camera: str, new_detections: list[tuple[str, float]], now: float - ) -> None: + ) -> bool: camera_config = self.config.cameras.get(camera) if camera_config is None: return False diff --git a/frigate/camera/maintainer.py b/frigate/camera/maintainer.py index 9cfdcc7f3..c4ddc51e8 100644 --- a/frigate/camera/maintainer.py +++ b/frigate/camera/maintainer.py @@ -102,7 +102,7 @@ class CameraMaintainer(threading.Thread): f"recommend increasing it to at least {shm_stats['min_shm']}MB." ) - return shm_stats["shm_frame_count"] + return int(shm_stats["shm_frame_count"]) def __start_camera_processor( self, name: str, config: CameraConfig, runtime: bool = False @@ -152,10 +152,10 @@ class CameraMaintainer(threading.Thread): camera_stop_event, self.config.logger, ) - self.camera_processes[config.name] = camera_process + self.camera_processes[name] = camera_process camera_process.start() - self.camera_metrics[config.name].process_pid.value = camera_process.pid - logger.info(f"Camera processor started for {config.name}: {camera_process.pid}") + self.camera_metrics[name].process_pid.value = camera_process.pid + logger.info(f"Camera processor started for {name}: {camera_process.pid}") def __start_camera_capture( self, name: str, config: CameraConfig, runtime: bool = False @@ -219,7 +219,7 @@ class CameraMaintainer(threading.Thread): logger.info(f"Closing frame queue for {camera}") empty_and_close_queue(self.camera_metrics[camera].frame_queue) - def run(self): + def run(self) -> None: self.__init_historical_regions() # start camera processes diff --git a/frigate/camera/state.py b/frigate/camera/state.py index f609a05f9..7355afe6b 100644 --- a/frigate/camera/state.py +++ b/frigate/camera/state.py @@ -31,26 +31,26 @@ logger = logging.getLogger(__name__) class CameraState: def __init__( self, - name, + name: str, config: FrigateConfig, frame_manager: SharedMemoryFrameManager, ptz_autotracker_thread: PtzAutoTrackerThread, - ): + ) -> None: self.name = name self.config = config self.camera_config = config.cameras[name] self.frame_manager = frame_manager self.best_objects: dict[str, TrackedObject] = {} self.tracked_objects: dict[str, TrackedObject] = {} - self.frame_cache = {} - self.zone_objects = defaultdict(list) + self.frame_cache: dict[float, dict[str, Any]] = {} + self.zone_objects: defaultdict[str, list[Any]] = defaultdict(list) self._current_frame = np.zeros(self.camera_config.frame_shape_yuv, np.uint8) self.current_frame_lock = threading.Lock() self.current_frame_time = 0.0 - self.motion_boxes = [] - self.regions = [] - self.previous_frame_id = None - self.callbacks = defaultdict(list) + self.motion_boxes: list[tuple[int, int, int, int]] = [] + self.regions: list[tuple[int, int, int, int]] = [] + self.previous_frame_id: str | None = None + self.callbacks: defaultdict[str, list[Callable]] = defaultdict(list) self.ptz_autotracker_thread = ptz_autotracker_thread self.prev_enabled = self.camera_config.enabled @@ -62,10 +62,10 @@ class CameraState: motion_boxes = self.motion_boxes.copy() regions = self.regions.copy() - frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_YUV2BGR_I420) + frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_YUV2BGR_I420) # type: ignore[assignment] # draw on the frame if draw_options.get("mask"): - mask_overlay = np.where(self.camera_config.motion.rasterized_mask == [0]) + mask_overlay = np.where(self.camera_config.motion.rasterized_mask == [0]) # type: ignore[attr-defined] frame_copy[mask_overlay] = [0, 0, 0] if draw_options.get("bounding_boxes"): @@ -97,7 +97,7 @@ class CameraState: and obj["id"] == self.ptz_autotracker_thread.ptz_autotracker.tracked_object[ self.name - ].obj_data["id"] + ].obj_data["id"] # type: ignore[attr-defined] and obj["frame_time"] == frame_time ): thickness = 5 @@ -109,10 +109,12 @@ class CameraState: if ( self.camera_config.onvif.autotracking.zooming != ZoomingModeEnum.disabled + and self.camera_config.detect.width is not None + and self.camera_config.detect.height is not None ): max_target_box = self.ptz_autotracker_thread.ptz_autotracker.tracked_object_metrics[ self.name - ]["max_target_box"] + ]["max_target_box"] # type: ignore[index] side_length = max_target_box * ( max( self.camera_config.detect.width, @@ -221,14 +223,14 @@ class CameraState: ) if draw_options.get("timestamp"): - color = self.camera_config.timestamp_style.color + ts_color = self.camera_config.timestamp_style.color draw_timestamp( frame_copy, frame_time, self.camera_config.timestamp_style.format, font_effect=self.camera_config.timestamp_style.effect, font_thickness=self.camera_config.timestamp_style.thickness, - font_color=(color.blue, color.green, color.red), + font_color=(ts_color.blue, ts_color.green, ts_color.red), position=self.camera_config.timestamp_style.position, ) @@ -273,10 +275,10 @@ class CameraState: return frame_copy - def finished(self, obj_id): + def finished(self, obj_id: str) -> None: del self.tracked_objects[obj_id] - def on(self, event_type: str, callback: Callable): + def on(self, event_type: str, callback: Callable[..., Any]) -> None: self.callbacks[event_type].append(callback) def update( @@ -286,7 +288,7 @@ class CameraState: current_detections: dict[str, dict[str, Any]], motion_boxes: list[tuple[int, int, int, int]], regions: list[tuple[int, int, int, int]], - ): + ) -> None: current_frame = self.frame_manager.get( frame_name, self.camera_config.frame_shape_yuv ) @@ -313,7 +315,7 @@ class CameraState: f"{self.name}: New object, adding {frame_time} to frame cache for {id}" ) self.frame_cache[frame_time] = { - "frame": np.copy(current_frame), + "frame": np.copy(current_frame), # type: ignore[arg-type] "object_id": id, } @@ -356,7 +358,8 @@ class CameraState: if thumb_update and current_frame is not None: # ensure this frame is stored in the cache if ( - updated_obj.thumbnail_data["frame_time"] == frame_time + updated_obj.thumbnail_data is not None + and updated_obj.thumbnail_data["frame_time"] == frame_time and frame_time not in self.frame_cache ): logger.debug( @@ -397,7 +400,7 @@ class CameraState: # TODO: can i switch to looking this up and only changing when an event ends? # maintain best objects - camera_activity: dict[str, list[Any]] = { + camera_activity: dict[str, Any] = { "motion": len(motion_boxes) > 0, "objects": [], } @@ -411,10 +414,7 @@ class CameraState: sub_label = None if obj.obj_data.get("sub_label"): - if ( - obj.obj_data.get("sub_label")[0] - in self.config.model.all_attributes - ): + if obj.obj_data["sub_label"][0] in self.config.model.all_attributes: label = obj.obj_data["sub_label"][0] else: label = f"{object_type}-verified" @@ -449,14 +449,19 @@ class CameraState: # if the object is a higher score than the current best score # or the current object is older than desired, use the new object if ( - is_better_thumbnail( + current_best.thumbnail_data is not None + and obj.thumbnail_data is not None + and is_better_thumbnail( object_type, current_best.thumbnail_data, obj.thumbnail_data, self.camera_config.frame_shape, ) - or (now - current_best.thumbnail_data["frame_time"]) - > self.camera_config.best_image_timeout + or ( + current_best.thumbnail_data is not None + and (now - current_best.thumbnail_data["frame_time"]) + > self.camera_config.best_image_timeout + ) ): self.send_mqtt_snapshot(obj, object_type) else: @@ -472,7 +477,9 @@ class CameraState: if obj.thumbnail_data is not None } current_best_frames = { - obj.thumbnail_data["frame_time"] for obj in self.best_objects.values() + obj.thumbnail_data["frame_time"] + for obj in self.best_objects.values() + if obj.thumbnail_data is not None } thumb_frames_to_delete = [ t @@ -540,7 +547,7 @@ class CameraState: with open( os.path.join( CLIPS_DIR, - f"{self.camera_config.name}-{event_id}-clean.webp", + f"{self.name}-{event_id}-clean.webp", ), "wb", ) as p: @@ -549,7 +556,7 @@ class CameraState: # create thumbnail with max height of 175 and save width = int(175 * img_frame.shape[1] / img_frame.shape[0]) thumb = cv2.resize(img_frame, dsize=(width, 175), interpolation=cv2.INTER_AREA) - thumb_path = os.path.join(THUMB_DIR, self.camera_config.name) + thumb_path = os.path.join(THUMB_DIR, self.name) os.makedirs(thumb_path, exist_ok=True) cv2.imwrite(os.path.join(thumb_path, f"{event_id}.webp"), thumb) diff --git a/frigate/ffmpeg_presets.py b/frigate/ffmpeg_presets.py index 0652ec645..4ebbd6c80 100644 --- a/frigate/ffmpeg_presets.py +++ b/frigate/ffmpeg_presets.py @@ -3,7 +3,7 @@ import logging import os from enum import Enum -from typing import Any +from typing import Any, Optional from frigate.const import ( FFMPEG_HVC1_ARGS, @@ -215,7 +215,7 @@ def parse_preset_hardware_acceleration_decode( width: int, height: int, gpu: int, -) -> list[str]: +) -> Optional[list[str]]: """Return the correct preset if in preset format otherwise return None.""" if not isinstance(arg, str): return None @@ -242,9 +242,9 @@ def parse_preset_hardware_acceleration_scale( else: scale = PRESETS_HW_ACCEL_SCALE.get(arg, PRESETS_HW_ACCEL_SCALE["default"]) - scale = scale.format(fps, width, height).split(" ") - scale.extend(detect_args) - return scale + scale_args = scale.format(fps, width, height).split(" ") + scale_args.extend(detect_args) + return scale_args class EncodeTypeEnum(str, Enum): @@ -420,7 +420,7 @@ PRESETS_INPUT = { } -def parse_preset_input(arg: Any, detect_fps: int) -> list[str]: +def parse_preset_input(arg: Any, detect_fps: int) -> Optional[list[str]]: """Return the correct preset if in preset format otherwise return None.""" if not isinstance(arg, str): return None @@ -530,7 +530,9 @@ PRESETS_RECORD_OUTPUT = { } -def parse_preset_output_record(arg: Any, force_record_hvc1: bool) -> list[str]: +def parse_preset_output_record( + arg: Any, force_record_hvc1: bool +) -> Optional[list[str]]: """Return the correct preset if in preset format otherwise return None.""" if not isinstance(arg, str): return None diff --git a/frigate/mypy.ini b/frigate/mypy.ini index 9c44e9f38..cde4a3fe6 100644 --- a/frigate/mypy.ini +++ b/frigate/mypy.ini @@ -22,68 +22,46 @@ warn_unreachable = true no_implicit_reexport = true [mypy-frigate.*] +ignore_errors = false + +[mypy-frigate.api.*] ignore_errors = true -[mypy-frigate.__main__] -ignore_errors = false -disallow_untyped_calls = false +[mypy-frigate.config.*] +ignore_errors = true -[mypy-frigate.app] -ignore_errors = false -disallow_untyped_calls = false +[mypy-frigate.data_processing.*] +ignore_errors = true -[mypy-frigate.const] -ignore_errors = false +[mypy-frigate.db.*] +ignore_errors = true -[mypy-frigate.comms.*] -ignore_errors = false +[mypy-frigate.debug_replay] +ignore_errors = true -[mypy-frigate.events] -ignore_errors = false +[mypy-frigate.detectors.*] +ignore_errors = true -[mypy-frigate.genai.*] -ignore_errors = false +[mypy-frigate.embeddings.*] +ignore_errors = true -[mypy-frigate.jobs.*] -ignore_errors = false +[mypy-frigate.events.*] +ignore_errors = true -[mypy-frigate.motion.*] -ignore_errors = false +[mypy-frigate.http] +ignore_errors = true -[mypy-frigate.object_detection.*] -ignore_errors = false +[mypy-frigate.ptz.*] +ignore_errors = true -[mypy-frigate.output.*] -ignore_errors = false +[mypy-frigate.stats.*] +ignore_errors = true -[mypy-frigate.ptz] -ignore_errors = false +[mypy-frigate.test.*] +ignore_errors = true -[mypy-frigate.log] -ignore_errors = false +[mypy-frigate.util.*] +ignore_errors = true -[mypy-frigate.models] -ignore_errors = false - -[mypy-frigate.plus] -ignore_errors = false - -[mypy-frigate.stats] -ignore_errors = false - -[mypy-frigate.track.*] -ignore_errors = false - -[mypy-frigate.types] -ignore_errors = false - -[mypy-frigate.version] -ignore_errors = false - -[mypy-frigate.watchdog] -ignore_errors = false -disallow_untyped_calls = false - - -[mypy-frigate.service_manager.*] -ignore_errors = false +[mypy-frigate.video.*] +ignore_errors = true diff --git a/frigate/record/cleanup.py b/frigate/record/cleanup.py index 9122934a1..e41a5bf39 100644 --- a/frigate/record/cleanup.py +++ b/frigate/record/cleanup.py @@ -7,6 +7,7 @@ import os import threading from multiprocessing.synchronize import Event as MpEvent from pathlib import Path +from typing import Any from playhouse.sqlite_ext import SqliteExtDatabase @@ -60,7 +61,9 @@ class RecordingCleanup(threading.Thread): db.execute_sql("PRAGMA wal_checkpoint(TRUNCATE);") db.close() - def expire_review_segments(self, config: CameraConfig, now: datetime) -> set[Path]: + def expire_review_segments( + self, config: CameraConfig, now: datetime.datetime + ) -> set[Path]: """Delete review segments that are expired""" alert_expire_date = ( now - datetime.timedelta(days=config.record.alerts.retain.days) @@ -68,7 +71,7 @@ class RecordingCleanup(threading.Thread): detection_expire_date = ( now - datetime.timedelta(days=config.record.detections.retain.days) ).timestamp() - expired_reviews: ReviewSegment = ( + expired_reviews = ( ReviewSegment.select(ReviewSegment.id, ReviewSegment.thumb_path) .where(ReviewSegment.camera == config.name) .where( @@ -109,13 +112,13 @@ class RecordingCleanup(threading.Thread): continuous_expire_date: float, motion_expire_date: float, config: CameraConfig, - reviews: ReviewSegment, + reviews: list[Any], ) -> set[Path]: """Delete recordings for existing camera based on retention config.""" # Get the timestamp for cutoff of retained days # Get recordings to check for expiration - recordings: Recordings = ( + recordings = ( Recordings.select( Recordings.id, Recordings.start_time, @@ -148,13 +151,12 @@ class RecordingCleanup(threading.Thread): review_start = 0 deleted_recordings = set() kept_recordings: list[tuple[float, float]] = [] - recording: Recordings for recording in recordings: keep = False mode = None # Now look for a reason to keep this recording segment for idx in range(review_start, len(reviews)): - review: ReviewSegment = reviews[idx] + review = reviews[idx] severity = review.severity pre_capture = config.record.get_review_pre_capture(severity) post_capture = config.record.get_review_post_capture(severity) @@ -214,7 +216,7 @@ class RecordingCleanup(threading.Thread): Recordings.id << deleted_recordings_list[i : i + max_deletes] ).execute() - previews: list[Previews] = ( + previews = ( Previews.select( Previews.id, Previews.start_time, @@ -290,13 +292,13 @@ class RecordingCleanup(threading.Thread): expire_before = ( datetime.datetime.now() - datetime.timedelta(days=expire_days) ).timestamp() - no_camera_recordings: Recordings = ( + no_camera_recordings = ( Recordings.select( Recordings.id, Recordings.path, ) .where( - Recordings.camera.not_in(list(self.config.cameras.keys())), + Recordings.camera.not_in(list(self.config.cameras.keys())), # type: ignore[call-arg, arg-type, misc] Recordings.end_time < expire_before, ) .namedtuples() @@ -341,7 +343,7 @@ class RecordingCleanup(threading.Thread): ).timestamp() # Get all the reviews to check against - reviews: ReviewSegment = ( + reviews = ( ReviewSegment.select( ReviewSegment.start_time, ReviewSegment.end_time, diff --git a/frigate/record/export.py b/frigate/record/export.py index f8a72a79a..24bdb423f 100644 --- a/frigate/record/export.py +++ b/frigate/record/export.py @@ -85,7 +85,7 @@ def validate_ffmpeg_args(args: str) -> tuple[bool, str]: return True, "" -def lower_priority(): +def lower_priority() -> None: os.nice(PROCESS_PRIORITY_LOW) @@ -150,7 +150,7 @@ class RecordingExporter(threading.Thread): ): # has preview mp4 try: - preview: Previews = ( + preview = ( Previews.select( Previews.camera, Previews.path, @@ -231,20 +231,19 @@ class RecordingExporter(threading.Thread): def get_record_export_command( self, video_path: str, use_hwaccel: bool = True - ) -> list[str]: + ) -> tuple[list[str], str | list[str]]: # handle case where internal port is a string with ip:port internal_port = self.config.networking.listen.internal if type(internal_port) is str: internal_port = int(internal_port.split(":")[-1]) + playlist_lines: list[str] = [] if (self.end_time - self.start_time) <= MAX_PLAYLIST_SECONDS: - playlist_lines = f"http://127.0.0.1:{internal_port}/vod/{self.camera}/start/{self.start_time}/end/{self.end_time}/index.m3u8" + playlist_url = f"http://127.0.0.1:{internal_port}/vod/{self.camera}/start/{self.start_time}/end/{self.end_time}/index.m3u8" ffmpeg_input = ( - f"-y -protocol_whitelist pipe,file,http,tcp -i {playlist_lines}" + f"-y -protocol_whitelist pipe,file,http,tcp -i {playlist_url}" ) else: - playlist_lines = [] - # get full set of recordings export_recordings = ( Recordings.select( @@ -305,7 +304,7 @@ class RecordingExporter(threading.Thread): def get_preview_export_command( self, video_path: str, use_hwaccel: bool = True - ) -> list[str]: + ) -> tuple[list[str], list[str]]: playlist_lines = [] codec = "-c copy" @@ -355,7 +354,6 @@ class RecordingExporter(threading.Thread): .iterator() ) - preview: Previews for preview in export_previews: playlist_lines.append(f"file '{preview.path}'") @@ -493,7 +491,7 @@ class RecordingExporter(threading.Thread): logger.debug(f"Finished exporting {video_path}") -def migrate_exports(ffmpeg: FfmpegConfig, camera_names: list[str]): +def migrate_exports(ffmpeg: FfmpegConfig, camera_names: list[str]) -> None: Path(os.path.join(CLIPS_DIR, "export")).mkdir(exist_ok=True) exports = [] diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index 6290a2405..e3409652e 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -266,7 +266,7 @@ class RecordingMaintainer(threading.Thread): # get all reviews with the end time after the start of the oldest cache file # or with end_time None - reviews: ReviewSegment = ( + reviews = ( ReviewSegment.select( ReviewSegment.start_time, ReviewSegment.end_time, @@ -301,7 +301,9 @@ class RecordingMaintainer(threading.Thread): RecordingsDataTypeEnum.saved.value, ) - recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks) + recordings_to_insert: list[Optional[dict[str, Any]]] = await asyncio.gather( + *tasks + ) # fire and forget recordings entries self.requestor.send_data( @@ -314,8 +316,8 @@ class RecordingMaintainer(threading.Thread): self.end_time_cache.pop(cache_path, None) async def validate_and_move_segment( - self, camera: str, reviews: list[ReviewSegment], recording: dict[str, Any] - ) -> Optional[Recordings]: + self, camera: str, reviews: Any, recording: dict[str, Any] + ) -> Optional[dict[str, Any]]: cache_path: str = recording["cache_path"] start_time: datetime.datetime = recording["start_time"] @@ -456,6 +458,8 @@ class RecordingMaintainer(threading.Thread): if end_time < retain_cutoff: self.drop_segment(cache_path) + return None + def _compute_motion_heatmap( self, camera: str, motion_boxes: list[tuple[int, int, int, int]] ) -> dict[str, int] | None: @@ -481,7 +485,7 @@ class RecordingMaintainer(threading.Thread): frame_width = camera_config.detect.width frame_height = camera_config.detect.height - if frame_width <= 0 or frame_height <= 0: + if not frame_width or frame_width <= 0 or not frame_height or frame_height <= 0: return None GRID_SIZE = 16 @@ -575,13 +579,13 @@ class RecordingMaintainer(threading.Thread): duration: float, cache_path: str, store_mode: RetainModeEnum, - ) -> Optional[Recordings]: + ) -> Optional[dict[str, Any]]: segment_info = self.segment_stats(camera, start_time, end_time) # check if the segment shouldn't be stored if segment_info.should_discard_segment(store_mode): self.drop_segment(cache_path) - return + return None # directory will be in utc due to start_time being in utc directory = os.path.join( @@ -620,7 +624,8 @@ class RecordingMaintainer(threading.Thread): if p.returncode != 0: logger.error(f"Unable to convert {cache_path} to {file_path}") - logger.error((await p.stderr.read()).decode("ascii")) + if p.stderr: + logger.error((await p.stderr.read()).decode("ascii")) return None else: logger.debug( @@ -684,11 +689,16 @@ class RecordingMaintainer(threading.Thread): stale_frame_count_threshold = 10 # empty the object recordings info queue while True: - (topic, data) = self.detection_subscriber.check_for_update( + result = self.detection_subscriber.check_for_update( timeout=FAST_QUEUE_TIMEOUT ) - if not topic: + if not result: + break + + topic, data = result + + if not topic or not data: break if topic == DetectionTypeEnum.video.value: diff --git a/frigate/review/maintainer.py b/frigate/review/maintainer.py index 4dc1d8e6a..cfc59744c 100644 --- a/frigate/review/maintainer.py +++ b/frigate/review/maintainer.py @@ -31,7 +31,7 @@ from frigate.const import ( ) from frigate.models import ReviewSegment from frigate.review.types import SeverityEnum -from frigate.track.object_processing import ManualEventState, TrackedObject +from frigate.track.object_processing import ManualEventState from frigate.util.image import SharedMemoryFrameManager, calculate_16_9_crop logger = logging.getLogger(__name__) @@ -69,7 +69,9 @@ class PendingReviewSegment: self.last_alert_time = frame_time # thumbnail - self._frame = np.zeros((THUMB_HEIGHT * 3 // 2, THUMB_WIDTH), np.uint8) + self._frame: np.ndarray[Any, Any] = np.zeros( + (THUMB_HEIGHT * 3 // 2, THUMB_WIDTH), np.uint8 + ) self.has_frame = False self.frame_active_count = 0 self.frame_path = os.path.join( @@ -77,8 +79,11 @@ class PendingReviewSegment: ) def update_frame( - self, camera_config: CameraConfig, frame, objects: list[TrackedObject] - ): + self, + camera_config: CameraConfig, + frame: np.ndarray, + objects: list[dict[str, Any]], + ) -> None: min_x = camera_config.frame_shape[1] min_y = camera_config.frame_shape[0] max_x = 0 @@ -114,7 +119,7 @@ class PendingReviewSegment: self.frame_path, self._frame, [int(cv2.IMWRITE_WEBP_QUALITY), 60] ) - def save_full_frame(self, camera_config: CameraConfig, frame): + def save_full_frame(self, camera_config: CameraConfig, frame: np.ndarray) -> None: color_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) width = int(THUMB_HEIGHT * color_frame.shape[1] / color_frame.shape[0]) self._frame = cv2.resize( @@ -165,13 +170,13 @@ class ActiveObjects: self, frame_time: float, camera_config: CameraConfig, - all_objects: list[TrackedObject], + all_objects: list[dict[str, Any]], ): self.camera_config = camera_config # get current categorization of objects to know if # these objects are currently being categorized - self.categorized_objects = { + self.categorized_objects: dict[str, list[dict[str, Any]]] = { "alerts": [], "detections": [], } @@ -250,7 +255,7 @@ class ActiveObjects: return False - def get_all_objects(self) -> list[TrackedObject]: + def get_all_objects(self) -> list[dict[str, Any]]: return ( self.categorized_objects["alerts"] + self.categorized_objects["detections"] ) @@ -309,7 +314,7 @@ class ReviewSegmentMaintainer(threading.Thread): "reviews", json.dumps(review_update), ) - self.review_publisher.publish(review_update, segment.camera) + self.review_publisher.publish(review_update, segment.camera) # type: ignore[arg-type] self.requestor.send_data( f"{segment.camera}/review_status", segment.severity.value.upper() ) @@ -318,8 +323,8 @@ class ReviewSegmentMaintainer(threading.Thread): self, segment: PendingReviewSegment, camera_config: CameraConfig, - frame, - objects: list[TrackedObject], + frame: Optional[np.ndarray], + objects: list[dict[str, Any]], prev_data: dict[str, Any], ) -> None: """Update segment.""" @@ -337,7 +342,7 @@ class ReviewSegmentMaintainer(threading.Thread): "reviews", json.dumps(review_update), ) - self.review_publisher.publish(review_update, segment.camera) + self.review_publisher.publish(review_update, segment.camera) # type: ignore[arg-type] self.requestor.send_data( f"{segment.camera}/review_status", segment.severity.value.upper() ) @@ -346,7 +351,7 @@ class ReviewSegmentMaintainer(threading.Thread): self, segment: PendingReviewSegment, prev_data: dict[str, Any], - ) -> float: + ) -> Any: """End segment.""" final_data = segment.get_data(ended=True) end_time = final_data[ReviewSegment.end_time.name] @@ -360,24 +365,25 @@ class ReviewSegmentMaintainer(threading.Thread): "reviews", json.dumps(review_update), ) - self.review_publisher.publish(review_update, segment.camera) + self.review_publisher.publish(review_update, segment.camera) # type: ignore[arg-type] self.requestor.send_data(f"{segment.camera}/review_status", "NONE") self.active_review_segments[segment.camera] = None return end_time - def forcibly_end_segment(self, camera: str) -> float: + def forcibly_end_segment(self, camera: str) -> Any: """Forcibly end the pending segment for a camera.""" segment = self.active_review_segments.get(camera) if segment: prev_data = segment.get_data(False) return self._publish_segment_end(segment, prev_data) + return None def update_existing_segment( self, segment: PendingReviewSegment, frame_name: str, frame_time: float, - objects: list[TrackedObject], + objects: list[dict[str, Any]], ) -> None: """Validate if existing review segment should continue.""" camera_config = self.config.cameras[segment.camera] @@ -492,8 +498,11 @@ class ReviewSegmentMaintainer(threading.Thread): except FileNotFoundError: return - if segment.severity == SeverityEnum.alert and frame_time > ( - segment.last_alert_time + camera_config.review.alerts.cutoff_time + if ( + segment.severity == SeverityEnum.alert + and segment.last_alert_time is not None + and frame_time + > (segment.last_alert_time + camera_config.review.alerts.cutoff_time) ): needs_new_detection = ( segment.last_detection_time > segment.last_alert_time @@ -516,23 +525,18 @@ class ReviewSegmentMaintainer(threading.Thread): new_zones.update(o["current_zones"]) if new_detections: - self.active_review_segments[activity.camera_config.name] = ( - PendingReviewSegment( - activity.camera_config.name, - end_time, - SeverityEnum.detection, - new_detections, - sub_labels={}, - audio=set(), - zones=list(new_zones), - ) + new_segment = PendingReviewSegment( + segment.camera, + end_time, + SeverityEnum.detection, + new_detections, + sub_labels={}, + audio=set(), + zones=list(new_zones), ) - self._publish_segment_start( - self.active_review_segments[activity.camera_config.name] - ) - self.active_review_segments[ - activity.camera_config.name - ].last_detection_time = last_detection_time + self.active_review_segments[segment.camera] = new_segment + self._publish_segment_start(new_segment) + new_segment.last_detection_time = last_detection_time elif segment.severity == SeverityEnum.detection and frame_time > ( segment.last_detection_time + camera_config.review.detections.cutoff_time @@ -544,7 +548,7 @@ class ReviewSegmentMaintainer(threading.Thread): camera: str, frame_name: str, frame_time: float, - objects: list[TrackedObject], + objects: list[dict[str, Any]], ) -> None: """Check if a new review segment should be created.""" camera_config = self.config.cameras[camera] @@ -581,7 +585,7 @@ class ReviewSegmentMaintainer(threading.Thread): zones.append(zone) if severity: - self.active_review_segments[camera] = PendingReviewSegment( + new_segment = PendingReviewSegment( camera, frame_time, severity, @@ -590,6 +594,7 @@ class ReviewSegmentMaintainer(threading.Thread): audio=set(), zones=zones, ) + self.active_review_segments[camera] = new_segment try: yuv_frame = self.frame_manager.get( @@ -600,11 +605,11 @@ class ReviewSegmentMaintainer(threading.Thread): logger.debug(f"Failed to get frame {frame_name} from SHM") return - self.active_review_segments[camera].update_frame( + new_segment.update_frame( camera_config, yuv_frame, activity.get_all_objects() ) self.frame_manager.close(frame_name) - self._publish_segment_start(self.active_review_segments[camera]) + self._publish_segment_start(new_segment) except FileNotFoundError: return @@ -621,9 +626,14 @@ class ReviewSegmentMaintainer(threading.Thread): for camera in updated_topics["enabled"]: self.forcibly_end_segment(camera) - (topic, data) = self.detection_subscriber.check_for_update(timeout=1) + result = self.detection_subscriber.check_for_update(timeout=1) - if not topic: + if not result: + continue + + topic, data = result + + if not topic or not data: continue if topic == DetectionTypeEnum.video.value: @@ -712,10 +722,13 @@ class ReviewSegmentMaintainer(threading.Thread): if topic == DetectionTypeEnum.api: # manual_info["label"] contains 'label: sub_label' # so split out the label without modifying manual_info + det_labels = self.config.cameras[ + camera + ].review.detections.labels if ( self.config.cameras[camera].review.detections.enabled - and manual_info["label"].split(": ")[0] - in self.config.cameras[camera].review.detections.labels + and det_labels is not None + and manual_info["label"].split(": ")[0] in det_labels ): current_segment.last_detection_time = manual_info[ "end_time" @@ -744,14 +757,15 @@ class ReviewSegmentMaintainer(threading.Thread): ): # manual_info["label"] contains 'label: sub_label' # so split out the label without modifying manual_info + det_labels = self.config.cameras[ + camera + ].review.detections.labels if ( not self.config.cameras[ camera ].review.detections.enabled - or manual_info["label"].split(": ")[0] - not in self.config.cameras[ - camera - ].review.detections.labels + or det_labels is None + or manual_info["label"].split(": ")[0] not in det_labels ): current_segment.severity = SeverityEnum.alert elif ( @@ -828,17 +842,18 @@ class ReviewSegmentMaintainer(threading.Thread): severity = None # manual_info["label"] contains 'label: sub_label' # so split out the label without modifying manual_info + det_labels = self.config.cameras[camera].review.detections.labels if ( self.config.cameras[camera].review.detections.enabled - and manual_info["label"].split(": ")[0] - in self.config.cameras[camera].review.detections.labels + and det_labels is not None + and manual_info["label"].split(": ")[0] in det_labels ): severity = SeverityEnum.detection elif self.config.cameras[camera].review.alerts.enabled: severity = SeverityEnum.alert if severity: - self.active_review_segments[camera] = PendingReviewSegment( + api_segment = PendingReviewSegment( camera, frame_time, severity, @@ -847,32 +862,25 @@ class ReviewSegmentMaintainer(threading.Thread): [], set(), ) + self.active_review_segments[camera] = api_segment if manual_info["state"] == ManualEventState.start: self.indefinite_events[camera][manual_info["event_id"]] = ( manual_info["label"] ) # temporarily make it so this event can not end - self.active_review_segments[ - camera - ].last_alert_time = sys.maxsize - self.active_review_segments[ - camera - ].last_detection_time = sys.maxsize + api_segment.last_alert_time = sys.maxsize + api_segment.last_detection_time = sys.maxsize elif manual_info["state"] == ManualEventState.complete: - self.active_review_segments[ - camera - ].last_alert_time = manual_info["end_time"] - self.active_review_segments[ - camera - ].last_detection_time = manual_info["end_time"] + api_segment.last_alert_time = manual_info["end_time"] + api_segment.last_detection_time = manual_info["end_time"] else: logger.warning( f"Manual event API has been called for {camera}, but alerts and detections are disabled. This manual event will not appear as an alert or detection." ) elif topic == DetectionTypeEnum.lpr: if self.config.cameras[camera].review.detections.enabled: - self.active_review_segments[camera] = PendingReviewSegment( + lpr_segment = PendingReviewSegment( camera, frame_time, SeverityEnum.detection, @@ -881,25 +889,18 @@ class ReviewSegmentMaintainer(threading.Thread): [], set(), ) + self.active_review_segments[camera] = lpr_segment if manual_info["state"] == ManualEventState.start: self.indefinite_events[camera][manual_info["event_id"]] = ( manual_info["label"] ) # temporarily make it so this event can not end - self.active_review_segments[ - camera - ].last_alert_time = sys.maxsize - self.active_review_segments[ - camera - ].last_detection_time = sys.maxsize + lpr_segment.last_alert_time = sys.maxsize + lpr_segment.last_detection_time = sys.maxsize elif manual_info["state"] == ManualEventState.complete: - self.active_review_segments[ - camera - ].last_alert_time = manual_info["end_time"] - self.active_review_segments[ - camera - ].last_detection_time = manual_info["end_time"] + lpr_segment.last_alert_time = manual_info["end_time"] + lpr_segment.last_detection_time = manual_info["end_time"] else: logger.warning( f"Dedicated LPR camera API has been called for {camera}, but detections are disabled. LPR events will not appear as a detection." diff --git a/frigate/storage.py b/frigate/storage.py index dad3c6e9c..8cc199a1b 100644 --- a/frigate/storage.py +++ b/frigate/storage.py @@ -3,6 +3,7 @@ import logging import shutil import threading +from multiprocessing.synchronize import Event as MpEvent from pathlib import Path from peewee import SQL, fn @@ -23,7 +24,7 @@ MAX_CALCULATED_BANDWIDTH = 10000 # 10Gb/hr class StorageMaintainer(threading.Thread): """Maintain frigates recording storage.""" - def __init__(self, config: FrigateConfig, stop_event) -> None: + def __init__(self, config: FrigateConfig, stop_event: MpEvent) -> None: super().__init__(name="storage_maintainer") self.config = config self.stop_event = stop_event @@ -114,7 +115,7 @@ class StorageMaintainer(threading.Thread): logger.debug( f"Storage cleanup check: {hourly_bandwidth} hourly with remaining storage: {remaining_storage}." ) - return remaining_storage < hourly_bandwidth + return remaining_storage < float(hourly_bandwidth) def reduce_storage_consumption(self) -> None: """Remove oldest hour of recordings.""" @@ -124,7 +125,7 @@ class StorageMaintainer(threading.Thread): [b["bandwidth"] for b in self.camera_storage_stats.values()] ) - recordings: Recordings = ( + recordings = ( Recordings.select( Recordings.id, Recordings.camera, @@ -138,7 +139,7 @@ class StorageMaintainer(threading.Thread): .iterator() ) - retained_events: Event = ( + retained_events = ( Event.select( Event.start_time, Event.end_time, @@ -278,7 +279,7 @@ class StorageMaintainer(threading.Thread): Recordings.id << deleted_recordings_list[i : i + max_deletes] ).execute() - def run(self): + def run(self) -> None: """Check every 5 minutes if storage needs to be cleaned up.""" if self.config.safe_mode: logger.info("Safe mode enabled, skipping storage maintenance") diff --git a/frigate/timeline.py b/frigate/timeline.py index 3ec866176..6a62da2df 100644 --- a/frigate/timeline.py +++ b/frigate/timeline.py @@ -8,7 +8,7 @@ from multiprocessing.synchronize import Event as MpEvent from typing import Any from frigate.config import FrigateConfig -from frigate.events.maintainer import EventStateEnum, EventTypeEnum +from frigate.events.types import EventStateEnum, EventTypeEnum from frigate.models import Timeline from frigate.util.builtin import to_relative_box @@ -28,7 +28,7 @@ class TimelineProcessor(threading.Thread): self.config = config self.queue = queue self.stop_event = stop_event - self.pre_event_cache: dict[str, list[dict[str, Any]]] = {} + self.pre_event_cache: dict[str, list[dict[Any, Any]]] = {} def run(self) -> None: while not self.stop_event.is_set(): @@ -56,7 +56,7 @@ class TimelineProcessor(threading.Thread): def insert_or_save( self, - entry: dict[str, Any], + entry: dict[Any, Any], prev_event_data: dict[Any, Any], event_data: dict[Any, Any], ) -> None: @@ -84,11 +84,15 @@ class TimelineProcessor(threading.Thread): event_type: str, prev_event_data: dict[Any, Any], event_data: dict[Any, Any], - ) -> bool: + ) -> None: """Handle object detection.""" camera_config = self.config.cameras.get(camera) - if camera_config is None: - return False + if ( + camera_config is None + or camera_config.detect.width is None + or camera_config.detect.height is None + ): + return event_id = event_data["id"] # Base timeline entry data that all entries will share diff --git a/frigate/track/tracked_object.py b/frigate/track/tracked_object.py index 7d46b72fd..44d270875 100644 --- a/frigate/track/tracked_object.py +++ b/frigate/track/tracked_object.py @@ -67,8 +67,8 @@ class TrackedObject: self.has_snapshot = False self.top_score = self.computed_score = 0.0 self.thumbnail_data: dict[str, Any] | None = None - self.last_updated = 0 - self.last_published = 0 + self.last_updated: float = 0 + self.last_published: float = 0 self.frame = None self.active = True self.pending_loitering = False