From 41b4c9d64cd9dff69713eaa4077156ff95e31e39 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Wed, 17 Sep 2025 14:45:29 -0500 Subject: [PATCH] implement clustering of plate variants per event should reduce OCR inconsistencies and improve plate recognition stability by using string similarity to cluster similar variants (10 per event id) and choosing the highest confidence representative as the final plate --- .../common/license_plate/mixin.py | 281 +++++++++--------- 1 file changed, 138 insertions(+), 143 deletions(-) diff --git a/frigate/data_processing/common/license_plate/mixin.py b/frigate/data_processing/common/license_plate/mixin.py index 142f8038d..3c894bbf5 100644 --- a/frigate/data_processing/common/license_plate/mixin.py +++ b/frigate/data_processing/common/license_plate/mixin.py @@ -209,7 +209,7 @@ class LicensePlateProcessingMixin: boxes = self._detect(image) if len(boxes) == 0: - logger.debug("No boxes found by OCR detector model") + logger.debug(f"{camera}: No boxes found by OCR detector model") return [], [], [] if len(boxes) > 0: @@ -381,7 +381,7 @@ class LicensePlateProcessingMixin: ): if len(plate) < self.lpr_config.min_plate_length: logger.debug( - f"Filtered out '{plate}' due to length ({len(plate)} < {self.lpr_config.min_plate_length})" + f"{camera}: Filtered out '{plate}' due to length ({len(plate)} < {self.lpr_config.min_plate_length})" ) continue @@ -389,7 +389,7 @@ class LicensePlateProcessingMixin: try: if not re.fullmatch(self.lpr_config.format, plate): logger.debug( - f"Filtered out '{plate}' due to format mismatch" + f"{camera}: Filtered out '{plate}' due to format mismatch" ) continue except re.error: @@ -996,7 +996,9 @@ class LicensePlateProcessingMixin: image = cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE) return image - def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]: + def _detect_license_plate( + self, camera: string, input: np.ndarray + ) -> tuple[int, int, int, int]: """ Use a lightweight YOLOv9 model to detect license plates for users without Frigate+ @@ -1066,118 +1068,88 @@ class LicensePlateProcessingMixin: ).clip(0, [input.shape[1], input.shape[0]] * 2) logger.debug( - f"Found license plate. Bounding box: {expanded_box.astype(int)}" + f"{camera}: Found license plate. Bounding box: {expanded_box.astype(int)}" ) return tuple(expanded_box.astype(int)) else: return None # No detection above the threshold - def _should_keep_previous_plate( - self, id, top_plate, top_char_confidences, top_area, avg_confidence - ): - """Determine if the previous plate should be kept over the current one.""" - if id not in self.detected_license_plates: - return False + def _get_cluster_rep( + self, plates: List[dict] + ) -> Tuple[str, float, List[float], int]: + """ + Cluster plate variants and select the representative from the best cluster. + """ + if len(plates) == 0: + return "", 0.0, [], 0 - prev_data = self.detected_license_plates[id] - prev_plate = prev_data["plate"] - prev_char_confidences = prev_data["char_confidences"] - prev_area = prev_data["area"] - prev_avg_confidence = ( - sum(prev_char_confidences) / len(prev_char_confidences) - if prev_char_confidences - else 0 + if len(plates) == 1: + p = plates[0] + return p["plate"], p["conf"], p["char_confidences"], p["area"] + + # Log initial variants + logger.debug(f"Clustering {len(plates)} plate variants:") + for i, p in enumerate(plates): + logger.debug( + f" Variant {i + 1}: '{p['plate']}' (conf: {p['conf']:.3f}, area: {p['area']})" + ) + + CLUSTER_THRESHOLD = 0.85 + clusters = [] + for i, plate in enumerate(plates): + merged = False + for j, cluster in enumerate(clusters): + sims = [jaro_winkler(plate["plate"], v["plate"]) for v in cluster] + if len(sims) > 0: + avg_sim = sum(sims) / len(sims) + if avg_sim >= CLUSTER_THRESHOLD: + cluster.append(plate) + logger.debug( + f" Merged variant {i + 1} '{plate['plate']}' (conf: {plate['conf']:.3f}) into cluster {j + 1} (avg_sim: {avg_sim:.3f})" + ) + merged = True + break + if not merged: + clusters.append([plate]) + logger.debug( + f" Started new cluster {len(clusters)} with variant {i + 1} '{plate['plate']}' (conf: {plate['conf']:.3f})" + ) + + if not clusters: + return "", 0.0, [], 0 + + # Log cluster summaries + for j, cluster in enumerate(clusters): + cluster_size = len(cluster) + max_conf = max(v["conf"] for v in cluster) + sample_variants = [v["plate"] for v in cluster[:3]] # First 3 for brevity + logger.debug( + f" Cluster {j + 1}: size {cluster_size}, max conf {max_conf:.3f}, variants: {sample_variants}{'...' if cluster_size > 3 else ''}" + ) + + # Best cluster: largest size, tiebroken by max conf + def cluster_score(c): + return (len(c), max(v["conf"] for v in c)) + + best_cluster_idx = max( + range(len(clusters)), key=lambda j: cluster_score(clusters[j]) ) - - # 1. Normalize metrics - # Length score: Equal lengths = 0.5, penalize extra characters if low confidence - length_diff = len(top_plate) - len(prev_plate) - max_length_diff = 3 - curr_length_score = 0.5 + (length_diff / (2 * max_length_diff)) - curr_length_score = max(0, min(1, curr_length_score)) - prev_length_score = 0.5 - (length_diff / (2 * max_length_diff)) - prev_length_score = max(0, min(1, prev_length_score)) - - # Adjust length score based on confidence of extra characters - conf_threshold = 0.75 # Minimum confidence for a character to be "trusted" - top_plate_char_count = len(top_plate.replace(" ", "")) - prev_plate_char_count = len(prev_plate.replace(" ", "")) - - if top_plate_char_count > prev_plate_char_count: - extra_confidences = top_char_confidences[prev_plate_char_count:] - if extra_confidences: # Ensure the slice is not empty - extra_conf = min(extra_confidences) # Lowest extra char confidence - if extra_conf < conf_threshold: - curr_length_score *= extra_conf / conf_threshold # Penalize if weak - elif prev_plate_char_count > top_plate_char_count: - extra_confidences = prev_char_confidences[top_plate_char_count:] - if extra_confidences: # Ensure the slice is not empty - extra_conf = min(extra_confidences) - if extra_conf < conf_threshold: - prev_length_score *= extra_conf / conf_threshold - - # Area score: Normalize by max area - max_area = max(top_area, prev_area) - curr_area_score = top_area / max_area if max_area > 0 else 0 - prev_area_score = prev_area / max_area if max_area > 0 else 0 - - # Confidence scores - curr_conf_score = avg_confidence - prev_conf_score = prev_avg_confidence - - # Character confidence comparison (average over shared length) - min_length = min(len(top_plate), len(prev_plate)) - if min_length > 0: - curr_char_conf = sum(top_char_confidences[:min_length]) / min_length - prev_char_conf = sum(prev_char_confidences[:min_length]) / min_length - else: - curr_char_conf = prev_char_conf = 0 - - # Penalize any character below threshold - curr_min_conf = min(top_char_confidences) if top_char_confidences else 0 - prev_min_conf = min(prev_char_confidences) if prev_char_confidences else 0 - curr_conf_penalty = ( - 1.0 if curr_min_conf >= conf_threshold else (curr_min_conf / conf_threshold) - ) - prev_conf_penalty = ( - 1.0 if prev_min_conf >= conf_threshold else (prev_min_conf / conf_threshold) - ) - - # 2. Define weights (boost confidence importance) - weights = { - "length": 0.2, - "area": 0.2, - "avg_confidence": 0.35, - "char_confidence": 0.25, - } - - # 3. Calculate weighted scores with penalty - curr_score = ( - curr_length_score * weights["length"] - + curr_area_score * weights["area"] - + curr_conf_score * weights["avg_confidence"] - + curr_char_conf * weights["char_confidence"] - ) * curr_conf_penalty - - prev_score = ( - prev_length_score * weights["length"] - + prev_area_score * weights["area"] - + prev_conf_score * weights["avg_confidence"] - + prev_char_conf * weights["char_confidence"] - ) * prev_conf_penalty - - # 4. Log the comparison + best_cluster = clusters[best_cluster_idx] + best_size, best_max_conf = cluster_score(best_cluster) logger.debug( - f"Plate comparison - Current: {top_plate} (score: {curr_score:.3f}, min_conf: {curr_min_conf:.2f}) vs " - f"Previous: {prev_plate} (score: {prev_score:.3f}, min_conf: {prev_min_conf:.2f}) " - f"Metrics - Length: {len(top_plate)} vs {len(prev_plate)} (scores: {curr_length_score:.2f} vs {prev_length_score:.2f}), " - f"Area: {top_area} vs {prev_area}, " - f"Avg Conf: {avg_confidence:.2f} vs {prev_avg_confidence:.2f}, " - f"Char Conf: {curr_char_conf:.2f} vs {prev_char_conf:.2f}" + f" Selected best cluster {best_cluster_idx + 1}: size {best_size}, max conf {best_max_conf:.3f}" ) - # 5. Return True if previous plate scores higher - return prev_score > curr_score + # Rep: highest conf in best cluster + rep = max(best_cluster, key=lambda v: v["conf"]) + logger.debug( + f" Selected rep from best cluster: '{rep['plate']}' (conf: {rep['conf']:.3f})" + ) + logger.debug( + f" Final clustered plate: '{rep['plate']}' (conf: {rep['conf']:.3f})" + ) + + return rep["plate"], rep["conf"], rep["char_confidences"], rep["area"] def _generate_plate_event(self, camera: str, plate: str, plate_score: float) -> str: """Generate a unique ID for a plate event based on camera and text.""" @@ -1228,7 +1200,7 @@ class LicensePlateProcessingMixin: ) yolov9_start = datetime.datetime.now().timestamp() - license_plate = self._detect_license_plate(rgb) + license_plate = self._detect_license_plate(camera, rgb) logger.debug( f"{camera}: YOLOv9 LPD inference time: {(datetime.datetime.now().timestamp() - yolov9_start) * 1000:.2f} ms" @@ -1451,7 +1423,7 @@ class LicensePlateProcessingMixin: f"{camera}: Detected text: {plate} (average confidence: {avg_confidence:.2f}, area: {text_area} pixels)" ) else: - logger.debug("No text detected") + logger.debug(f"{camera}: No text detected") return top_plate, top_char_confidences, top_area = ( @@ -1494,9 +1466,7 @@ class LicensePlateProcessingMixin: ) break if plate_id is None: - plate_id = self._generate_plate_event( - obj_data, top_plate, avg_confidence - ) + plate_id = self._generate_plate_event(camera, top_plate, avg_confidence) logger.debug( f"{camera}: New plate event for dedicated LPR camera {plate_id}: {top_plate}" ) @@ -1508,25 +1478,66 @@ class LicensePlateProcessingMixin: id = plate_id - # Check if we have a previously detected plate for this ID - if id in self.detected_license_plates: - if self._should_keep_previous_plate( - id, top_plate, top_char_confidences, top_area, avg_confidence - ): - logger.debug(f"{camera}: Keeping previous plate") - return + is_new = id not in self.detected_license_plates + + # Collect variant + variant = { + "plate": top_plate, + "conf": avg_confidence, + "char_confidences": top_char_confidences, + "area": top_area, + "timestamp": current_time, + } + + # Initialize or append to plates + self.detected_license_plates.setdefault(id, {"plates": [], "camera": camera}) + self.detected_license_plates[id]["plates"].append(variant) + + # Prune old variants + if len(self.detected_license_plates[id]["plates"]) > 10: + self.detected_license_plates[id]["plates"] = self.detected_license_plates[ + id + ]["plates"][-10:] + + # Cluster and select rep + plates = self.detected_license_plates[id]["plates"] + rep_plate, rep_conf, rep_char_confs, rep_area = self._get_cluster_rep(plates) + + if rep_plate != top_plate: + logger.debug( + f"{camera}: Clustering changed top plate '{top_plate}' (conf: {avg_confidence:.3f}) to rep '{rep_plate}' (conf: {rep_conf:.3f})" + ) + + # Update stored rep + self.detected_license_plates[id].update( + { + "plate": rep_plate, + "char_confidences": rep_char_confs, + "area": rep_area, + "last_seen": current_time if dedicated_lpr else None, + } + ) + + if not dedicated_lpr: + self.detected_license_plates[id]["obj_data"] = obj_data + + if is_new: + if camera not in self.camera_current_cars: + self.camera_current_cars[camera] = [] + self.camera_current_cars[camera].append(id) # Determine subLabel based on known plates, use regex matching # Default to the detected plate, use label name if there's a match + sub_label = None try: sub_label = next( ( label - for label, plates in self.lpr_config.known_plates.items() + for label, plates_list in self.lpr_config.known_plates.items() if any( - re.match(f"^{plate}$", top_plate) - or distance(plate, top_plate) <= self.lpr_config.match_distance - for plate in plates + re.match(f"^{plate}$", rep_plate) + or distance(plate, rep_plate) <= self.lpr_config.match_distance + for plate in plates_list ) ), None, @@ -1535,12 +1546,11 @@ class LicensePlateProcessingMixin: logger.error( f"{camera}: Invalid regex in known plates configuration: {self.lpr_config.known_plates}" ) - sub_label = None # If it's a known plate, publish to sub_label if sub_label is not None: self.sub_label_publisher.publish( - (id, sub_label, avg_confidence), EventMetadataTypeEnum.sub_label.value + (id, sub_label, rep_conf), EventMetadataTypeEnum.sub_label.value ) # always publish to recognized_license_plate field @@ -1550,8 +1560,8 @@ class LicensePlateProcessingMixin: { "type": TrackedObjectUpdateTypesEnum.lpr, "name": sub_label, - "plate": top_plate, - "score": avg_confidence, + "plate": rep_plate, + "score": rep_conf, "id": id, "camera": camera, "timestamp": start, @@ -1559,7 +1569,7 @@ class LicensePlateProcessingMixin: ), ) self.sub_label_publisher.publish( - (id, "recognized_license_plate", top_plate, avg_confidence), + (id, "recognized_license_plate", rep_plate, rep_conf), EventMetadataTypeEnum.attribute.value, ) @@ -1569,7 +1579,7 @@ class LicensePlateProcessingMixin: and "license_plate" not in self.config.cameras[camera].objects.track ): logger.debug( - f"{camera}: Writing snapshot for {id}, {top_plate}, {current_time}" + f"{camera}: Writing snapshot for {id}, {rep_plate}, {current_time}" ) frame_bgr = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) _, encoded_img = cv2.imencode(".jpg", frame_bgr) @@ -1578,21 +1588,6 @@ class LicensePlateProcessingMixin: EventMetadataTypeEnum.save_lpr_snapshot.value, ) - if id not in self.detected_license_plates: - if camera not in self.camera_current_cars: - self.camera_current_cars[camera] = [] - - self.camera_current_cars[camera].append(id) - - self.detected_license_plates[id] = { - "plate": top_plate, - "char_confidences": top_char_confidences, - "area": top_area, - "obj_data": obj_data, - "camera": camera, - "last_seen": current_time if dedicated_lpr else None, - } - def handle_request(self, topic, request_data) -> dict[str, Any] | None: return