implement clustering of plate variants per event

should reduce OCR inconsistencies and improve plate recognition stability by using string similarity to cluster similar variants (10 per event id) and choosing the highest confidence representative as the final plate
This commit is contained in:
Josh Hawkins 2025-09-17 14:45:29 -05:00
parent e6ccd40dbb
commit 41b4c9d64c

View File

@ -209,7 +209,7 @@ class LicensePlateProcessingMixin:
boxes = self._detect(image)
if len(boxes) == 0:
logger.debug("No boxes found by OCR detector model")
logger.debug(f"{camera}: No boxes found by OCR detector model")
return [], [], []
if len(boxes) > 0:
@ -381,7 +381,7 @@ class LicensePlateProcessingMixin:
):
if len(plate) < self.lpr_config.min_plate_length:
logger.debug(
f"Filtered out '{plate}' due to length ({len(plate)} < {self.lpr_config.min_plate_length})"
f"{camera}: Filtered out '{plate}' due to length ({len(plate)} < {self.lpr_config.min_plate_length})"
)
continue
@ -389,7 +389,7 @@ class LicensePlateProcessingMixin:
try:
if not re.fullmatch(self.lpr_config.format, plate):
logger.debug(
f"Filtered out '{plate}' due to format mismatch"
f"{camera}: Filtered out '{plate}' due to format mismatch"
)
continue
except re.error:
@ -996,7 +996,9 @@ class LicensePlateProcessingMixin:
image = cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE)
return image
def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]:
def _detect_license_plate(
self, camera: string, input: np.ndarray
) -> tuple[int, int, int, int]:
"""
Use a lightweight YOLOv9 model to detect license plates for users without Frigate+
@ -1066,118 +1068,88 @@ class LicensePlateProcessingMixin:
).clip(0, [input.shape[1], input.shape[0]] * 2)
logger.debug(
f"Found license plate. Bounding box: {expanded_box.astype(int)}"
f"{camera}: Found license plate. Bounding box: {expanded_box.astype(int)}"
)
return tuple(expanded_box.astype(int))
else:
return None # No detection above the threshold
def _should_keep_previous_plate(
self, id, top_plate, top_char_confidences, top_area, avg_confidence
):
"""Determine if the previous plate should be kept over the current one."""
if id not in self.detected_license_plates:
return False
def _get_cluster_rep(
self, plates: List[dict]
) -> Tuple[str, float, List[float], int]:
"""
Cluster plate variants and select the representative from the best cluster.
"""
if len(plates) == 0:
return "", 0.0, [], 0
prev_data = self.detected_license_plates[id]
prev_plate = prev_data["plate"]
prev_char_confidences = prev_data["char_confidences"]
prev_area = prev_data["area"]
prev_avg_confidence = (
sum(prev_char_confidences) / len(prev_char_confidences)
if prev_char_confidences
else 0
if len(plates) == 1:
p = plates[0]
return p["plate"], p["conf"], p["char_confidences"], p["area"]
# Log initial variants
logger.debug(f"Clustering {len(plates)} plate variants:")
for i, p in enumerate(plates):
logger.debug(
f" Variant {i + 1}: '{p['plate']}' (conf: {p['conf']:.3f}, area: {p['area']})"
)
CLUSTER_THRESHOLD = 0.85
clusters = []
for i, plate in enumerate(plates):
merged = False
for j, cluster in enumerate(clusters):
sims = [jaro_winkler(plate["plate"], v["plate"]) for v in cluster]
if len(sims) > 0:
avg_sim = sum(sims) / len(sims)
if avg_sim >= CLUSTER_THRESHOLD:
cluster.append(plate)
logger.debug(
f" Merged variant {i + 1} '{plate['plate']}' (conf: {plate['conf']:.3f}) into cluster {j + 1} (avg_sim: {avg_sim:.3f})"
)
merged = True
break
if not merged:
clusters.append([plate])
logger.debug(
f" Started new cluster {len(clusters)} with variant {i + 1} '{plate['plate']}' (conf: {plate['conf']:.3f})"
)
if not clusters:
return "", 0.0, [], 0
# Log cluster summaries
for j, cluster in enumerate(clusters):
cluster_size = len(cluster)
max_conf = max(v["conf"] for v in cluster)
sample_variants = [v["plate"] for v in cluster[:3]] # First 3 for brevity
logger.debug(
f" Cluster {j + 1}: size {cluster_size}, max conf {max_conf:.3f}, variants: {sample_variants}{'...' if cluster_size > 3 else ''}"
)
# Best cluster: largest size, tiebroken by max conf
def cluster_score(c):
return (len(c), max(v["conf"] for v in c))
best_cluster_idx = max(
range(len(clusters)), key=lambda j: cluster_score(clusters[j])
)
# 1. Normalize metrics
# Length score: Equal lengths = 0.5, penalize extra characters if low confidence
length_diff = len(top_plate) - len(prev_plate)
max_length_diff = 3
curr_length_score = 0.5 + (length_diff / (2 * max_length_diff))
curr_length_score = max(0, min(1, curr_length_score))
prev_length_score = 0.5 - (length_diff / (2 * max_length_diff))
prev_length_score = max(0, min(1, prev_length_score))
# Adjust length score based on confidence of extra characters
conf_threshold = 0.75 # Minimum confidence for a character to be "trusted"
top_plate_char_count = len(top_plate.replace(" ", ""))
prev_plate_char_count = len(prev_plate.replace(" ", ""))
if top_plate_char_count > prev_plate_char_count:
extra_confidences = top_char_confidences[prev_plate_char_count:]
if extra_confidences: # Ensure the slice is not empty
extra_conf = min(extra_confidences) # Lowest extra char confidence
if extra_conf < conf_threshold:
curr_length_score *= extra_conf / conf_threshold # Penalize if weak
elif prev_plate_char_count > top_plate_char_count:
extra_confidences = prev_char_confidences[top_plate_char_count:]
if extra_confidences: # Ensure the slice is not empty
extra_conf = min(extra_confidences)
if extra_conf < conf_threshold:
prev_length_score *= extra_conf / conf_threshold
# Area score: Normalize by max area
max_area = max(top_area, prev_area)
curr_area_score = top_area / max_area if max_area > 0 else 0
prev_area_score = prev_area / max_area if max_area > 0 else 0
# Confidence scores
curr_conf_score = avg_confidence
prev_conf_score = prev_avg_confidence
# Character confidence comparison (average over shared length)
min_length = min(len(top_plate), len(prev_plate))
if min_length > 0:
curr_char_conf = sum(top_char_confidences[:min_length]) / min_length
prev_char_conf = sum(prev_char_confidences[:min_length]) / min_length
else:
curr_char_conf = prev_char_conf = 0
# Penalize any character below threshold
curr_min_conf = min(top_char_confidences) if top_char_confidences else 0
prev_min_conf = min(prev_char_confidences) if prev_char_confidences else 0
curr_conf_penalty = (
1.0 if curr_min_conf >= conf_threshold else (curr_min_conf / conf_threshold)
)
prev_conf_penalty = (
1.0 if prev_min_conf >= conf_threshold else (prev_min_conf / conf_threshold)
)
# 2. Define weights (boost confidence importance)
weights = {
"length": 0.2,
"area": 0.2,
"avg_confidence": 0.35,
"char_confidence": 0.25,
}
# 3. Calculate weighted scores with penalty
curr_score = (
curr_length_score * weights["length"]
+ curr_area_score * weights["area"]
+ curr_conf_score * weights["avg_confidence"]
+ curr_char_conf * weights["char_confidence"]
) * curr_conf_penalty
prev_score = (
prev_length_score * weights["length"]
+ prev_area_score * weights["area"]
+ prev_conf_score * weights["avg_confidence"]
+ prev_char_conf * weights["char_confidence"]
) * prev_conf_penalty
# 4. Log the comparison
best_cluster = clusters[best_cluster_idx]
best_size, best_max_conf = cluster_score(best_cluster)
logger.debug(
f"Plate comparison - Current: {top_plate} (score: {curr_score:.3f}, min_conf: {curr_min_conf:.2f}) vs "
f"Previous: {prev_plate} (score: {prev_score:.3f}, min_conf: {prev_min_conf:.2f}) "
f"Metrics - Length: {len(top_plate)} vs {len(prev_plate)} (scores: {curr_length_score:.2f} vs {prev_length_score:.2f}), "
f"Area: {top_area} vs {prev_area}, "
f"Avg Conf: {avg_confidence:.2f} vs {prev_avg_confidence:.2f}, "
f"Char Conf: {curr_char_conf:.2f} vs {prev_char_conf:.2f}"
f" Selected best cluster {best_cluster_idx + 1}: size {best_size}, max conf {best_max_conf:.3f}"
)
# 5. Return True if previous plate scores higher
return prev_score > curr_score
# Rep: highest conf in best cluster
rep = max(best_cluster, key=lambda v: v["conf"])
logger.debug(
f" Selected rep from best cluster: '{rep['plate']}' (conf: {rep['conf']:.3f})"
)
logger.debug(
f" Final clustered plate: '{rep['plate']}' (conf: {rep['conf']:.3f})"
)
return rep["plate"], rep["conf"], rep["char_confidences"], rep["area"]
def _generate_plate_event(self, camera: str, plate: str, plate_score: float) -> str:
"""Generate a unique ID for a plate event based on camera and text."""
@ -1228,7 +1200,7 @@ class LicensePlateProcessingMixin:
)
yolov9_start = datetime.datetime.now().timestamp()
license_plate = self._detect_license_plate(rgb)
license_plate = self._detect_license_plate(camera, rgb)
logger.debug(
f"{camera}: YOLOv9 LPD inference time: {(datetime.datetime.now().timestamp() - yolov9_start) * 1000:.2f} ms"
@ -1451,7 +1423,7 @@ class LicensePlateProcessingMixin:
f"{camera}: Detected text: {plate} (average confidence: {avg_confidence:.2f}, area: {text_area} pixels)"
)
else:
logger.debug("No text detected")
logger.debug(f"{camera}: No text detected")
return
top_plate, top_char_confidences, top_area = (
@ -1494,9 +1466,7 @@ class LicensePlateProcessingMixin:
)
break
if plate_id is None:
plate_id = self._generate_plate_event(
obj_data, top_plate, avg_confidence
)
plate_id = self._generate_plate_event(camera, top_plate, avg_confidence)
logger.debug(
f"{camera}: New plate event for dedicated LPR camera {plate_id}: {top_plate}"
)
@ -1508,25 +1478,66 @@ class LicensePlateProcessingMixin:
id = plate_id
# Check if we have a previously detected plate for this ID
if id in self.detected_license_plates:
if self._should_keep_previous_plate(
id, top_plate, top_char_confidences, top_area, avg_confidence
):
logger.debug(f"{camera}: Keeping previous plate")
return
is_new = id not in self.detected_license_plates
# Collect variant
variant = {
"plate": top_plate,
"conf": avg_confidence,
"char_confidences": top_char_confidences,
"area": top_area,
"timestamp": current_time,
}
# Initialize or append to plates
self.detected_license_plates.setdefault(id, {"plates": [], "camera": camera})
self.detected_license_plates[id]["plates"].append(variant)
# Prune old variants
if len(self.detected_license_plates[id]["plates"]) > 10:
self.detected_license_plates[id]["plates"] = self.detected_license_plates[
id
]["plates"][-10:]
# Cluster and select rep
plates = self.detected_license_plates[id]["plates"]
rep_plate, rep_conf, rep_char_confs, rep_area = self._get_cluster_rep(plates)
if rep_plate != top_plate:
logger.debug(
f"{camera}: Clustering changed top plate '{top_plate}' (conf: {avg_confidence:.3f}) to rep '{rep_plate}' (conf: {rep_conf:.3f})"
)
# Update stored rep
self.detected_license_plates[id].update(
{
"plate": rep_plate,
"char_confidences": rep_char_confs,
"area": rep_area,
"last_seen": current_time if dedicated_lpr else None,
}
)
if not dedicated_lpr:
self.detected_license_plates[id]["obj_data"] = obj_data
if is_new:
if camera not in self.camera_current_cars:
self.camera_current_cars[camera] = []
self.camera_current_cars[camera].append(id)
# Determine subLabel based on known plates, use regex matching
# Default to the detected plate, use label name if there's a match
sub_label = None
try:
sub_label = next(
(
label
for label, plates in self.lpr_config.known_plates.items()
for label, plates_list in self.lpr_config.known_plates.items()
if any(
re.match(f"^{plate}$", top_plate)
or distance(plate, top_plate) <= self.lpr_config.match_distance
for plate in plates
re.match(f"^{plate}$", rep_plate)
or distance(plate, rep_plate) <= self.lpr_config.match_distance
for plate in plates_list
)
),
None,
@ -1535,12 +1546,11 @@ class LicensePlateProcessingMixin:
logger.error(
f"{camera}: Invalid regex in known plates configuration: {self.lpr_config.known_plates}"
)
sub_label = None
# If it's a known plate, publish to sub_label
if sub_label is not None:
self.sub_label_publisher.publish(
(id, sub_label, avg_confidence), EventMetadataTypeEnum.sub_label.value
(id, sub_label, rep_conf), EventMetadataTypeEnum.sub_label.value
)
# always publish to recognized_license_plate field
@ -1550,8 +1560,8 @@ class LicensePlateProcessingMixin:
{
"type": TrackedObjectUpdateTypesEnum.lpr,
"name": sub_label,
"plate": top_plate,
"score": avg_confidence,
"plate": rep_plate,
"score": rep_conf,
"id": id,
"camera": camera,
"timestamp": start,
@ -1559,7 +1569,7 @@ class LicensePlateProcessingMixin:
),
)
self.sub_label_publisher.publish(
(id, "recognized_license_plate", top_plate, avg_confidence),
(id, "recognized_license_plate", rep_plate, rep_conf),
EventMetadataTypeEnum.attribute.value,
)
@ -1569,7 +1579,7 @@ class LicensePlateProcessingMixin:
and "license_plate" not in self.config.cameras[camera].objects.track
):
logger.debug(
f"{camera}: Writing snapshot for {id}, {top_plate}, {current_time}"
f"{camera}: Writing snapshot for {id}, {rep_plate}, {current_time}"
)
frame_bgr = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
_, encoded_img = cv2.imencode(".jpg", frame_bgr)
@ -1578,21 +1588,6 @@ class LicensePlateProcessingMixin:
EventMetadataTypeEnum.save_lpr_snapshot.value,
)
if id not in self.detected_license_plates:
if camera not in self.camera_current_cars:
self.camera_current_cars[camera] = []
self.camera_current_cars[camera].append(id)
self.detected_license_plates[id] = {
"plate": top_plate,
"char_confidences": top_char_confidences,
"area": top_area,
"obj_data": obj_data,
"camera": camera,
"last_seen": current_time if dedicated_lpr else None,
}
def handle_request(self, topic, request_data) -> dict[str, Any] | None:
return