From 1d54ae7bb26c1e58d3df88f33b4b9df74feb0f24 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Tue, 15 Apr 2025 09:26:03 -0500 Subject: [PATCH] add support for multi-line plates --- .../common/license_plate/mixin.py | 274 ++++++++++++------ 1 file changed, 187 insertions(+), 87 deletions(-) diff --git a/frigate/data_processing/common/license_plate/mixin.py b/frigate/data_processing/common/license_plate/mixin.py index 8d514b197e..64ccde368f 100644 --- a/frigate/data_processing/common/license_plate/mixin.py +++ b/frigate/data_processing/common/license_plate/mixin.py @@ -53,7 +53,7 @@ class LicensePlateProcessingMixin: def _detect(self, image: np.ndarray) -> List[np.ndarray]: """ - Detect possible license plates in the input image by first resizing and normalizing it, + Detect possible areas of text in the input image by first resizing and normalizing it, running a detection model, and filtering out low-probability regions. Args: @@ -80,6 +80,13 @@ class LicensePlateProcessingMixin: outputs = self.model_runner.detection_model([normalized_image])[0] outputs = outputs[0, :, :] + if False: + current_time = int(datetime.datetime.now().timestamp()) + cv2.imwrite( + f"debug/frames/probability_map_{current_time}.jpg", + (outputs * 255).astype(np.uint8), + ) + boxes, _ = self._boxes_from_bitmap(outputs, outputs > self.mask_thresh, w, h) return self._filter_polygon(boxes, (h, w)) @@ -125,9 +132,6 @@ class LicensePlateProcessingMixin: input_shape = [3, 48, 320] num_images = len(images) - # sort images by aspect ratio for processing - indices = np.argsort(np.array([x.shape[1] / x.shape[0] for x in images])) - for index in range(0, num_images, self.batch_size): input_h, input_w = input_shape[1], input_shape[2] max_wh_ratio = input_w / input_h @@ -135,13 +139,13 @@ class LicensePlateProcessingMixin: # calculate the maximum aspect ratio in the current batch for i in range(index, min(num_images, index + self.batch_size)): - h, w = images[indices[i]].shape[0:2] + h, w = images[i].shape[0:2] max_wh_ratio = max(max_wh_ratio, w * 1.0 / h) # preprocess the images based on the max aspect ratio for i in range(index, min(num_images, index + self.batch_size)): norm_image = self._preprocess_recognition_image( - camera, images[indices[i]], max_wh_ratio + camera, images[i], max_wh_ratio ) norm_image = norm_image[np.newaxis, :] norm_images.append(norm_image) @@ -150,16 +154,20 @@ class LicensePlateProcessingMixin: return self.ctc_decoder(outputs) def _process_license_plate( - self, camera: string, id: string, image: np.ndarray - ) -> Tuple[List[str], List[float], List[int]]: + self, camera: str, id: str, image: np.ndarray + ) -> Tuple[List[str], List[List[float]], List[int]]: """ Complete pipeline for detecting, classifying, and recognizing license plates in the input image. + Combines multi-line plates into a single plate string, grouping boxes by vertical alignment and ordering top to bottom, + but only combines boxes if their average confidence scores meet the threshold and their heights are similar. Args: + camera (str): Camera identifier. + id (str): Event identifier. image (np.ndarray): The input image in which to detect, classify, and recognize license plates. Returns: - Tuple[List[str], List[float], List[int]]: Detected license plate texts, confidence scores, and areas of the plates. + Tuple[List[str], List[List[float]], List[int]]: Detected license plate texts, character-level confidence scores for each plate (flattened into a single list per plate), and areas of the plates. """ if ( self.model_runner.detection_model.runner is None @@ -186,69 +194,162 @@ class LicensePlateProcessingMixin: boxes, plate_width=plate_width, gap_fraction=0.1 ) - boxes = self._sort_boxes(list(boxes)) - plate_images = [self._crop_license_plate(image, x) for x in boxes] - current_time = int(datetime.datetime.now().timestamp()) - if WRITE_DEBUG_IMAGES: - for i, img in enumerate(plate_images): - cv2.imwrite( - f"debug/frames/license_plate_cropped_{current_time}_{i + 1}.jpg", - img, + debug_image = image.copy() + for box in boxes: + box = box.astype(int) + x_min, y_min = np.min(box[:, 0]), np.min(box[:, 1]) + x_max, y_max = np.max(box[:, 0]), np.max(box[:, 1]) + cv2.rectangle( + debug_image, + (x_min, y_min), + (x_max, y_max), + color=(0, 255, 0), + thickness=2, ) - if self.config.lpr.debug_save_plates: - logger.debug(f"{camera}: Saving plates for event {id}") - - Path(os.path.join(CLIPS_DIR, f"lpr/{camera}/{id}")).mkdir( - parents=True, exist_ok=True + cv2.imwrite( + f"debug/frames/license_plate_boxes_{current_time}.jpg", debug_image ) - for i, img in enumerate(plate_images): - cv2.imwrite( - os.path.join( - CLIPS_DIR, f"lpr/{camera}/{id}/{current_time}_{i + 1}.jpg" - ), - img, + boxes = self._sort_boxes(list(boxes)) + + # Step 1: Compute box heights and group boxes by vertical alignment and height similarity + box_info = [] + for i, box in enumerate(boxes): + y_coords = box[:, 1] + y_min, y_max = np.min(y_coords), np.max(y_coords) + height = y_max - y_min + box_info.append((y_min, y_max, height, i)) + + # Initial grouping based on y-coordinate overlap and height similarity + initial_groups = [] + current_group = [box_info[0]] + height_tolerance = 0.25 # Allow 25% difference in height for grouping + + for i in range(1, len(box_info)): + prev_y_min, prev_y_max, prev_height, _ = current_group[-1] + curr_y_min, _, curr_height, _ = box_info[i] + + # Check y-coordinate overlap + overlap_threshold = 0.1 * (prev_y_max - prev_y_min) + overlaps = curr_y_min <= prev_y_max + overlap_threshold + + # Check height similarity + height_ratio = min(prev_height, curr_height) / max(prev_height, curr_height) + height_similar = height_ratio >= (1 - height_tolerance) + + if overlaps and height_similar: + current_group.append(box_info[i]) + else: + initial_groups.append(current_group) + current_group = [box_info[i]] + initial_groups.append(current_group) + + # Step 2: Process each initial group, filter by confidence + all_license_plates = [] + all_confidences = [] + all_areas = [] + processed_indices = set() + + recognition_threshold = self.lpr_config.recognition_threshold + + for group in initial_groups: + # Sort group by y-coordinate (top to bottom) + group.sort(key=lambda x: x[0]) + group_indices = [item[3] for item in group] + + # Skip if all indices in this group have already been processed + if all(idx in processed_indices for idx in group_indices): + continue + + # Crop images for the group + group_boxes = [boxes[i] for i in group_indices] + group_plate_images = [ + self._crop_license_plate(image, box) for box in group_boxes + ] + + if WRITE_DEBUG_IMAGES: + for i, img in enumerate(group_plate_images): + cv2.imwrite( + f"debug/frames/license_plate_cropped_{current_time}_{group_indices[i] + 1}.jpg", + img, + ) + + if self.config.lpr.debug_save_plates: + logger.debug(f"{camera}: Saving plates for event {id}") + Path(os.path.join(CLIPS_DIR, f"lpr/{camera}/{id}")).mkdir( + parents=True, exist_ok=True ) + for i, img in enumerate(group_plate_images): + cv2.imwrite( + os.path.join( + CLIPS_DIR, + f"lpr/{camera}/{id}/{current_time}_{group_indices[i] + 1}.jpg", + ), + img, + ) - # keep track of the index of each image for correct area calc later - sorted_indices = np.argsort([x.shape[1] / x.shape[0] for x in plate_images]) - reverse_mapping = { - idx: original_idx for original_idx, idx in enumerate(sorted_indices) - } + # Recognize text in each cropped image + results, confidences = self._recognize(camera, group_plate_images) - results, confidences = self._recognize(camera, plate_images) + if not results: + continue - if results: - license_plates = [""] * len(plate_images) - average_confidences = [[0.0]] * len(plate_images) - areas = [0] * len(plate_images) + if not confidences: + confidences = [[0.0] for _ in results] - # map results back to original image order - for i, (plate, conf) in enumerate(zip(results, confidences)): - original_idx = reverse_mapping[i] + # Compute average confidence for each box's recognized text + avg_confidences = [] + for conf_list in confidences: + avg_conf = sum(conf_list) / len(conf_list) if conf_list else 0.0 + avg_confidences.append(avg_conf) - height, width = plate_images[original_idx].shape[:2] - area = height * width + # Filter boxes based on the recognition threshold + qualifying_indices = [] + qualifying_results = [] + qualifying_confidences = [] + for i, (avg_conf, result, conf_list) in enumerate( + zip(avg_confidences, results, confidences) + ): + if avg_conf >= recognition_threshold: + qualifying_indices.append(group_indices[i]) + qualifying_results.append(result) + qualifying_confidences.append(conf_list) - average_confidence = conf + if not qualifying_results: + continue - # set to True to write each cropped image for debugging - if False: - filename = f"debug/frames/plate_{original_idx}_{plate}_{area}.jpg" - cv2.imwrite(filename, plate_images[original_idx]) + processed_indices.update(qualifying_indices) - license_plates[original_idx] = plate - average_confidences[original_idx] = average_confidence - areas[original_idx] = area + # Combine the qualifying results into a single plate string + combined_plate = " ".join(qualifying_results) - # Filter out plates that have a length of less than min_plate_length characters - # or that don't match the expected format (if defined) - # Sort by area, then by plate length, then by confidence all desc + flat_confidences = [ + conf for conf_list in qualifying_confidences for conf in conf_list + ] + + # Compute the combined area for qualifying boxes + qualifying_boxes = [boxes[i] for i in qualifying_indices] + qualifying_plate_images = [ + self._crop_license_plate(image, box) for box in qualifying_boxes + ] + group_areas = [ + img.shape[0] * img.shape[1] for img in qualifying_plate_images + ] + combined_area = sum(group_areas) + + all_license_plates.append(combined_plate) + all_confidences.append(flat_confidences) + all_areas.append(combined_area) + + # Step 3: Filter and sort the combined plates + if all_license_plates: filtered_data = [] - for plate, conf, area in zip(license_plates, average_confidences, areas): + for plate, conf_list, area in zip( + all_license_plates, all_confidences, all_areas + ): if len(plate) < self.lpr_config.min_plate_length: logger.debug( f"Filtered out '{plate}' due to length ({len(plate)} < {self.lpr_config.min_plate_length})" @@ -261,11 +362,11 @@ class LicensePlateProcessingMixin: logger.debug(f"Filtered out '{plate}' due to format mismatch") continue - filtered_data.append((plate, conf, area)) + filtered_data.append((plate, conf_list, area)) sorted_data = sorted( filtered_data, - key=lambda x: (x[2], len(x[0]), x[1]), + key=lambda x: (x[2], len(x[0]), sum(x[1]) / len(x[1]) if x[1] else 0), reverse=True, ) @@ -428,40 +529,34 @@ class LicensePlateProcessingMixin: contour = contours[index] # get minimum bounding box (rotated rectangle) around the contour and the smallest side length. - points, min_side = self._get_min_boxes(contour) - - if min_side < self.min_size: + points, sside = self._get_min_boxes(contour) + if sside < self.min_size: continue - points = np.array(points) + points = np.array(points, dtype=np.float32) score = self._box_score(output, contour) if self.box_thresh > score: continue - polygon = Polygon(points) - distance = polygon.area / polygon.length + points = self._expand_box(points) - # Use pyclipper to shrink the polygon slightly based on the computed distance. - offset = PyclipperOffset() - offset.AddPath(points, JT_ROUND, ET_CLOSEDPOLYGON) - points = np.array(offset.Execute(distance * 1.5)).reshape((-1, 1, 2)) - - # get the minimum bounding box around the shrunken polygon. - box, min_side = self._get_min_boxes(points) - - if min_side < self.min_size + 2: + # Get the minimum area rectangle again after expansion + points, sside = self._get_min_boxes(points.reshape(-1, 1, 2)) + if sside < self.min_size + 2: continue - box = np.array(box) + points = np.array(points, dtype=np.float32) # normalize and clip box coordinates to fit within the destination image size. - box[:, 0] = np.clip(np.round(box[:, 0] / width * dest_width), 0, dest_width) - box[:, 1] = np.clip( - np.round(box[:, 1] / height * dest_height), 0, dest_height + points[:, 0] = np.clip( + np.round(points[:, 0] / width * dest_width), 0, dest_width + ) + points[:, 1] = np.clip( + np.round(points[:, 1] / height * dest_height), 0, dest_height ) - boxes.append(box.astype("int32")) + boxes.append(points.astype("int32")) scores.append(score) return np.array(boxes, dtype="int32"), scores @@ -969,16 +1064,21 @@ class LicensePlateProcessingMixin: # Adjust length score based on confidence of extra characters conf_threshold = 0.75 # Minimum confidence for a character to be "trusted" - if len(top_plate) > len(prev_plate): - extra_conf = min( - top_char_confidences[len(prev_plate) :] - ) # Lowest extra char confidence - if extra_conf < conf_threshold: - curr_length_score *= extra_conf / conf_threshold # Penalize if weak - elif len(prev_plate) > len(top_plate): - extra_conf = min(prev_char_confidences[len(top_plate) :]) - if extra_conf < conf_threshold: - prev_length_score *= extra_conf / conf_threshold + top_plate_char_count = len(top_plate.replace(" ", "")) + prev_plate_char_count = len(prev_plate.replace(" ", "")) + + if top_plate_char_count > prev_plate_char_count: + extra_confidences = top_char_confidences[prev_plate_char_count:] + if extra_confidences: # Ensure the slice is not empty + extra_conf = min(extra_confidences) # Lowest extra char confidence + if extra_conf < conf_threshold: + curr_length_score *= extra_conf / conf_threshold # Penalize if weak + elif prev_plate_char_count > top_plate_char_count: + extra_confidences = prev_char_confidences[top_plate_char_count:] + if extra_confidences: # Ensure the slice is not empty + extra_conf = min(extra_confidences) + if extra_conf < conf_threshold: + prev_length_score *= extra_conf / conf_threshold # Area score: Normalize by max area max_area = max(top_area, prev_area)