WIP cleaning up tracked object

This commit is contained in:
Nicolas Mowen 2025-08-15 08:29:12 -06:00
parent c52247f29b
commit f9dd9681a7
2 changed files with 47 additions and 45 deletions

View File

@ -53,7 +53,7 @@ ignore_errors = false
[mypy-frigate.stats] [mypy-frigate.stats]
ignore_errors = false ignore_errors = false
[mypy-frigate.track] [mypy-frigate.track.*]
ignore_errors = false ignore_errors = false
[mypy-frigate.types] [mypy-frigate.types]

View File

@ -5,18 +5,18 @@ import math
import os import os
from collections import defaultdict from collections import defaultdict
from statistics import median from statistics import median
from typing import Any, Optional from typing import Any, Optional, cast
import cv2 import cv2
import numpy as np import numpy as np
from frigate.config import ( from frigate.config import (
CameraConfig, CameraConfig,
ModelConfig,
SnapshotsConfig, SnapshotsConfig,
UIConfig, UIConfig,
) )
from frigate.const import CLIPS_DIR, THUMB_DIR from frigate.const import CLIPS_DIR, THUMB_DIR
from frigate.detectors.detector_config import ModelConfig
from frigate.review.types import SeverityEnum from frigate.review.types import SeverityEnum
from frigate.util.builtin import sanitize_float from frigate.util.builtin import sanitize_float
from frigate.util.image import ( from frigate.util.image import (
@ -46,11 +46,11 @@ class TrackedObject:
model_config: ModelConfig, model_config: ModelConfig,
camera_config: CameraConfig, camera_config: CameraConfig,
ui_config: UIConfig, ui_config: UIConfig,
frame_cache, frame_cache: dict[float, dict[str, Any]],
obj_data: dict[str, Any], obj_data: dict[str, Any],
): ) -> None:
# set the score history then remove as it is not part of object state # set the score history then remove as it is not part of object state
self.score_history = obj_data["score_history"] self.score_history: list[float] = obj_data["score_history"]
del obj_data["score_history"] del obj_data["score_history"]
self.obj_data = obj_data self.obj_data = obj_data
@ -61,24 +61,24 @@ class TrackedObject:
self.frame_cache = frame_cache self.frame_cache = frame_cache
self.zone_presence: dict[str, int] = {} self.zone_presence: dict[str, int] = {}
self.zone_loitering: dict[str, int] = {} self.zone_loitering: dict[str, int] = {}
self.current_zones = [] self.current_zones: list[str] = []
self.entered_zones = [] self.entered_zones: list[str] = []
self.attributes = defaultdict(float) self.attributes: dict[str, float] = defaultdict(float)
self.false_positive = True self.false_positive = True
self.has_clip = False self.has_clip = False
self.has_snapshot = False self.has_snapshot = False
self.top_score = self.computed_score = 0.0 self.top_score = self.computed_score = 0.0
self.thumbnail_data = None self.thumbnail_data: dict[str, Any] | None = None
self.last_updated = 0 self.last_updated = 0
self.last_published = 0 self.last_published = 0
self.frame = None self.frame = None
self.active = True self.active = True
self.pending_loitering = False self.pending_loitering = False
self.speed_history = [] self.speed_history: list[float] = []
self.current_estimated_speed = 0 self.current_estimated_speed: float = 0
self.average_estimated_speed = 0 self.average_estimated_speed: float = 0
self.velocity_angle = 0 self.velocity_angle = 0
self.path_data = [] self.path_data: list[tuple[Any, float]] = []
self.previous = self.to_dict() self.previous = self.to_dict()
@property @property
@ -111,7 +111,7 @@ class TrackedObject:
return None return None
def _is_false_positive(self): def _is_false_positive(self) -> bool:
# once a true positive, always a true positive # once a true positive, always a true positive
if not self.false_positive: if not self.false_positive:
return False return False
@ -119,11 +119,13 @@ class TrackedObject:
threshold = self.camera_config.objects.filters[self.obj_data["label"]].threshold threshold = self.camera_config.objects.filters[self.obj_data["label"]].threshold
return self.computed_score < threshold return self.computed_score < threshold
def compute_score(self): def compute_score(self) -> float:
"""get median of scores for object.""" """get median of scores for object."""
return median(self.score_history) return median(self.score_history)
def update(self, current_frame_time: float, obj_data, has_valid_frame: bool): def update(
self, current_frame_time: float, obj_data: dict[str, Any], has_valid_frame: bool
) -> tuple[bool, bool, bool, bool]:
thumb_update = False thumb_update = False
significant_change = False significant_change = False
path_update = False path_update = False
@ -305,7 +307,7 @@ class TrackedObject:
k: self.attributes[k] for k in self.logos if k in self.attributes k: self.attributes[k] for k in self.logos if k in self.attributes
} }
if len(recognized_logos) > 0: if len(recognized_logos) > 0:
max_logo = max(recognized_logos, key=recognized_logos.get) max_logo = max(recognized_logos, key=recognized_logos.get) # type: ignore[arg-type]
# don't overwrite sub label if it is already set # don't overwrite sub label if it is already set
if ( if (
@ -342,28 +344,30 @@ class TrackedObject:
# update path # update path
width = self.camera_config.detect.width width = self.camera_config.detect.width
height = self.camera_config.detect.height height = self.camera_config.detect.height
bottom_center = (
round(obj_data["centroid"][0] / width, 4),
round(obj_data["box"][3] / height, 4),
)
# calculate a reasonable movement threshold (e.g., 5% of the frame diagonal) if width is not None and height is not None:
threshold = 0.05 * math.sqrt(width**2 + height**2) / max(width, height) bottom_center = (
round(obj_data["centroid"][0] / width, 4),
if not self.path_data: round(obj_data["box"][3] / height, 4),
self.path_data.append((bottom_center, obj_data["frame_time"]))
path_update = True
elif (
math.dist(self.path_data[-1][0], bottom_center) >= threshold
or len(self.path_data) == 1
):
# check Euclidean distance before appending
self.path_data.append((bottom_center, obj_data["frame_time"]))
path_update = True
logger.debug(
f"Point tracking: {obj_data['id']}, {bottom_center}, {obj_data['frame_time']}"
) )
# calculate a reasonable movement threshold (e.g., 5% of the frame diagonal)
threshold = 0.05 * math.sqrt(width**2 + height**2) / max(width, height)
if not self.path_data:
self.path_data.append((bottom_center, obj_data["frame_time"]))
path_update = True
elif (
math.dist(self.path_data[-1][0], bottom_center) >= threshold
or len(self.path_data) == 1
):
# check Euclidean distance before appending
self.path_data.append((bottom_center, obj_data["frame_time"]))
path_update = True
logger.debug(
f"Point tracking: {obj_data['id']}, {bottom_center}, {obj_data['frame_time']}"
)
self.obj_data.update(obj_data) self.obj_data.update(obj_data)
self.current_zones = current_zones self.current_zones = current_zones
logger.debug( logger.debug(
@ -371,7 +375,7 @@ class TrackedObject:
) )
return (thumb_update, significant_change, path_update, autotracker_update) return (thumb_update, significant_change, path_update, autotracker_update)
def to_dict(self): def to_dict(self) -> dict[str, Any]:
event = { event = {
"id": self.obj_data["id"], "id": self.obj_data["id"],
"camera": self.camera_config.name, "camera": self.camera_config.name,
@ -413,10 +417,8 @@ class TrackedObject:
return not self.is_stationary() return not self.is_stationary()
def is_stationary(self) -> bool: def is_stationary(self) -> bool:
return ( count = cast(int | float, self.obj_data["motionless_count"])
self.obj_data["motionless_count"] return count > (self.camera_config.detect.stationary.threshold or 50)
> self.camera_config.detect.stationary.threshold
)
def get_thumbnail(self, ext: str) -> bytes | None: def get_thumbnail(self, ext: str) -> bytes | None:
img_bytes = self.get_img_bytes( img_bytes = self.get_img_bytes(
@ -532,18 +534,18 @@ class TrackedObject:
best_frame, dsize=(width, height), interpolation=cv2.INTER_AREA best_frame, dsize=(width, height), interpolation=cv2.INTER_AREA
) )
if timestamp: if timestamp:
color = self.camera_config.timestamp_style.color colors = self.camera_config.timestamp_style.color
draw_timestamp( draw_timestamp(
best_frame, best_frame,
self.thumbnail_data["frame_time"], self.thumbnail_data["frame_time"],
self.camera_config.timestamp_style.format, self.camera_config.timestamp_style.format,
font_effect=self.camera_config.timestamp_style.effect, font_effect=self.camera_config.timestamp_style.effect,
font_thickness=self.camera_config.timestamp_style.thickness, font_thickness=self.camera_config.timestamp_style.thickness,
font_color=(color.blue, color.green, color.red), font_color=(colors.blue, colors.green, colors.red),
position=self.camera_config.timestamp_style.position, position=self.camera_config.timestamp_style.position,
) )
quality_params = None quality_params = []
if ext == "jpg": if ext == "jpg":
quality_params = [int(cv2.IMWRITE_JPEG_QUALITY), quality or 70] quality_params = [int(cv2.IMWRITE_JPEG_QUALITY), quality or 70]