diff --git a/frigate/data_processing/common/license_plate/mixin.py b/frigate/data_processing/common/license_plate/mixin.py index 142f8038d..3c894bbf5 100644 --- a/frigate/data_processing/common/license_plate/mixin.py +++ b/frigate/data_processing/common/license_plate/mixin.py @@ -209,7 +209,7 @@ class LicensePlateProcessingMixin: boxes = self._detect(image) if len(boxes) == 0: - logger.debug("No boxes found by OCR detector model") + logger.debug(f"{camera}: No boxes found by OCR detector model") return [], [], [] if len(boxes) > 0: @@ -381,7 +381,7 @@ class LicensePlateProcessingMixin: ): if len(plate) < self.lpr_config.min_plate_length: logger.debug( - f"Filtered out '{plate}' due to length ({len(plate)} < {self.lpr_config.min_plate_length})" + f"{camera}: Filtered out '{plate}' due to length ({len(plate)} < {self.lpr_config.min_plate_length})" ) continue @@ -389,7 +389,7 @@ class LicensePlateProcessingMixin: try: if not re.fullmatch(self.lpr_config.format, plate): logger.debug( - f"Filtered out '{plate}' due to format mismatch" + f"{camera}: Filtered out '{plate}' due to format mismatch" ) continue except re.error: @@ -996,7 +996,9 @@ class LicensePlateProcessingMixin: image = cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE) return image - def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]: + def _detect_license_plate( + self, camera: string, input: np.ndarray + ) -> tuple[int, int, int, int]: """ Use a lightweight YOLOv9 model to detect license plates for users without Frigate+ @@ -1066,118 +1068,88 @@ class LicensePlateProcessingMixin: ).clip(0, [input.shape[1], input.shape[0]] * 2) logger.debug( - f"Found license plate. Bounding box: {expanded_box.astype(int)}" + f"{camera}: Found license plate. Bounding box: {expanded_box.astype(int)}" ) return tuple(expanded_box.astype(int)) else: return None # No detection above the threshold - def _should_keep_previous_plate( - self, id, top_plate, top_char_confidences, top_area, avg_confidence - ): - """Determine if the previous plate should be kept over the current one.""" - if id not in self.detected_license_plates: - return False + def _get_cluster_rep( + self, plates: List[dict] + ) -> Tuple[str, float, List[float], int]: + """ + Cluster plate variants and select the representative from the best cluster. + """ + if len(plates) == 0: + return "", 0.0, [], 0 - prev_data = self.detected_license_plates[id] - prev_plate = prev_data["plate"] - prev_char_confidences = prev_data["char_confidences"] - prev_area = prev_data["area"] - prev_avg_confidence = ( - sum(prev_char_confidences) / len(prev_char_confidences) - if prev_char_confidences - else 0 + if len(plates) == 1: + p = plates[0] + return p["plate"], p["conf"], p["char_confidences"], p["area"] + + # Log initial variants + logger.debug(f"Clustering {len(plates)} plate variants:") + for i, p in enumerate(plates): + logger.debug( + f" Variant {i + 1}: '{p['plate']}' (conf: {p['conf']:.3f}, area: {p['area']})" + ) + + CLUSTER_THRESHOLD = 0.85 + clusters = [] + for i, plate in enumerate(plates): + merged = False + for j, cluster in enumerate(clusters): + sims = [jaro_winkler(plate["plate"], v["plate"]) for v in cluster] + if len(sims) > 0: + avg_sim = sum(sims) / len(sims) + if avg_sim >= CLUSTER_THRESHOLD: + cluster.append(plate) + logger.debug( + f" Merged variant {i + 1} '{plate['plate']}' (conf: {plate['conf']:.3f}) into cluster {j + 1} (avg_sim: {avg_sim:.3f})" + ) + merged = True + break + if not merged: + clusters.append([plate]) + logger.debug( + f" Started new cluster {len(clusters)} with variant {i + 1} '{plate['plate']}' (conf: {plate['conf']:.3f})" + ) + + if not clusters: + return "", 0.0, [], 0 + + # Log cluster summaries + for j, cluster in enumerate(clusters): + cluster_size = len(cluster) + max_conf = max(v["conf"] for v in cluster) + sample_variants = [v["plate"] for v in cluster[:3]] # First 3 for brevity + logger.debug( + f" Cluster {j + 1}: size {cluster_size}, max conf {max_conf:.3f}, variants: {sample_variants}{'...' if cluster_size > 3 else ''}" + ) + + # Best cluster: largest size, tiebroken by max conf + def cluster_score(c): + return (len(c), max(v["conf"] for v in c)) + + best_cluster_idx = max( + range(len(clusters)), key=lambda j: cluster_score(clusters[j]) ) - - # 1. Normalize metrics - # Length score: Equal lengths = 0.5, penalize extra characters if low confidence - length_diff = len(top_plate) - len(prev_plate) - max_length_diff = 3 - curr_length_score = 0.5 + (length_diff / (2 * max_length_diff)) - curr_length_score = max(0, min(1, curr_length_score)) - prev_length_score = 0.5 - (length_diff / (2 * max_length_diff)) - prev_length_score = max(0, min(1, prev_length_score)) - - # Adjust length score based on confidence of extra characters - conf_threshold = 0.75 # Minimum confidence for a character to be "trusted" - top_plate_char_count = len(top_plate.replace(" ", "")) - prev_plate_char_count = len(prev_plate.replace(" ", "")) - - if top_plate_char_count > prev_plate_char_count: - extra_confidences = top_char_confidences[prev_plate_char_count:] - if extra_confidences: # Ensure the slice is not empty - extra_conf = min(extra_confidences) # Lowest extra char confidence - if extra_conf < conf_threshold: - curr_length_score *= extra_conf / conf_threshold # Penalize if weak - elif prev_plate_char_count > top_plate_char_count: - extra_confidences = prev_char_confidences[top_plate_char_count:] - if extra_confidences: # Ensure the slice is not empty - extra_conf = min(extra_confidences) - if extra_conf < conf_threshold: - prev_length_score *= extra_conf / conf_threshold - - # Area score: Normalize by max area - max_area = max(top_area, prev_area) - curr_area_score = top_area / max_area if max_area > 0 else 0 - prev_area_score = prev_area / max_area if max_area > 0 else 0 - - # Confidence scores - curr_conf_score = avg_confidence - prev_conf_score = prev_avg_confidence - - # Character confidence comparison (average over shared length) - min_length = min(len(top_plate), len(prev_plate)) - if min_length > 0: - curr_char_conf = sum(top_char_confidences[:min_length]) / min_length - prev_char_conf = sum(prev_char_confidences[:min_length]) / min_length - else: - curr_char_conf = prev_char_conf = 0 - - # Penalize any character below threshold - curr_min_conf = min(top_char_confidences) if top_char_confidences else 0 - prev_min_conf = min(prev_char_confidences) if prev_char_confidences else 0 - curr_conf_penalty = ( - 1.0 if curr_min_conf >= conf_threshold else (curr_min_conf / conf_threshold) - ) - prev_conf_penalty = ( - 1.0 if prev_min_conf >= conf_threshold else (prev_min_conf / conf_threshold) - ) - - # 2. Define weights (boost confidence importance) - weights = { - "length": 0.2, - "area": 0.2, - "avg_confidence": 0.35, - "char_confidence": 0.25, - } - - # 3. Calculate weighted scores with penalty - curr_score = ( - curr_length_score * weights["length"] - + curr_area_score * weights["area"] - + curr_conf_score * weights["avg_confidence"] - + curr_char_conf * weights["char_confidence"] - ) * curr_conf_penalty - - prev_score = ( - prev_length_score * weights["length"] - + prev_area_score * weights["area"] - + prev_conf_score * weights["avg_confidence"] - + prev_char_conf * weights["char_confidence"] - ) * prev_conf_penalty - - # 4. Log the comparison + best_cluster = clusters[best_cluster_idx] + best_size, best_max_conf = cluster_score(best_cluster) logger.debug( - f"Plate comparison - Current: {top_plate} (score: {curr_score:.3f}, min_conf: {curr_min_conf:.2f}) vs " - f"Previous: {prev_plate} (score: {prev_score:.3f}, min_conf: {prev_min_conf:.2f}) " - f"Metrics - Length: {len(top_plate)} vs {len(prev_plate)} (scores: {curr_length_score:.2f} vs {prev_length_score:.2f}), " - f"Area: {top_area} vs {prev_area}, " - f"Avg Conf: {avg_confidence:.2f} vs {prev_avg_confidence:.2f}, " - f"Char Conf: {curr_char_conf:.2f} vs {prev_char_conf:.2f}" + f" Selected best cluster {best_cluster_idx + 1}: size {best_size}, max conf {best_max_conf:.3f}" ) - # 5. Return True if previous plate scores higher - return prev_score > curr_score + # Rep: highest conf in best cluster + rep = max(best_cluster, key=lambda v: v["conf"]) + logger.debug( + f" Selected rep from best cluster: '{rep['plate']}' (conf: {rep['conf']:.3f})" + ) + logger.debug( + f" Final clustered plate: '{rep['plate']}' (conf: {rep['conf']:.3f})" + ) + + return rep["plate"], rep["conf"], rep["char_confidences"], rep["area"] def _generate_plate_event(self, camera: str, plate: str, plate_score: float) -> str: """Generate a unique ID for a plate event based on camera and text.""" @@ -1228,7 +1200,7 @@ class LicensePlateProcessingMixin: ) yolov9_start = datetime.datetime.now().timestamp() - license_plate = self._detect_license_plate(rgb) + license_plate = self._detect_license_plate(camera, rgb) logger.debug( f"{camera}: YOLOv9 LPD inference time: {(datetime.datetime.now().timestamp() - yolov9_start) * 1000:.2f} ms" @@ -1451,7 +1423,7 @@ class LicensePlateProcessingMixin: f"{camera}: Detected text: {plate} (average confidence: {avg_confidence:.2f}, area: {text_area} pixels)" ) else: - logger.debug("No text detected") + logger.debug(f"{camera}: No text detected") return top_plate, top_char_confidences, top_area = ( @@ -1494,9 +1466,7 @@ class LicensePlateProcessingMixin: ) break if plate_id is None: - plate_id = self._generate_plate_event( - obj_data, top_plate, avg_confidence - ) + plate_id = self._generate_plate_event(camera, top_plate, avg_confidence) logger.debug( f"{camera}: New plate event for dedicated LPR camera {plate_id}: {top_plate}" ) @@ -1508,25 +1478,66 @@ class LicensePlateProcessingMixin: id = plate_id - # Check if we have a previously detected plate for this ID - if id in self.detected_license_plates: - if self._should_keep_previous_plate( - id, top_plate, top_char_confidences, top_area, avg_confidence - ): - logger.debug(f"{camera}: Keeping previous plate") - return + is_new = id not in self.detected_license_plates + + # Collect variant + variant = { + "plate": top_plate, + "conf": avg_confidence, + "char_confidences": top_char_confidences, + "area": top_area, + "timestamp": current_time, + } + + # Initialize or append to plates + self.detected_license_plates.setdefault(id, {"plates": [], "camera": camera}) + self.detected_license_plates[id]["plates"].append(variant) + + # Prune old variants + if len(self.detected_license_plates[id]["plates"]) > 10: + self.detected_license_plates[id]["plates"] = self.detected_license_plates[ + id + ]["plates"][-10:] + + # Cluster and select rep + plates = self.detected_license_plates[id]["plates"] + rep_plate, rep_conf, rep_char_confs, rep_area = self._get_cluster_rep(plates) + + if rep_plate != top_plate: + logger.debug( + f"{camera}: Clustering changed top plate '{top_plate}' (conf: {avg_confidence:.3f}) to rep '{rep_plate}' (conf: {rep_conf:.3f})" + ) + + # Update stored rep + self.detected_license_plates[id].update( + { + "plate": rep_plate, + "char_confidences": rep_char_confs, + "area": rep_area, + "last_seen": current_time if dedicated_lpr else None, + } + ) + + if not dedicated_lpr: + self.detected_license_plates[id]["obj_data"] = obj_data + + if is_new: + if camera not in self.camera_current_cars: + self.camera_current_cars[camera] = [] + self.camera_current_cars[camera].append(id) # Determine subLabel based on known plates, use regex matching # Default to the detected plate, use label name if there's a match + sub_label = None try: sub_label = next( ( label - for label, plates in self.lpr_config.known_plates.items() + for label, plates_list in self.lpr_config.known_plates.items() if any( - re.match(f"^{plate}$", top_plate) - or distance(plate, top_plate) <= self.lpr_config.match_distance - for plate in plates + re.match(f"^{plate}$", rep_plate) + or distance(plate, rep_plate) <= self.lpr_config.match_distance + for plate in plates_list ) ), None, @@ -1535,12 +1546,11 @@ class LicensePlateProcessingMixin: logger.error( f"{camera}: Invalid regex in known plates configuration: {self.lpr_config.known_plates}" ) - sub_label = None # If it's a known plate, publish to sub_label if sub_label is not None: self.sub_label_publisher.publish( - (id, sub_label, avg_confidence), EventMetadataTypeEnum.sub_label.value + (id, sub_label, rep_conf), EventMetadataTypeEnum.sub_label.value ) # always publish to recognized_license_plate field @@ -1550,8 +1560,8 @@ class LicensePlateProcessingMixin: { "type": TrackedObjectUpdateTypesEnum.lpr, "name": sub_label, - "plate": top_plate, - "score": avg_confidence, + "plate": rep_plate, + "score": rep_conf, "id": id, "camera": camera, "timestamp": start, @@ -1559,7 +1569,7 @@ class LicensePlateProcessingMixin: ), ) self.sub_label_publisher.publish( - (id, "recognized_license_plate", top_plate, avg_confidence), + (id, "recognized_license_plate", rep_plate, rep_conf), EventMetadataTypeEnum.attribute.value, ) @@ -1569,7 +1579,7 @@ class LicensePlateProcessingMixin: and "license_plate" not in self.config.cameras[camera].objects.track ): logger.debug( - f"{camera}: Writing snapshot for {id}, {top_plate}, {current_time}" + f"{camera}: Writing snapshot for {id}, {rep_plate}, {current_time}" ) frame_bgr = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) _, encoded_img = cv2.imencode(".jpg", frame_bgr) @@ -1578,21 +1588,6 @@ class LicensePlateProcessingMixin: EventMetadataTypeEnum.save_lpr_snapshot.value, ) - if id not in self.detected_license_plates: - if camera not in self.camera_current_cars: - self.camera_current_cars[camera] = [] - - self.camera_current_cars[camera].append(id) - - self.detected_license_plates[id] = { - "plate": top_plate, - "char_confidences": top_char_confidences, - "area": top_area, - "obj_data": obj_data, - "camera": camera, - "last_seen": current_time if dedicated_lpr else None, - } - def handle_request(self, topic, request_data) -> dict[str, Any] | None: return