add support for multi-line plates

This commit is contained in:
Josh Hawkins 2025-04-15 09:26:03 -05:00
parent d10a35efea
commit 1d54ae7bb2

View File

@ -53,7 +53,7 @@ class LicensePlateProcessingMixin:
def _detect(self, image: np.ndarray) -> List[np.ndarray]: def _detect(self, image: np.ndarray) -> List[np.ndarray]:
""" """
Detect possible license plates in the input image by first resizing and normalizing it, Detect possible areas of text in the input image by first resizing and normalizing it,
running a detection model, and filtering out low-probability regions. running a detection model, and filtering out low-probability regions.
Args: Args:
@ -80,6 +80,13 @@ class LicensePlateProcessingMixin:
outputs = self.model_runner.detection_model([normalized_image])[0] outputs = self.model_runner.detection_model([normalized_image])[0]
outputs = outputs[0, :, :] outputs = outputs[0, :, :]
if False:
current_time = int(datetime.datetime.now().timestamp())
cv2.imwrite(
f"debug/frames/probability_map_{current_time}.jpg",
(outputs * 255).astype(np.uint8),
)
boxes, _ = self._boxes_from_bitmap(outputs, outputs > self.mask_thresh, w, h) boxes, _ = self._boxes_from_bitmap(outputs, outputs > self.mask_thresh, w, h)
return self._filter_polygon(boxes, (h, w)) return self._filter_polygon(boxes, (h, w))
@ -125,9 +132,6 @@ class LicensePlateProcessingMixin:
input_shape = [3, 48, 320] input_shape = [3, 48, 320]
num_images = len(images) num_images = len(images)
# sort images by aspect ratio for processing
indices = np.argsort(np.array([x.shape[1] / x.shape[0] for x in images]))
for index in range(0, num_images, self.batch_size): for index in range(0, num_images, self.batch_size):
input_h, input_w = input_shape[1], input_shape[2] input_h, input_w = input_shape[1], input_shape[2]
max_wh_ratio = input_w / input_h max_wh_ratio = input_w / input_h
@ -135,13 +139,13 @@ class LicensePlateProcessingMixin:
# calculate the maximum aspect ratio in the current batch # calculate the maximum aspect ratio in the current batch
for i in range(index, min(num_images, index + self.batch_size)): for i in range(index, min(num_images, index + self.batch_size)):
h, w = images[indices[i]].shape[0:2] h, w = images[i].shape[0:2]
max_wh_ratio = max(max_wh_ratio, w * 1.0 / h) max_wh_ratio = max(max_wh_ratio, w * 1.0 / h)
# preprocess the images based on the max aspect ratio # preprocess the images based on the max aspect ratio
for i in range(index, min(num_images, index + self.batch_size)): for i in range(index, min(num_images, index + self.batch_size)):
norm_image = self._preprocess_recognition_image( norm_image = self._preprocess_recognition_image(
camera, images[indices[i]], max_wh_ratio camera, images[i], max_wh_ratio
) )
norm_image = norm_image[np.newaxis, :] norm_image = norm_image[np.newaxis, :]
norm_images.append(norm_image) norm_images.append(norm_image)
@ -150,16 +154,20 @@ class LicensePlateProcessingMixin:
return self.ctc_decoder(outputs) return self.ctc_decoder(outputs)
def _process_license_plate( def _process_license_plate(
self, camera: string, id: string, image: np.ndarray self, camera: str, id: str, image: np.ndarray
) -> Tuple[List[str], List[float], List[int]]: ) -> Tuple[List[str], List[List[float]], List[int]]:
""" """
Complete pipeline for detecting, classifying, and recognizing license plates in the input image. Complete pipeline for detecting, classifying, and recognizing license plates in the input image.
Combines multi-line plates into a single plate string, grouping boxes by vertical alignment and ordering top to bottom,
but only combines boxes if their average confidence scores meet the threshold and their heights are similar.
Args: Args:
camera (str): Camera identifier.
id (str): Event identifier.
image (np.ndarray): The input image in which to detect, classify, and recognize license plates. image (np.ndarray): The input image in which to detect, classify, and recognize license plates.
Returns: Returns:
Tuple[List[str], List[float], List[int]]: Detected license plate texts, confidence scores, and areas of the plates. Tuple[List[str], List[List[float]], List[int]]: Detected license plate texts, character-level confidence scores for each plate (flattened into a single list per plate), and areas of the plates.
""" """
if ( if (
self.model_runner.detection_model.runner is None self.model_runner.detection_model.runner is None
@ -186,69 +194,162 @@ class LicensePlateProcessingMixin:
boxes, plate_width=plate_width, gap_fraction=0.1 boxes, plate_width=plate_width, gap_fraction=0.1
) )
boxes = self._sort_boxes(list(boxes))
plate_images = [self._crop_license_plate(image, x) for x in boxes]
current_time = int(datetime.datetime.now().timestamp()) current_time = int(datetime.datetime.now().timestamp())
if WRITE_DEBUG_IMAGES:
debug_image = image.copy()
for box in boxes:
box = box.astype(int)
x_min, y_min = np.min(box[:, 0]), np.min(box[:, 1])
x_max, y_max = np.max(box[:, 0]), np.max(box[:, 1])
cv2.rectangle(
debug_image,
(x_min, y_min),
(x_max, y_max),
color=(0, 255, 0),
thickness=2,
)
cv2.imwrite(
f"debug/frames/license_plate_boxes_{current_time}.jpg", debug_image
)
boxes = self._sort_boxes(list(boxes))
# Step 1: Compute box heights and group boxes by vertical alignment and height similarity
box_info = []
for i, box in enumerate(boxes):
y_coords = box[:, 1]
y_min, y_max = np.min(y_coords), np.max(y_coords)
height = y_max - y_min
box_info.append((y_min, y_max, height, i))
# Initial grouping based on y-coordinate overlap and height similarity
initial_groups = []
current_group = [box_info[0]]
height_tolerance = 0.25 # Allow 25% difference in height for grouping
for i in range(1, len(box_info)):
prev_y_min, prev_y_max, prev_height, _ = current_group[-1]
curr_y_min, _, curr_height, _ = box_info[i]
# Check y-coordinate overlap
overlap_threshold = 0.1 * (prev_y_max - prev_y_min)
overlaps = curr_y_min <= prev_y_max + overlap_threshold
# Check height similarity
height_ratio = min(prev_height, curr_height) / max(prev_height, curr_height)
height_similar = height_ratio >= (1 - height_tolerance)
if overlaps and height_similar:
current_group.append(box_info[i])
else:
initial_groups.append(current_group)
current_group = [box_info[i]]
initial_groups.append(current_group)
# Step 2: Process each initial group, filter by confidence
all_license_plates = []
all_confidences = []
all_areas = []
processed_indices = set()
recognition_threshold = self.lpr_config.recognition_threshold
for group in initial_groups:
# Sort group by y-coordinate (top to bottom)
group.sort(key=lambda x: x[0])
group_indices = [item[3] for item in group]
# Skip if all indices in this group have already been processed
if all(idx in processed_indices for idx in group_indices):
continue
# Crop images for the group
group_boxes = [boxes[i] for i in group_indices]
group_plate_images = [
self._crop_license_plate(image, box) for box in group_boxes
]
if WRITE_DEBUG_IMAGES: if WRITE_DEBUG_IMAGES:
for i, img in enumerate(plate_images): for i, img in enumerate(group_plate_images):
cv2.imwrite( cv2.imwrite(
f"debug/frames/license_plate_cropped_{current_time}_{i + 1}.jpg", f"debug/frames/license_plate_cropped_{current_time}_{group_indices[i] + 1}.jpg",
img, img,
) )
if self.config.lpr.debug_save_plates: if self.config.lpr.debug_save_plates:
logger.debug(f"{camera}: Saving plates for event {id}") logger.debug(f"{camera}: Saving plates for event {id}")
Path(os.path.join(CLIPS_DIR, f"lpr/{camera}/{id}")).mkdir( Path(os.path.join(CLIPS_DIR, f"lpr/{camera}/{id}")).mkdir(
parents=True, exist_ok=True parents=True, exist_ok=True
) )
for i, img in enumerate(group_plate_images):
for i, img in enumerate(plate_images):
cv2.imwrite( cv2.imwrite(
os.path.join( os.path.join(
CLIPS_DIR, f"lpr/{camera}/{id}/{current_time}_{i + 1}.jpg" CLIPS_DIR,
f"lpr/{camera}/{id}/{current_time}_{group_indices[i] + 1}.jpg",
), ),
img, img,
) )
# keep track of the index of each image for correct area calc later # Recognize text in each cropped image
sorted_indices = np.argsort([x.shape[1] / x.shape[0] for x in plate_images]) results, confidences = self._recognize(camera, group_plate_images)
reverse_mapping = {
idx: original_idx for original_idx, idx in enumerate(sorted_indices)
}
results, confidences = self._recognize(camera, plate_images) if not results:
continue
if results: if not confidences:
license_plates = [""] * len(plate_images) confidences = [[0.0] for _ in results]
average_confidences = [[0.0]] * len(plate_images)
areas = [0] * len(plate_images)
# map results back to original image order # Compute average confidence for each box's recognized text
for i, (plate, conf) in enumerate(zip(results, confidences)): avg_confidences = []
original_idx = reverse_mapping[i] for conf_list in confidences:
avg_conf = sum(conf_list) / len(conf_list) if conf_list else 0.0
avg_confidences.append(avg_conf)
height, width = plate_images[original_idx].shape[:2] # Filter boxes based on the recognition threshold
area = height * width qualifying_indices = []
qualifying_results = []
qualifying_confidences = []
for i, (avg_conf, result, conf_list) in enumerate(
zip(avg_confidences, results, confidences)
):
if avg_conf >= recognition_threshold:
qualifying_indices.append(group_indices[i])
qualifying_results.append(result)
qualifying_confidences.append(conf_list)
average_confidence = conf if not qualifying_results:
continue
# set to True to write each cropped image for debugging processed_indices.update(qualifying_indices)
if False:
filename = f"debug/frames/plate_{original_idx}_{plate}_{area}.jpg"
cv2.imwrite(filename, plate_images[original_idx])
license_plates[original_idx] = plate # Combine the qualifying results into a single plate string
average_confidences[original_idx] = average_confidence combined_plate = " ".join(qualifying_results)
areas[original_idx] = area
# Filter out plates that have a length of less than min_plate_length characters flat_confidences = [
# or that don't match the expected format (if defined) conf for conf_list in qualifying_confidences for conf in conf_list
# Sort by area, then by plate length, then by confidence all desc ]
# Compute the combined area for qualifying boxes
qualifying_boxes = [boxes[i] for i in qualifying_indices]
qualifying_plate_images = [
self._crop_license_plate(image, box) for box in qualifying_boxes
]
group_areas = [
img.shape[0] * img.shape[1] for img in qualifying_plate_images
]
combined_area = sum(group_areas)
all_license_plates.append(combined_plate)
all_confidences.append(flat_confidences)
all_areas.append(combined_area)
# Step 3: Filter and sort the combined plates
if all_license_plates:
filtered_data = [] filtered_data = []
for plate, conf, area in zip(license_plates, average_confidences, areas): for plate, conf_list, area in zip(
all_license_plates, all_confidences, all_areas
):
if len(plate) < self.lpr_config.min_plate_length: if len(plate) < self.lpr_config.min_plate_length:
logger.debug( logger.debug(
f"Filtered out '{plate}' due to length ({len(plate)} < {self.lpr_config.min_plate_length})" f"Filtered out '{plate}' due to length ({len(plate)} < {self.lpr_config.min_plate_length})"
@ -261,11 +362,11 @@ class LicensePlateProcessingMixin:
logger.debug(f"Filtered out '{plate}' due to format mismatch") logger.debug(f"Filtered out '{plate}' due to format mismatch")
continue continue
filtered_data.append((plate, conf, area)) filtered_data.append((plate, conf_list, area))
sorted_data = sorted( sorted_data = sorted(
filtered_data, filtered_data,
key=lambda x: (x[2], len(x[0]), x[1]), key=lambda x: (x[2], len(x[0]), sum(x[1]) / len(x[1]) if x[1] else 0),
reverse=True, reverse=True,
) )
@ -428,40 +529,34 @@ class LicensePlateProcessingMixin:
contour = contours[index] contour = contours[index]
# get minimum bounding box (rotated rectangle) around the contour and the smallest side length. # get minimum bounding box (rotated rectangle) around the contour and the smallest side length.
points, min_side = self._get_min_boxes(contour) points, sside = self._get_min_boxes(contour)
if sside < self.min_size:
if min_side < self.min_size:
continue continue
points = np.array(points) points = np.array(points, dtype=np.float32)
score = self._box_score(output, contour) score = self._box_score(output, contour)
if self.box_thresh > score: if self.box_thresh > score:
continue continue
polygon = Polygon(points) points = self._expand_box(points)
distance = polygon.area / polygon.length
# Use pyclipper to shrink the polygon slightly based on the computed distance. # Get the minimum area rectangle again after expansion
offset = PyclipperOffset() points, sside = self._get_min_boxes(points.reshape(-1, 1, 2))
offset.AddPath(points, JT_ROUND, ET_CLOSEDPOLYGON) if sside < self.min_size + 2:
points = np.array(offset.Execute(distance * 1.5)).reshape((-1, 1, 2))
# get the minimum bounding box around the shrunken polygon.
box, min_side = self._get_min_boxes(points)
if min_side < self.min_size + 2:
continue continue
box = np.array(box) points = np.array(points, dtype=np.float32)
# normalize and clip box coordinates to fit within the destination image size. # normalize and clip box coordinates to fit within the destination image size.
box[:, 0] = np.clip(np.round(box[:, 0] / width * dest_width), 0, dest_width) points[:, 0] = np.clip(
box[:, 1] = np.clip( np.round(points[:, 0] / width * dest_width), 0, dest_width
np.round(box[:, 1] / height * dest_height), 0, dest_height )
points[:, 1] = np.clip(
np.round(points[:, 1] / height * dest_height), 0, dest_height
) )
boxes.append(box.astype("int32")) boxes.append(points.astype("int32"))
scores.append(score) scores.append(score)
return np.array(boxes, dtype="int32"), scores return np.array(boxes, dtype="int32"), scores
@ -969,14 +1064,19 @@ class LicensePlateProcessingMixin:
# Adjust length score based on confidence of extra characters # Adjust length score based on confidence of extra characters
conf_threshold = 0.75 # Minimum confidence for a character to be "trusted" conf_threshold = 0.75 # Minimum confidence for a character to be "trusted"
if len(top_plate) > len(prev_plate): top_plate_char_count = len(top_plate.replace(" ", ""))
extra_conf = min( prev_plate_char_count = len(prev_plate.replace(" ", ""))
top_char_confidences[len(prev_plate) :]
) # Lowest extra char confidence if top_plate_char_count > prev_plate_char_count:
extra_confidences = top_char_confidences[prev_plate_char_count:]
if extra_confidences: # Ensure the slice is not empty
extra_conf = min(extra_confidences) # Lowest extra char confidence
if extra_conf < conf_threshold: if extra_conf < conf_threshold:
curr_length_score *= extra_conf / conf_threshold # Penalize if weak curr_length_score *= extra_conf / conf_threshold # Penalize if weak
elif len(prev_plate) > len(top_plate): elif prev_plate_char_count > top_plate_char_count:
extra_conf = min(prev_char_confidences[len(top_plate) :]) extra_confidences = prev_char_confidences[top_plate_char_count:]
if extra_confidences: # Ensure the slice is not empty
extra_conf = min(extra_confidences)
if extra_conf < conf_threshold: if extra_conf < conf_threshold:
prev_length_score *= extra_conf / conf_threshold prev_length_score *= extra_conf / conf_threshold