From f9dd9681a7ff3136d733d94d9e33639f36e10a29 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Fri, 15 Aug 2025 08:29:12 -0600 Subject: [PATCH] WIP cleaning up tracked object --- frigate/mypy.ini | 2 +- frigate/track/tracked_object.py | 90 +++++++++++++++++---------------- 2 files changed, 47 insertions(+), 45 deletions(-) diff --git a/frigate/mypy.ini b/frigate/mypy.ini index 9165713d3..5bad10f49 100644 --- a/frigate/mypy.ini +++ b/frigate/mypy.ini @@ -53,7 +53,7 @@ ignore_errors = false [mypy-frigate.stats] ignore_errors = false -[mypy-frigate.track] +[mypy-frigate.track.*] ignore_errors = false [mypy-frigate.types] diff --git a/frigate/track/tracked_object.py b/frigate/track/tracked_object.py index afe85228e..626b0bd6a 100644 --- a/frigate/track/tracked_object.py +++ b/frigate/track/tracked_object.py @@ -5,18 +5,18 @@ import math import os from collections import defaultdict from statistics import median -from typing import Any, Optional +from typing import Any, Optional, cast import cv2 import numpy as np from frigate.config import ( CameraConfig, - ModelConfig, SnapshotsConfig, UIConfig, ) from frigate.const import CLIPS_DIR, THUMB_DIR +from frigate.detectors.detector_config import ModelConfig from frigate.review.types import SeverityEnum from frigate.util.builtin import sanitize_float from frigate.util.image import ( @@ -46,11 +46,11 @@ class TrackedObject: model_config: ModelConfig, camera_config: CameraConfig, ui_config: UIConfig, - frame_cache, + frame_cache: dict[float, dict[str, Any]], obj_data: dict[str, Any], - ): + ) -> None: # set the score history then remove as it is not part of object state - self.score_history = obj_data["score_history"] + self.score_history: list[float] = obj_data["score_history"] del obj_data["score_history"] self.obj_data = obj_data @@ -61,24 +61,24 @@ class TrackedObject: self.frame_cache = frame_cache self.zone_presence: dict[str, int] = {} self.zone_loitering: dict[str, int] = {} - self.current_zones = [] - self.entered_zones = [] - self.attributes = defaultdict(float) + self.current_zones: list[str] = [] + self.entered_zones: list[str] = [] + self.attributes: dict[str, float] = defaultdict(float) self.false_positive = True self.has_clip = False self.has_snapshot = False self.top_score = self.computed_score = 0.0 - self.thumbnail_data = None + self.thumbnail_data: dict[str, Any] | None = None self.last_updated = 0 self.last_published = 0 self.frame = None self.active = True self.pending_loitering = False - self.speed_history = [] - self.current_estimated_speed = 0 - self.average_estimated_speed = 0 + self.speed_history: list[float] = [] + self.current_estimated_speed: float = 0 + self.average_estimated_speed: float = 0 self.velocity_angle = 0 - self.path_data = [] + self.path_data: list[tuple[Any, float]] = [] self.previous = self.to_dict() @property @@ -111,7 +111,7 @@ class TrackedObject: return None - def _is_false_positive(self): + def _is_false_positive(self) -> bool: # once a true positive, always a true positive if not self.false_positive: return False @@ -119,11 +119,13 @@ class TrackedObject: threshold = self.camera_config.objects.filters[self.obj_data["label"]].threshold return self.computed_score < threshold - def compute_score(self): + def compute_score(self) -> float: """get median of scores for object.""" return median(self.score_history) - def update(self, current_frame_time: float, obj_data, has_valid_frame: bool): + def update( + self, current_frame_time: float, obj_data: dict[str, Any], has_valid_frame: bool + ) -> tuple[bool, bool, bool, bool]: thumb_update = False significant_change = False path_update = False @@ -305,7 +307,7 @@ class TrackedObject: k: self.attributes[k] for k in self.logos if k in self.attributes } if len(recognized_logos) > 0: - max_logo = max(recognized_logos, key=recognized_logos.get) + max_logo = max(recognized_logos, key=recognized_logos.get) # type: ignore[arg-type] # don't overwrite sub label if it is already set if ( @@ -342,28 +344,30 @@ class TrackedObject: # update path width = self.camera_config.detect.width height = self.camera_config.detect.height - bottom_center = ( - round(obj_data["centroid"][0] / width, 4), - round(obj_data["box"][3] / height, 4), - ) - # calculate a reasonable movement threshold (e.g., 5% of the frame diagonal) - threshold = 0.05 * math.sqrt(width**2 + height**2) / max(width, height) - - if not self.path_data: - self.path_data.append((bottom_center, obj_data["frame_time"])) - path_update = True - elif ( - math.dist(self.path_data[-1][0], bottom_center) >= threshold - or len(self.path_data) == 1 - ): - # check Euclidean distance before appending - self.path_data.append((bottom_center, obj_data["frame_time"])) - path_update = True - logger.debug( - f"Point tracking: {obj_data['id']}, {bottom_center}, {obj_data['frame_time']}" + if width is not None and height is not None: + bottom_center = ( + round(obj_data["centroid"][0] / width, 4), + round(obj_data["box"][3] / height, 4), ) + # calculate a reasonable movement threshold (e.g., 5% of the frame diagonal) + threshold = 0.05 * math.sqrt(width**2 + height**2) / max(width, height) + + if not self.path_data: + self.path_data.append((bottom_center, obj_data["frame_time"])) + path_update = True + elif ( + math.dist(self.path_data[-1][0], bottom_center) >= threshold + or len(self.path_data) == 1 + ): + # check Euclidean distance before appending + self.path_data.append((bottom_center, obj_data["frame_time"])) + path_update = True + logger.debug( + f"Point tracking: {obj_data['id']}, {bottom_center}, {obj_data['frame_time']}" + ) + self.obj_data.update(obj_data) self.current_zones = current_zones logger.debug( @@ -371,7 +375,7 @@ class TrackedObject: ) return (thumb_update, significant_change, path_update, autotracker_update) - def to_dict(self): + def to_dict(self) -> dict[str, Any]: event = { "id": self.obj_data["id"], "camera": self.camera_config.name, @@ -413,10 +417,8 @@ class TrackedObject: return not self.is_stationary() def is_stationary(self) -> bool: - return ( - self.obj_data["motionless_count"] - > self.camera_config.detect.stationary.threshold - ) + count = cast(int | float, self.obj_data["motionless_count"]) + return count > (self.camera_config.detect.stationary.threshold or 50) def get_thumbnail(self, ext: str) -> bytes | None: img_bytes = self.get_img_bytes( @@ -532,18 +534,18 @@ class TrackedObject: best_frame, dsize=(width, height), interpolation=cv2.INTER_AREA ) if timestamp: - color = self.camera_config.timestamp_style.color + colors = self.camera_config.timestamp_style.color draw_timestamp( best_frame, self.thumbnail_data["frame_time"], self.camera_config.timestamp_style.format, font_effect=self.camera_config.timestamp_style.effect, font_thickness=self.camera_config.timestamp_style.thickness, - font_color=(color.blue, color.green, color.red), + font_color=(colors.blue, colors.green, colors.red), position=self.camera_config.timestamp_style.position, ) - quality_params = None + quality_params = [] if ext == "jpg": quality_params = [int(cv2.IMWRITE_JPEG_QUALITY), quality or 70]