Implement stationary car classifier to base the stationary state on visual changes rather than bounding box stability alone

Nicolas Mowen 2025-09-24 11:05:49 -06:00
parent 9a22404015
commit 72c6fb4e22

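The premise of the change: detection boxes on a parked car jitter from frame to frame, so box overlap (IOU) alone can suggest movement when nothing has changed visually. A minimal sketch of the two signals, separate from the commit itself (the iou helper, frame, and boxes below are made up for illustration; frigate's own intersection_over_union is not shown in this diff):

import cv2
import numpy as np

def iou(a, b):
    # illustrative IOU helper, stand-in for frigate's intersection_over_union
    x1, y1 = max(a[0], b[0]), max(a[1], b[1])
    x2, y2 = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0, x2 - x1) * max(0, y2 - y1)
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    return inter / (area_a + area_b - inter)

rng = np.random.default_rng(0)
frame = rng.integers(0, 255, (480, 640), dtype=np.uint8)  # static scene (luma plane)

anchor_box = (200, 150, 360, 270)
jittered = (212, 158, 348, 262)  # detector jitter on the same parked car
print("box IOU under jitter:", round(iou(anchor_box, jittered), 3))  # ~0.74, well below 1.0

# the appearance of the *same fixed region* is unchanged, so NCC stays at 1.0
x1, y1, x2, y2 = anchor_box
crop_then = frame[y1:y2, x1:x2]
crop_now = frame[y1:y2, x1:x2]
ncc = cv2.matchTemplate(crop_now, crop_then, cv2.TM_CCOEFF_NORMED)[0, 0]
print("appearance NCC on fixed region:", round(float(ncc), 3))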

@@ -12,6 +12,7 @@ from norfair.tracker import Detection, TrackedObject, Tracker
 from rich import print
 from rich.console import Console
 from rich.table import Table
+from scipy.ndimage import gaussian_filter
 
 from frigate.camera import PTZMetrics
 from frigate.config import CameraConfig
@@ -33,6 +34,135 @@ THRESHOLD_ACTIVE_CHECK_IOU = 0.9
 
 MAX_STATIONARY_HISTORY = 10
 
+
+class StationaryMotionClassifier:
+    """Fallback classifier to prevent false flips from stationary to active.
+
+    Uses appearance consistency on a fixed spatial region (historical median box)
+    to detect actual movement, ignoring bounding box detection variations.
+    """
+
+    CROP_SIZE = 96
+    NCC_KEEP_THRESHOLD = 0.90  # High correlation = keep stationary
+    NCC_ACTIVE_THRESHOLD = 0.85  # Low correlation = consider active
+    SHIFT_KEEP_THRESHOLD = 0.02  # Small shift = keep stationary
+    SHIFT_ACTIVE_THRESHOLD = 0.04  # Large shift = consider active
+    DRIFT_ACTIVE_THRESHOLD = 0.12  # Cumulative drift over 5 frames
+    DISAGREE_FRAMES_TO_FLIP = 2
+
+    def __init__(self) -> None:
+        self.anchor_crops: dict[str, np.ndarray] = {}
+        self.anchor_boxes: dict[str, tuple[int, int, int, int]] = {}
+        self.disagree_counts: dict[str, int] = {}
+        self.shift_histories: dict[str, list[float]] = {}
+        # Pre-compute Hanning window for phase correlation
+        hann = np.hanning(self.CROP_SIZE).astype(np.float64)
+        self._hann2d = np.outer(hann, hann)
+
+    def reset(self, id: str) -> None:
+        if id in self.anchor_crops:
+            del self.anchor_crops[id]
+        if id in self.anchor_boxes:
+            del self.anchor_boxes[id]
+        self.disagree_counts[id] = 0
+        self.shift_histories[id] = []
+
+    def _extract_y_crop(
+        self, yuv_frame: np.ndarray, box: tuple[int, int, int, int]
+    ) -> np.ndarray:
+        """Extract and normalize Y-plane crop from bounding box."""
+        y_height = yuv_frame.shape[0] // 3 * 2
+        width = yuv_frame.shape[1]
+        x1 = max(0, min(width - 1, box[0]))
+        y1 = max(0, min(y_height - 1, box[1]))
+        x2 = max(0, min(width - 1, box[2]))
+        y2 = max(0, min(y_height - 1, box[3]))
+
+        if x2 <= x1:
+            x2 = min(width - 1, x1 + 1)
+        if y2 <= y1:
+            y2 = min(y_height - 1, y1 + 1)
+
+        # Extract Y-plane crop, resize, and blur
+        y_plane = yuv_frame[0:y_height, 0:width]
+        crop = y_plane[y1:y2, x1:x2]
+        crop_resized = cv2.resize(
+            crop, (self.CROP_SIZE, self.CROP_SIZE), interpolation=cv2.INTER_AREA
+        )
+        return gaussian_filter(crop_resized, sigma=0.5)
+
+    def ensure_anchor(
+        self, id: str, yuv_frame: np.ndarray, median_box: tuple[int, int, int, int]
+    ) -> None:
+        """Initialize anchor crop from stable median box when object becomes stationary."""
+        if id not in self.anchor_crops:
+            self.anchor_boxes[id] = median_box
+            self.anchor_crops[id] = self._extract_y_crop(yuv_frame, median_box)
+            self.disagree_counts[id] = 0
+            self.shift_histories[id] = []
+
+    def on_active(self, id: str) -> None:
+        """Reset state when object becomes active to allow re-anchoring."""
+        self.reset(id)
+
+    def evaluate(
+        self, id: str, yuv_frame: np.ndarray, current_box: tuple[int, int, int, int]
+    ) -> bool:
+        """Return True to keep stationary, False to flip to active.
+
+        Compares the same spatial region (historical median box) across frames
+        to detect actual movement, ignoring bounding box variations.
+        """
+        if id not in self.anchor_crops or id not in self.anchor_boxes:
+            return True
+
+        # Compare same spatial region across frames
+        anchor_box = self.anchor_boxes[id]
+        anchor_crop = self.anchor_crops[id]
+        curr_crop = self._extract_y_crop(yuv_frame, anchor_box)
+
+        # Compute appearance and motion metrics
+        ncc = cv2.matchTemplate(curr_crop, anchor_crop, cv2.TM_CCOEFF_NORMED)[0, 0]
+        a64 = anchor_crop.astype(np.float64) * self._hann2d
+        c64 = curr_crop.astype(np.float64) * self._hann2d
+        (shift_x, shift_y), _ = cv2.phaseCorrelate(a64, c64)
+        shift_norm = float(np.hypot(shift_x, shift_y)) / float(self.CROP_SIZE)
+
+        # Update rolling shift history
+        history = self.shift_histories.get(id, [])
+        history.append(shift_norm)
+        if len(history) > 5:
+            history = history[-5:]
+        self.shift_histories[id] = history
+        drift_sum = float(sum(history))
+
+        # Early exit for clear stationary case
+        if ncc >= self.NCC_KEEP_THRESHOLD and shift_norm < self.SHIFT_KEEP_THRESHOLD:
+            self.disagree_counts[id] = 0
+            return True
+
+        # Check for movement indicators
+        movement_detected = (
+            ncc < self.NCC_ACTIVE_THRESHOLD
+            or shift_norm >= self.SHIFT_ACTIVE_THRESHOLD
+            or drift_sum >= self.DRIFT_ACTIVE_THRESHOLD
+        )
+
+        if movement_detected:
+            cnt = self.disagree_counts.get(id, 0) + 1
+            self.disagree_counts[id] = cnt
+            if (
+                cnt >= self.DISAGREE_FRAMES_TO_FLIP
+                or drift_sum >= self.DRIFT_ACTIVE_THRESHOLD
+            ):
+                return False
+        else:
+            self.disagree_counts[id] = 0
+
+        return True
+
+
 # Normalizes distance from estimate relative to object size
 # Other ideas:
 # - if estimates are inaccurate for first N detections, compare with last_detection (may be fine)
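The classifier's two measurements can be reproduced in isolation. A small sketch, separate from the commit, using only numpy, OpenCV, and scipy on synthetic crops: an unchanged crop scores NCC = 1.0 with a near-zero phase-correlation shift, while the same content translated by a few pixels drops below the activity limits above (for these synthetic crops).

import cv2
import numpy as np
from scipy.ndimage import gaussian_filter

CROP_SIZE = 96
rng = np.random.default_rng(1)
anchor = gaussian_filter(
    rng.integers(0, 255, (CROP_SIZE, CROP_SIZE), dtype=np.uint8), sigma=0.5
)
still = anchor.copy()                     # car has not moved
moved = np.roll(anchor, shift=6, axis=1)  # same content translated by ~6 px

hann = np.hanning(CROP_SIZE).astype(np.float64)
hann2d = np.outer(hann, hann)

def metrics(current, reference):
    # NCC: appearance similarity of the fixed region across frames
    ncc = float(cv2.matchTemplate(current, reference, cv2.TM_CCOEFF_NORMED)[0, 0])
    # Phase correlation: dominant translation between the two crops,
    # windowed to reduce edge effects, then normalized by the crop size
    (dx, dy), _ = cv2.phaseCorrelate(
        reference.astype(np.float64) * hann2d, current.astype(np.float64) * hann2d
    )
    return ncc, float(np.hypot(dx, dy)) / CROP_SIZE

print("still:", metrics(still, anchor))  # NCC = 1.0, shift ~0
print("moved:", metrics(moved, anchor))  # lower NCC, shift around 6/96 ≈ 0.06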
@@ -119,6 +249,7 @@
         self.ptz_motion_estimator: PtzMotionEstimator | None = None
         self.camera_name = config.name
         self.track_id_map: dict[str, str] = {}
+        self.stationary_classifier = StationaryMotionClassifier()
 
         # Define tracker configurations for static camera
         self.object_type_configs = {
@@ -321,7 +452,13 @@
 
     # tracks the current position of the object based on the last N bounding boxes
     # returns False if the object has moved outside its previous position
-    def update_position(self, id: str, box: list[int], stationary: bool) -> bool:
+    def update_position(
+        self,
+        id: str,
+        box: list[int],
+        stationary: bool,
+        yuv_frame: np.ndarray | None,
+    ) -> bool:
         xmin, ymin, xmax, ymax = box
         position = self.positions[id]
         self.stationary_box_history[id].append(box)
@@ -331,30 +468,40 @@
                 -MAX_STATIONARY_HISTORY:
             ]
 
-        avg_iou = intersection_over_union(
-            box, average_boxes(self.stationary_box_history[id])
-        )
+        avg_box = average_boxes(self.stationary_box_history[id])
+        avg_iou = intersection_over_union(box, avg_box)
+        median_box = median_of_boxes(self.stationary_box_history[id])
+
+        # Establish anchor early when stationary and stable
+        if stationary and yuv_frame is not None:
+            history = self.stationary_box_history[id]
+            if id not in self.stationary_classifier.anchor_crops and len(history) >= 5:
+                stability_iou = intersection_over_union(avg_box, median_box)
+                if stability_iou >= 0.7:
+                    self.stationary_classifier.ensure_anchor(id, yuv_frame, median_box)
 
         # object has minimal or zero iou
         # assume object is active
         if avg_iou < THRESHOLD_KNOWN_ACTIVE_IOU:
-            self.positions[id] = {
-                "xmins": [xmin],
-                "ymins": [ymin],
-                "xmaxs": [xmax],
-                "ymaxs": [ymax],
-                "xmin": xmin,
-                "ymin": ymin,
-                "xmax": xmax,
-                "ymax": ymax,
-            }
-            return False
+            if stationary:
+                if not self.stationary_classifier.evaluate(id, yuv_frame, tuple(box)):
+                    self.positions[id] = {
+                        "xmins": [xmin],
+                        "ymins": [ymin],
+                        "xmaxs": [xmax],
+                        "ymaxs": [ymax],
+                        "xmin": xmin,
+                        "ymin": ymin,
+                        "xmax": xmax,
+                        "ymax": ymax,
+                    }
+                    return False
 
         threshold = (
             THRESHOLD_STATIONARY_CHECK_IOU if stationary else THRESHOLD_ACTIVE_CHECK_IOU
         )
 
-        # object has iou below threshold, check median to reduce outliers
+        # object has iou below threshold, check median and optionally crop similarity
        if avg_iou < threshold:
             median_iou = intersection_over_union(
                 (
@@ -363,23 +510,28 @@
                     position["xmax"],
                     position["ymax"],
                 ),
-                median_of_boxes(self.stationary_box_history[id]),
+                median_box,
             )
 
             # if the median iou drops below the threshold
             # assume object is no longer stationary
             if median_iou < threshold:
-                self.positions[id] = {
-                    "xmins": [xmin],
-                    "ymins": [ymin],
-                    "xmaxs": [xmax],
-                    "ymaxs": [ymax],
-                    "xmin": xmin,
-                    "ymin": ymin,
-                    "xmax": xmax,
-                    "ymax": ymax,
-                }
-                return False
+                # Before flipping to active, check with classifier if we have YUV frame
+                if stationary and yuv_frame is not None:
+                    if not self.stationary_classifier.evaluate(
+                        id, yuv_frame, tuple(box)
+                    ):
+                        self.positions[id] = {
+                            "xmins": [xmin],
+                            "ymins": [ymin],
+                            "xmaxs": [xmax],
+                            "ymaxs": [ymax],
+                            "xmin": xmin,
+                            "ymin": ymin,
+                            "xmax": xmax,
+                            "ymax": ymax,
+                        }
+                        return False
 
         # if there are more than 5 and less than 10 entries for the position, add the bounding box
         # and recompute the position box
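The anchoring condition added above is easy to check in isolation: a reference crop is only captured once at least five recent boxes agree closely (IOU of the averaged and median boxes >= 0.7). A sketch with simple coordinate-wise stand-ins for frigate's average_boxes / median_of_boxes / intersection_over_union helpers, whose real implementations are not part of this diff:

import numpy as np

def iou(a, b):
    # illustrative stand-in for frigate's intersection_over_union
    x1, y1 = max(a[0], b[0]), max(a[1], b[1])
    x2, y2 = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0, x2 - x1) * max(0, y2 - y1)
    union = (a[2] - a[0]) * (a[3] - a[1]) + (b[2] - b[0]) * (b[3] - b[1]) - inter
    return inter / union

def average_box(history):
    return tuple(int(v) for v in np.mean(history, axis=0))

def median_box(history):
    return tuple(int(v) for v in np.median(history, axis=0))

# a parked car whose detector boxes jitter by a few pixels per frame
history = [
    (200, 150, 360, 270),
    (203, 149, 363, 268),
    (198, 152, 358, 272),
    (201, 151, 361, 271),
    (199, 150, 359, 269),
]
avg, med = average_box(history), median_box(history)
stability_iou = iou(avg, med)
print(stability_iou)                                # ≈ 1.0 for a genuinely parked car
print(len(history) >= 5 and stability_iou >= 0.7)   # anchor crop would be established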
@@ -416,7 +568,12 @@
 
         return False
 
-    def update(self, track_id: str, obj: dict[str, Any]) -> None:
+    def update(
+        self,
+        track_id: str,
+        obj: dict[str, Any],
+        yuv_frame: np.ndarray | None,
+    ) -> None:
         id = self.track_id_map[track_id]
         self.disappeared[id] = 0
         stationary = (
@@ -424,7 +581,7 @@
             >= self.detect_config.stationary.threshold
         )
         # update the motionless count if the object has not moved to a new position
-        if self.update_position(id, obj["box"], stationary):
+        if self.update_position(id, obj["box"], stationary, yuv_frame):
             self.tracked_objects[id]["motionless_count"] += 1
             if self.is_expired(id):
                 self.deregister(id, track_id)
@@ -440,6 +597,7 @@
                 self.tracked_objects[id]["position_changes"] += 1
             self.tracked_objects[id]["motionless_count"] = 0
             self.stationary_box_history[id] = []
+            self.stationary_classifier.on_active(id)
 
         self.tracked_objects[id].update(obj)
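Taken together, the tracker drives the classifier through a small lifecycle: ensure_anchor() once the stationary box history is stable, evaluate() whenever the IOU checks would otherwise flip the object to active, and on_active() when a real position change is recorded. A standalone sketch of that flow, assuming the class above is importable (module path assumed) and using a synthetic I420 frame and made-up object id:

import numpy as np

# module path assumed; StationaryMotionClassifier is defined in the diff above
from frigate.track.norfair_tracker import StationaryMotionClassifier

# synthetic I420 frame: Y plane on top (h rows), subsampled U/V packed below (h // 2 rows)
h, w = 480, 640
rng = np.random.default_rng(2)
yuv_frame = np.full((h * 3 // 2, w), 128, dtype=np.uint8)
yuv_frame[:h, :w] = rng.integers(0, 255, (h, w), dtype=np.uint8)

clf = StationaryMotionClassifier()
object_id = "example-object-id"            # illustrative; frigate uses its own id format
median_box = (200, 150, 360, 270)

clf.ensure_anchor(object_id, yuv_frame, median_box)     # capture the stable reference region
print(clf.evaluate(object_id, yuv_frame, median_box))   # True: appearance unchanged, stay stationary
clf.on_active(object_id)                                # object really moved; state dropped for re-anchoring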
@@ -467,6 +625,14 @@
     ) -> None:
         # Group detections by object type
         detections_by_type: dict[str, list[Detection]] = {}
+
+        yuv_frame: np.ndarray | None = None
+        if self.ptz_metrics.autotracker_enabled.value or any(
+            obj[0] == "car" for obj in detections
+        ):
+            yuv_frame = self.frame_manager.get(
+                frame_name, self.camera_config.frame_shape_yuv
+            )
         for obj in detections:
             label = obj[0]
             if label not in detections_by_type:
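The hoisted fetch above returns the raw I420 buffer that frame_shape_yuv describes, which is also why _extract_y_crop() recovers the luma plane with shape[0] // 3 * 2. A short sketch of that geometry (the resolution values are illustrative):

import numpy as np

detect_w, detect_h = 640, 480
frame_shape_yuv = (detect_h * 3 // 2, detect_w)  # I420 buffer: (720, 640) for a 640x480 detect size

yuv_frame = np.zeros(frame_shape_yuv, dtype=np.uint8)
y_height = yuv_frame.shape[0] // 3 * 2           # 720 * 2 / 3 = 480 rows of full-resolution luma
y_plane = yuv_frame[:y_height, :]                # (480, 640): what the classifier crops from
uv_planes = yuv_frame[y_height:, :]              # (240, 640): subsampled chroma, ignored here
print(y_plane.shape, uv_planes.shape)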
@@ -481,9 +647,6 @@
 
             embedding = None
             if self.ptz_metrics.autotracker_enabled.value:
-                yuv_frame = self.frame_manager.get(
-                    frame_name, self.camera_config.frame_shape_yuv
-                )
                 embedding = get_histogram(
                     yuv_frame, obj[2][0], obj[2][1], obj[2][2], obj[2][3]
                 )
@@ -575,7 +738,7 @@
                 self.tracked_objects[id]["estimate"] = new_obj["estimate"]
             # else update it
             else:
-                self.update(str(t.global_id), new_obj)
+                self.update(str(t.global_id), new_obj, yuv_frame)
 
         # clear expired tracks
         expired_ids = [k for k in self.track_id_map.keys() if k not in active_ids]