From 72c6fb4e226bf0e748f5da812153e4688c8d53f2 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Wed, 24 Sep 2025 11:05:49 -0600 Subject: [PATCH] Implement stationary car classifier to base stationary state on visual changes and not just bounding box stability --- frigate/track/norfair_tracker.py | 231 ++++++++++++++++++++++++++----- 1 file changed, 197 insertions(+), 34 deletions(-) diff --git a/frigate/track/norfair_tracker.py b/frigate/track/norfair_tracker.py index cdbd35f10..a883d324d 100644 --- a/frigate/track/norfair_tracker.py +++ b/frigate/track/norfair_tracker.py @@ -12,6 +12,7 @@ from norfair.tracker import Detection, TrackedObject, Tracker from rich import print from rich.console import Console from rich.table import Table +from scipy.ndimage import gaussian_filter from frigate.camera import PTZMetrics from frigate.config import CameraConfig @@ -33,6 +34,135 @@ THRESHOLD_ACTIVE_CHECK_IOU = 0.9 MAX_STATIONARY_HISTORY = 10 +class StationaryMotionClassifier: + """Fallback classifier to prevent false flips from stationary to active. + + Uses appearance consistency on a fixed spatial region (historical median box) + to detect actual movement, ignoring bounding box detection variations. + """ + + CROP_SIZE = 96 + NCC_KEEP_THRESHOLD = 0.90 # High correlation = keep stationary + NCC_ACTIVE_THRESHOLD = 0.85 # Low correlation = consider active + SHIFT_KEEP_THRESHOLD = 0.02 # Small shift = keep stationary + SHIFT_ACTIVE_THRESHOLD = 0.04 # Large shift = consider active + DRIFT_ACTIVE_THRESHOLD = 0.12 # Cumulative drift over 5 frames + DISAGREE_FRAMES_TO_FLIP = 2 + + def __init__(self) -> None: + self.anchor_crops: dict[str, np.ndarray] = {} + self.anchor_boxes: dict[str, tuple[int, int, int, int]] = {} + self.disagree_counts: dict[str, int] = {} + self.shift_histories: dict[str, list[float]] = {} + + # Pre-compute Hanning window for phase correlation + hann = np.hanning(self.CROP_SIZE).astype(np.float64) + self._hann2d = np.outer(hann, hann) + + def reset(self, id: str) -> None: + if id in self.anchor_crops: + del self.anchor_crops[id] + if id in self.anchor_boxes: + del self.anchor_boxes[id] + self.disagree_counts[id] = 0 + self.shift_histories[id] = [] + + def _extract_y_crop( + self, yuv_frame: np.ndarray, box: tuple[int, int, int, int] + ) -> np.ndarray: + """Extract and normalize Y-plane crop from bounding box.""" + y_height = yuv_frame.shape[0] // 3 * 2 + width = yuv_frame.shape[1] + x1 = max(0, min(width - 1, box[0])) + y1 = max(0, min(y_height - 1, box[1])) + x2 = max(0, min(width - 1, box[2])) + y2 = max(0, min(y_height - 1, box[3])) + + if x2 <= x1: + x2 = min(width - 1, x1 + 1) + if y2 <= y1: + y2 = min(y_height - 1, y1 + 1) + + # Extract Y-plane crop, resize, and blur + y_plane = yuv_frame[0:y_height, 0:width] + crop = y_plane[y1:y2, x1:x2] + crop_resized = cv2.resize( + crop, (self.CROP_SIZE, self.CROP_SIZE), interpolation=cv2.INTER_AREA + ) + return gaussian_filter(crop_resized, sigma=0.5) + + def ensure_anchor( + self, id: str, yuv_frame: np.ndarray, median_box: tuple[int, int, int, int] + ) -> None: + """Initialize anchor crop from stable median box when object becomes stationary.""" + if id not in self.anchor_crops: + self.anchor_boxes[id] = median_box + self.anchor_crops[id] = self._extract_y_crop(yuv_frame, median_box) + self.disagree_counts[id] = 0 + self.shift_histories[id] = [] + + def on_active(self, id: str) -> None: + """Reset state when object becomes active to allow re-anchoring.""" + self.reset(id) + + def evaluate( + self, id: str, yuv_frame: np.ndarray, current_box: tuple[int, int, int, int] + ) -> bool: + """Return True to keep stationary, False to flip to active. + + Compares the same spatial region (historical median box) across frames + to detect actual movement, ignoring bounding box variations. + """ + + if id not in self.anchor_crops or id not in self.anchor_boxes: + return True + + # Compare same spatial region across frames + anchor_box = self.anchor_boxes[id] + anchor_crop = self.anchor_crops[id] + curr_crop = self._extract_y_crop(yuv_frame, anchor_box) + + # Compute appearance and motion metrics + ncc = cv2.matchTemplate(curr_crop, anchor_crop, cv2.TM_CCOEFF_NORMED)[0, 0] + a64 = anchor_crop.astype(np.float64) * self._hann2d + c64 = curr_crop.astype(np.float64) * self._hann2d + (shift_x, shift_y), _ = cv2.phaseCorrelate(a64, c64) + shift_norm = float(np.hypot(shift_x, shift_y)) / float(self.CROP_SIZE) + + # Update rolling shift history + history = self.shift_histories.get(id, []) + history.append(shift_norm) + if len(history) > 5: + history = history[-5:] + self.shift_histories[id] = history + drift_sum = float(sum(history)) + + # Early exit for clear stationary case + if ncc >= self.NCC_KEEP_THRESHOLD and shift_norm < self.SHIFT_KEEP_THRESHOLD: + self.disagree_counts[id] = 0 + return True + + # Check for movement indicators + movement_detected = ( + ncc < self.NCC_ACTIVE_THRESHOLD + or shift_norm >= self.SHIFT_ACTIVE_THRESHOLD + or drift_sum >= self.DRIFT_ACTIVE_THRESHOLD + ) + + if movement_detected: + cnt = self.disagree_counts.get(id, 0) + 1 + self.disagree_counts[id] = cnt + if ( + cnt >= self.DISAGREE_FRAMES_TO_FLIP + or drift_sum >= self.DRIFT_ACTIVE_THRESHOLD + ): + return False + else: + self.disagree_counts[id] = 0 + + return True + + # Normalizes distance from estimate relative to object size # Other ideas: # - if estimates are inaccurate for first N detections, compare with last_detection (may be fine) @@ -119,6 +249,7 @@ class NorfairTracker(ObjectTracker): self.ptz_motion_estimator: PtzMotionEstimator | None = None self.camera_name = config.name self.track_id_map: dict[str, str] = {} + self.stationary_classifier = StationaryMotionClassifier() # Define tracker configurations for static camera self.object_type_configs = { @@ -321,7 +452,13 @@ class NorfairTracker(ObjectTracker): # tracks the current position of the object based on the last N bounding boxes # returns False if the object has moved outside its previous position - def update_position(self, id: str, box: list[int], stationary: bool) -> bool: + def update_position( + self, + id: str, + box: list[int], + stationary: bool, + yuv_frame: np.ndarray | None, + ) -> bool: xmin, ymin, xmax, ymax = box position = self.positions[id] self.stationary_box_history[id].append(box) @@ -331,30 +468,40 @@ class NorfairTracker(ObjectTracker): -MAX_STATIONARY_HISTORY: ] - avg_iou = intersection_over_union( - box, average_boxes(self.stationary_box_history[id]) - ) + avg_box = average_boxes(self.stationary_box_history[id]) + avg_iou = intersection_over_union(box, avg_box) + median_box = median_of_boxes(self.stationary_box_history[id]) + + # Establish anchor early when stationary and stable + if stationary and yuv_frame is not None: + history = self.stationary_box_history[id] + if id not in self.stationary_classifier.anchor_crops and len(history) >= 5: + stability_iou = intersection_over_union(avg_box, median_box) + if stability_iou >= 0.7: + self.stationary_classifier.ensure_anchor(id, yuv_frame, median_box) # object has minimal or zero iou # assume object is active if avg_iou < THRESHOLD_KNOWN_ACTIVE_IOU: - self.positions[id] = { - "xmins": [xmin], - "ymins": [ymin], - "xmaxs": [xmax], - "ymaxs": [ymax], - "xmin": xmin, - "ymin": ymin, - "xmax": xmax, - "ymax": ymax, - } - return False + if stationary: + if not self.stationary_classifier.evaluate(id, yuv_frame, tuple(box)): + self.positions[id] = { + "xmins": [xmin], + "ymins": [ymin], + "xmaxs": [xmax], + "ymaxs": [ymax], + "xmin": xmin, + "ymin": ymin, + "xmax": xmax, + "ymax": ymax, + } + return False threshold = ( THRESHOLD_STATIONARY_CHECK_IOU if stationary else THRESHOLD_ACTIVE_CHECK_IOU ) - # object has iou below threshold, check median to reduce outliers + # object has iou below threshold, check median and optionally crop similarity if avg_iou < threshold: median_iou = intersection_over_union( ( @@ -363,23 +510,28 @@ class NorfairTracker(ObjectTracker): position["xmax"], position["ymax"], ), - median_of_boxes(self.stationary_box_history[id]), + median_box, ) # if the median iou drops below the threshold # assume object is no longer stationary if median_iou < threshold: - self.positions[id] = { - "xmins": [xmin], - "ymins": [ymin], - "xmaxs": [xmax], - "ymaxs": [ymax], - "xmin": xmin, - "ymin": ymin, - "xmax": xmax, - "ymax": ymax, - } - return False + # Before flipping to active, check with classifier if we have YUV frame + if stationary and yuv_frame is not None: + if not self.stationary_classifier.evaluate( + id, yuv_frame, tuple(box) + ): + self.positions[id] = { + "xmins": [xmin], + "ymins": [ymin], + "xmaxs": [xmax], + "ymaxs": [ymax], + "xmin": xmin, + "ymin": ymin, + "xmax": xmax, + "ymax": ymax, + } + return False # if there are more than 5 and less than 10 entries for the position, add the bounding box # and recompute the position box @@ -416,7 +568,12 @@ class NorfairTracker(ObjectTracker): return False - def update(self, track_id: str, obj: dict[str, Any]) -> None: + def update( + self, + track_id: str, + obj: dict[str, Any], + yuv_frame: np.ndarray | None, + ) -> None: id = self.track_id_map[track_id] self.disappeared[id] = 0 stationary = ( @@ -424,7 +581,7 @@ class NorfairTracker(ObjectTracker): >= self.detect_config.stationary.threshold ) # update the motionless count if the object has not moved to a new position - if self.update_position(id, obj["box"], stationary): + if self.update_position(id, obj["box"], stationary, yuv_frame): self.tracked_objects[id]["motionless_count"] += 1 if self.is_expired(id): self.deregister(id, track_id) @@ -440,6 +597,7 @@ class NorfairTracker(ObjectTracker): self.tracked_objects[id]["position_changes"] += 1 self.tracked_objects[id]["motionless_count"] = 0 self.stationary_box_history[id] = [] + self.stationary_classifier.on_active(id) self.tracked_objects[id].update(obj) @@ -467,6 +625,14 @@ class NorfairTracker(ObjectTracker): ) -> None: # Group detections by object type detections_by_type: dict[str, list[Detection]] = {} + yuv_frame: np.ndarray | None = None + + if self.ptz_metrics.autotracker_enabled.value or any( + obj[0] == "car" for obj in detections + ): + yuv_frame = self.frame_manager.get( + frame_name, self.camera_config.frame_shape_yuv + ) for obj in detections: label = obj[0] if label not in detections_by_type: @@ -481,9 +647,6 @@ class NorfairTracker(ObjectTracker): embedding = None if self.ptz_metrics.autotracker_enabled.value: - yuv_frame = self.frame_manager.get( - frame_name, self.camera_config.frame_shape_yuv - ) embedding = get_histogram( yuv_frame, obj[2][0], obj[2][1], obj[2][2], obj[2][3] ) @@ -575,7 +738,7 @@ class NorfairTracker(ObjectTracker): self.tracked_objects[id]["estimate"] = new_obj["estimate"] # else update it else: - self.update(str(t.global_id), new_obj) + self.update(str(t.global_id), new_obj, yuv_frame) # clear expired tracks expired_ids = [k for k in self.track_id_map.keys() if k not in active_ids]