From 472b0ef5d2f1b4a0f0f18e725e3d08bb0d8f4793 Mon Sep 17 00:00:00 2001
From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
Date: Thu, 24 Oct 2024 16:50:22 -0500
Subject: [PATCH] initial foundation for alpr with paddleocr

---
 .cspell/frigate-dictionary.txt       |   4 +-
 docker/main/requirements-wheels.txt  |   3 +
 frigate/embeddings/alpr/alpr.py      | 858 +++++++++++++++++++++++++++
 frigate/embeddings/functions/onnx.py |  26 +
 4 files changed, 890 insertions(+), 1 deletion(-)
 create mode 100644 frigate/embeddings/alpr/alpr.py

diff --git a/.cspell/frigate-dictionary.txt b/.cspell/frigate-dictionary.txt
index b019f8492..2849bc2ef 100644
--- a/.cspell/frigate-dictionary.txt
+++ b/.cspell/frigate-dictionary.txt
@@ -2,6 +2,7 @@ aarch
 absdiff
 airockchip
 Alloc
+alpr
 Amcrest
 amdgpu
 analyzeduration
@@ -186,6 +187,7 @@ openai
 opencv
 openvino
 OWASP
+paddleocr
 paho
 passwordless
 popleft
@@ -305,4 +307,4 @@ yolo
 yolonas
 yolox
 zeep
-zerolatency
+zerolatency
\ No newline at end of file
diff --git a/docker/main/requirements-wheels.txt b/docker/main/requirements-wheels.txt
index c48b99c75..1d9244f1d 100644
--- a/docker/main/requirements-wheels.txt
+++ b/docker/main/requirements-wheels.txt
@@ -45,3 +45,6 @@ openai == 1.51.*
 # push notifications
 py-vapid == 1.9.*
 pywebpush == 2.0.*
+# alpr
+pyclipper == 1.3.*
+shapely == 2.0.*
diff --git a/frigate/embeddings/alpr/alpr.py b/frigate/embeddings/alpr/alpr.py
new file mode 100644
index 000000000..e8c039542
--- /dev/null
+++ b/frigate/embeddings/alpr/alpr.py
@@ -0,0 +1,858 @@
+import math
+from argparse import ArgumentParser
+from typing import Any, Dict, List, Tuple
+
+import cv2
+import numpy as np
+from pyclipper import ET_CLOSEDPOLYGON, JT_ROUND, PyclipperOffset
+from shapely.geometry import Polygon
+
+from frigate.comms.inter_process import InterProcessRequestor
+from frigate.embeddings.functions.onnx import GenericONNXEmbedding, ModelTypeEnum
+
+
+class LicensePlateRecognition:
+    def __init__(self, config: Dict[str, Any], requestor: InterProcessRequestor):
+        self.config = config
+        self.requestor = requestor
+        self.detection_model = self._create_detection_model()
+        self.classification_model = self._create_classification_model()
+        self.recognition_model = self._create_recognition_model()
+        self.ctc_decoder = CTCDecoder()
+
+        self.batch_size = 6
+
+        # Detection specific parameters
+        self.min_size = 3
+        self.max_size = 960
+        self.box_thresh = 0.8
+        self.mask_thresh = 0.8
+        self.mean = np.array([123.675, 116.28, 103.53]).reshape(1, -1).astype("float64")
+        self.std = 1 / np.array([58.395, 57.12, 57.375]).reshape(1, -1).astype(
+            "float64"
+        )
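A note on the constants above: the mean/std values are the standard ImageNet per-channel statistics, and `self.std` is stored as a reciprocal so `normalize_image` can use a single multiply. A NumPy-only sketch of the same arithmetic on a made-up input (illustrative, not part of the patch):

```python
import numpy as np

# same constants as __init__ above
mean = np.array([123.675, 116.28, 103.53])
std = np.array([58.395, 57.12, 57.375])

# toy 2x2 RGB "image"
image = np.full((2, 2, 3), 128, dtype=np.float32)

normalized = (image - mean) / std    # equivalent to subtract + multiply-by-reciprocal
chw = normalized.transpose(2, 0, 1)  # HWC -> CHW for the ONNX model
batched = chw[np.newaxis, ...]       # add a batch dimension

print(batched.shape)  # (1, 3, 2, 2)
```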
model_name="paddleocr-onnx", + model_file="recognition.onnx", + download_urls={ + "recognition.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/recognition.onnx" + }, + model_size="large", + model_type=ModelTypeEnum.alpr_recognize, + requestor=self.requestor, + device="CPU", + ) + + def detect(self, image: np.ndarray) -> List[np.ndarray]: + """ + Detect possible license plates in the input image by first resizing and normalizing it, + running a detection model, and filtering out low-probability regions. + + Args: + image (np.ndarray): The input image in which license plates will be detected. + + Returns: + List[np.ndarray]: A list of bounding box coordinates representing detected license plates. + """ + h, w = image.shape[:2] + + if sum([h, w]) < 64: + image = self.zero_pad(image) + + resized_image = self.resize_image(image) + normalized_image = self.normalize_image(resized_image) + + outputs = self.detection_model([normalized_image])[0] + outputs = outputs[0, :, :] + + boxes, _ = self.boxes_from_bitmap(outputs, outputs > self.mask_thresh, w, h) + return self.filter_polygon(boxes, (h, w)) + + def classify( + self, images: List[np.ndarray] + ) -> Tuple[List[np.ndarray], List[Tuple[str, float]]]: + """ + Classify the orientation or category of each detected license plate. + + Args: + images (List[np.ndarray]): A list of images of detected license plates. + + Returns: + Tuple[List[np.ndarray], List[Tuple[str, float]]]: A tuple of rotated/normalized plate images + and classification results with confidence scores. + """ + num_images = len(images) + indices = np.argsort([x.shape[1] / x.shape[0] for x in images]) + + for i in range(0, num_images, self.batch_size): + norm_images = [] + for j in range(i, min(num_images, i + self.batch_size)): + norm_img = self._preprocess_classification_image(images[indices[j]]) + norm_img = norm_img[np.newaxis, :] + norm_images.append(norm_img) + + outputs = self.classification_model(norm_images) + + return self._process_classification_output(images, outputs) + + def recognize( + self, images: List[np.ndarray] + ) -> Tuple[List[str], List[List[float]]]: + """ + Recognize the characters on the detected license plates using the recognition model. + + Args: + images (List[np.ndarray]): A list of images of license plates to recognize. + + Returns: + Tuple[List[str], List[List[float]]]: A tuple of recognized license plate texts and confidence scores. 
+ """ + input_shape = [3, 48, 320] + num_images = len(images) + + # sort images by aspect ratio for processing + indices = np.argsort(np.array([x.shape[1] / x.shape[0] for x in images])) + + for index in range(0, num_images, self.batch_size): + input_h, input_w = input_shape[1], input_shape[2] + max_wh_ratio = input_w / input_h + norm_images = [] + + # calculate the maximum aspect ratio in the current batch + for i in range(index, min(num_images, index + self.batch_size)): + h, w = images[indices[i]].shape[0:2] + max_wh_ratio = max(max_wh_ratio, w * 1.0 / h) + + # preprocess the images based on the max aspect ratio + for i in range(index, min(num_images, index + self.batch_size)): + norm_image = self._preprocess_recognition_image( + images[indices[i]], max_wh_ratio + ) + norm_image = norm_image[np.newaxis, :] + norm_images.append(norm_image) + + outputs = self.recognition_model(norm_images) + return self.ctc_decoder(outputs) + + def process_license_plate( + self, image: np.ndarray + ) -> Tuple[List[str], List[float], List[int]]: + """ + Complete pipeline for detecting, classifying, and recognizing license plates in the input image. + + Args: + image (np.ndarray): The input image in which to detect, classify, and recognize license plates. + + Returns: + Tuple[List[str], List[float], List[int]]: Detected license plate texts, confidence scores, and areas of the plates. + """ + plate_points = self.detect(image) + if len(plate_points) == 0: + return [], [], [] + + plate_points = self.sort_polygon(list(plate_points)) + plate_images = [self._crop_license_plate(image, x) for x in plate_points] + rotated_images, _ = self.classify(plate_images) + + # keep track of the index of each image for correct area calc later + sorted_indices = np.argsort([x.shape[1] / x.shape[0] for x in rotated_images]) + reverse_mapping = { + idx: original_idx for original_idx, idx in enumerate(sorted_indices) + } + + results, confidences = self.recognize(rotated_images) + + if results: + license_plates = [""] * len(rotated_images) + average_confidences = [0.0] * len(rotated_images) + areas = [0] * len(rotated_images) + + # map results back to original image order + for i, (plate, conf) in enumerate(zip(results, confidences)): + original_idx = reverse_mapping[i] + + height, width = rotated_images[original_idx].shape[:2] + area = height * width + + average_confidence = sum(conf) / len(conf) if conf else 0 + + # TODO: remove + if False: + save_image = cv2.cvtColor( + rotated_images[original_idx], cv2.COLOR_RGB2BGR + ) + filename = f"/config/plate_{original_idx}_{plate}_{area}.jpg" + cv2.imwrite(filename, save_image) + + license_plates[original_idx] = plate + average_confidences[original_idx] = average_confidence + areas[original_idx] = area + + return license_plates, average_confidences, areas + + return [], [], [] + + def resize_image(self, image: np.ndarray) -> np.ndarray: + """ + Resize the input image while maintaining the aspect ratio, ensuring dimensions are multiples of 32. + + Args: + image (np.ndarray): The input image to resize. + + Returns: + np.ndarray: The resized image. + """ + h, w = image.shape[:2] + ratio = min(self.max_size / max(h, w), 1.0) + resize_h = max(int(round(int(h * ratio) / 32) * 32), 32) + resize_w = max(int(round(int(w * ratio) / 32) * 32), 32) + return cv2.resize(image, (resize_w, resize_h)) + + def normalize_image(self, image: np.ndarray) -> np.ndarray: + """ + Normalize the input image by subtracting the mean and multiplying by the standard deviation. 
+    def boxes_from_bitmap(
+        self, output: np.ndarray, mask: np.ndarray, dest_width: int, dest_height: int
+    ) -> Tuple[np.ndarray, List[float]]:
+        """
+        Process the binary mask to extract bounding boxes and associated confidence scores.
+
+        Args:
+            output (np.ndarray): Output confidence map from the model.
+            mask (np.ndarray): Binary mask of detected regions.
+            dest_width (int): Target width for scaling the box coordinates.
+            dest_height (int): Target height for scaling the box coordinates.
+
+        Returns:
+            Tuple[np.ndarray, List[float]]: Array of bounding boxes and list of corresponding scores.
+        """
+        mask = (mask * 255).astype(np.uint8)
+        height, width = mask.shape
+        outs = cv2.findContours(mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
+
+        # handle different return values of findContours between OpenCV versions
+        contours = outs[0] if len(outs) == 2 else outs[1]
+
+        boxes = []
+        scores = []
+
+        for index in range(len(contours)):
+            contour = contours[index]
+
+            # get minimum bounding box (rotated rectangle) around the contour and the smallest side length
+            points, min_side = self.get_min_boxes(contour)
+
+            if min_side < self.min_size:
+                continue
+
+            points = np.array(points)
+
+            score = self.box_score(output, contour)
+            if self.box_thresh > score:
+                continue
+
+            polygon = Polygon(points)
+            distance = polygon.area / polygon.length
+
+            # use pyclipper to expand (unclip) the polygon based on the computed distance
+            offset = PyclipperOffset()
+            offset.AddPath(points, JT_ROUND, ET_CLOSEDPOLYGON)
+            points = np.array(offset.Execute(distance * 1.5)).reshape((-1, 1, 2))
+
+            # get the minimum bounding box around the expanded polygon
+            box, min_side = self.get_min_boxes(points)
+
+            if min_side < self.min_size + 2:
+                continue
+
+            box = np.array(box)
+
+            # normalize and clip box coordinates to fit within the destination image size
+            box[:, 0] = np.clip(np.round(box[:, 0] / width * dest_width), 0, dest_width)
+            box[:, 1] = np.clip(
+                np.round(box[:, 1] / height * dest_height), 0, dest_height
+            )
+
+            boxes.append(box.astype("int32"))
+            scores.append(score)
+
+        return np.array(boxes, dtype="int32"), scores
+
+    @staticmethod
+    def get_min_boxes(contour: np.ndarray) -> Tuple[List[Tuple[float, float]], float]:
+        """
+        Calculate the minimum bounding box (rotated rectangle) for a given contour.
+
+        Args:
+            contour (np.ndarray): The contour points of the detected shape.
+
+        Returns:
+            Tuple[List[Tuple[float, float]], float]: A list of four points representing the
+            corners of the bounding box, and the length of the shortest side.
+        """
+        bounding_box = cv2.minAreaRect(contour)
+        points = sorted(cv2.boxPoints(bounding_box), key=lambda x: x[0])
+        index_1, index_4 = (0, 1) if points[1][1] > points[0][1] else (1, 0)
+        index_2, index_3 = (2, 3) if points[3][1] > points[2][1] else (3, 2)
+        box = [points[index_1], points[index_2], points[index_3], points[index_4]]
+        return box, min(bounding_box[1])
+
+    @staticmethod
+    def box_score(bitmap: np.ndarray, contour: np.ndarray) -> float:
+        """
+        Calculate the average score within the bounding box of a contour.
+
+        Args:
+            bitmap (np.ndarray): The output confidence map from the model.
+            contour (np.ndarray): The contour of the detected shape.
+
+        Returns:
+            float: The average score of the pixels inside the contour region.
+        """
+        h, w = bitmap.shape[:2]
+        contour = contour.reshape(-1, 2)
+        x1, y1 = np.clip(contour.min(axis=0), 0, [w - 1, h - 1])
+        x2, y2 = np.clip(contour.max(axis=0), 0, [w - 1, h - 1])
+        mask = np.zeros((y2 - y1 + 1, x2 - x1 + 1), dtype=np.uint8)
+        cv2.fillPoly(mask, [contour - [x1, y1]], 1)
+        return cv2.mean(bitmap[y1 : y2 + 1, x1 : x2 + 1], mask)[0]
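The offset step above is DBNet-style "unclipping": each candidate box is dilated by `area / perimeter * 1.5` to recover the full text region from the shrunken probability map. A toy run on a 100x40 box (pyclipper operates on integer coordinates):

```python
import numpy as np
from pyclipper import ET_CLOSEDPOLYGON, JT_ROUND, PyclipperOffset
from shapely.geometry import Polygon

box = [(0, 0), (100, 0), (100, 40), (0, 40)]

polygon = Polygon(box)
distance = polygon.area / polygon.length  # 4000 / 280 ~= 14.3

offset = PyclipperOffset()
offset.AddPath(box, JT_ROUND, ET_CLOSEDPOLYGON)
expanded = np.array(offset.Execute(distance * 1.5)[0])

print(expanded.min(axis=0), expanded.max(axis=0))
# roughly [-21 -21] and [121 61]: the box grows ~21px on every side
```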
+    @staticmethod
+    def expand_box(points: List[Tuple[float, float]]) -> np.ndarray:
+        """
+        Expand a polygonal shape slightly by a factor determined by the area-to-perimeter ratio.
+
+        Args:
+            points (List[Tuple[float, float]]): Points of the polygon to expand.
+
+        Returns:
+            np.ndarray: Expanded polygon points.
+        """
+        polygon = Polygon(points)
+        distance = polygon.area / polygon.length
+        offset = PyclipperOffset()
+        offset.AddPath(points, JT_ROUND, ET_CLOSEDPOLYGON)
+        expanded = np.array(offset.Execute(distance * 1.5)).reshape((-1, 2))
+        return expanded
+
+    def filter_polygon(
+        self, points: List[np.ndarray], shape: Tuple[int, int]
+    ) -> np.ndarray:
+        """
+        Filter a set of polygons to include only valid ones that fit within an image shape
+        and meet size constraints.
+
+        Args:
+            points (List[np.ndarray]): List of polygons to filter.
+            shape (Tuple[int, int]): Shape of the image (height, width).
+
+        Returns:
+            np.ndarray: Array of the filtered polygons.
+        """
+        height, width = shape
+        return np.array(
+            [
+                self.clockwise_order(point)
+                for point in points
+                if self.is_valid_polygon(point, width, height)
+            ]
+        )
+
+    @staticmethod
+    def is_valid_polygon(point: np.ndarray, width: int, height: int) -> bool:
+        """
+        Check if a polygon is valid, meaning it fits within the image bounds
+        and has sides of a minimum length.
+
+        Args:
+            point (np.ndarray): The polygon to validate.
+            width (int): Image width.
+            height (int): Image height.
+
+        Returns:
+            bool: Whether the polygon is valid or not.
+        """
+        return (
+            point[:, 0].min() >= 0
+            and point[:, 0].max() < width
+            and point[:, 1].min() >= 0
+            and point[:, 1].max() < height
+            and np.linalg.norm(point[0] - point[1]) > 3
+            and np.linalg.norm(point[0] - point[3]) > 3
+        )
+
+    @staticmethod
+    def clockwise_order(point: np.ndarray) -> np.ndarray:
+        """
+        Arrange the points of a polygon in clockwise order based on their angular positions
+        around the polygon's center.
+
+        Args:
+            point (np.ndarray): Array of points of the polygon.
+
+        Returns:
+            np.ndarray: Points ordered in clockwise direction.
+        """
+        center = point.mean(axis=0)
+        return point[
+            np.argsort(np.arctan2(point[:, 1] - center[1], point[:, 0] - center[0]))
+        ]
+
+    @staticmethod
+    def sort_polygon(points):
+        """
+        Sort polygons based on their position in the image. If polygons are close in vertical
+        position (within 10 pixels), sort them by horizontal position.
+
+        Args:
+            points: List of polygons to sort.
+
+        Returns:
+            List: Sorted list of polygons.
+        """
+        points.sort(key=lambda x: (x[0][1], x[0][0]))
+        for i in range(len(points) - 1):
+            for j in range(i, -1, -1):
+                if abs(points[j + 1][0][1] - points[j][0][1]) < 10 and (
+                    points[j + 1][0][0] < points[j][0][0]
+                ):
+                    points[j], points[j + 1] = points[j + 1], points[j]
+                else:
+                    break
+        return points
+
+    @staticmethod
+    def zero_pad(image: np.ndarray) -> np.ndarray:
+        """
+        Apply zero-padding to an image, ensuring its dimensions are at least 32x32.
+        The padding is added only if needed.
+
+        Args:
+            image (np.ndarray): Input image.
+
+        Returns:
+            np.ndarray: Zero-padded image.
+        """
+        h, w, c = image.shape
+        pad = np.zeros((max(32, h), max(32, w), c), np.uint8)
+        pad[:h, :w, :] = image
+        return pad
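`clockwise_order` sorts a quadrilateral's corners by their angle around the centroid, which yields a consistent winding no matter how the detector emitted them. A quick standalone check:

```python
import numpy as np

def clockwise_order(point: np.ndarray) -> np.ndarray:
    center = point.mean(axis=0)
    return point[
        np.argsort(np.arctan2(point[:, 1] - center[1], point[:, 0] - center[0]))
    ]

corners = np.array([[1, 0], [0, 1], [1, 1], [0, 0]])  # shuffled unit square
print(clockwise_order(corners))
# [[0 0] [1 0] [1 1] [0 1]] -- consistent winding in image coordinates
```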
+    @staticmethod
+    def _preprocess_classification_image(image: np.ndarray) -> np.ndarray:
+        """
+        Preprocess a single image for classification by resizing, normalizing, and padding.
+
+        This method resizes the input image to a fixed height of 48 pixels while adjusting
+        the width dynamically up to a maximum of 192 pixels. The image is then normalized and
+        padded to fit the required input dimensions for classification.
+
+        Args:
+            image (np.ndarray): Input image to preprocess.
+
+        Returns:
+            np.ndarray: Preprocessed and padded image.
+        """
+        # fixed height of 48, dynamic width up to 192
+        input_shape = (3, 48, 192)
+        input_c, input_h, input_w = input_shape
+
+        h, w = image.shape[:2]
+        ratio = w / h
+        resized_w = min(input_w, math.ceil(input_h * ratio))
+
+        resized_image = cv2.resize(image, (resized_w, input_h))
+
+        # handle single-channel images (grayscale) if needed
+        if input_c == 1 and resized_image.ndim == 2:
+            resized_image = resized_image[np.newaxis, :, :]
+        else:
+            resized_image = resized_image.transpose((2, 0, 1))
+
+        # normalize
+        resized_image = (resized_image.astype("float32") / 255.0 - 0.5) / 0.5
+
+        padded_image = np.zeros((input_c, input_h, input_w), dtype=np.float32)
+        padded_image[:, :, :resized_w] = resized_image
+
+        return padded_image
+
+    def _process_classification_output(
+        self, images: List[np.ndarray], outputs: List[np.ndarray]
+    ) -> Tuple[List[np.ndarray], List[Tuple[str, float]]]:
+        """
+        Process the classification model output by matching labels with confidence scores.
+
+        This method processes the outputs from the classification model and rotates images
+        with high confidence of being labeled "180". It ensures that results are mapped to
+        the original image order.
+
+        Args:
+            images (List[np.ndarray]): List of input images.
+            outputs (List[np.ndarray]): Corresponding model outputs.
+
+        Returns:
+            Tuple[List[np.ndarray], List[Tuple[str, float]]]: A tuple of processed images and
+            classification results (label and confidence score).
+        """
+        labels = ["0", "180"]
+        results = [["", 0.0]] * len(images)
+        indices = np.argsort(np.array([x.shape[1] / x.shape[0] for x in images]))
+
+        outputs = np.stack(outputs)
+
+        outputs = [
+            (labels[idx], outputs[i, idx])
+            for i, idx in enumerate(outputs.argmax(axis=1))
+        ]
+
+        # outputs follow the aspect-ratio-sorted order, so map each result
+        # back to its original image index
+        for i, (label, score) in enumerate(outputs):
+            results[indices[i]] = [label, score]
+            if "180" in label and score > self.config.get(
+                "classification_threshold", 0.98
+            ):
+                # flip plates that were classified as upside-down
+                images[indices[i]] = cv2.rotate(images[indices[i]], cv2.ROTATE_180)
+
+        return images, results
+
+    def _preprocess_recognition_image(
+        self, image: np.ndarray, max_wh_ratio: float
+    ) -> np.ndarray:
+        """
+        Preprocess an image for recognition by dynamically adjusting its width.
+
+        This method adjusts the width of the image based on the maximum width-to-height ratio
+        while keeping the height fixed at 48 pixels. The image is then normalized and padded
+        to fit the required input dimensions for recognition.
+
+        Args:
+            image (np.ndarray): Input image to preprocess.
+            max_wh_ratio (float): Maximum width-to-height ratio for resizing.
+
+        Returns:
+            np.ndarray: Preprocessed and padded image.
+        """
+        # fixed height of 48, dynamic width based on ratio
+        input_shape = [3, 48, 320]
+        input_h, input_w = input_shape[1], input_shape[2]
+
+        assert image.shape[2] == input_shape[0], "Unexpected number of image channels."
+
+        # dynamically adjust input width based on max_wh_ratio
+        input_w = int(input_h * max_wh_ratio)
+
+        # check for model-specific input width
+        model_input_w = self.recognition_model.runner.ort.get_inputs()[0].shape[3]
+        if isinstance(model_input_w, int) and model_input_w > 0:
+            input_w = model_input_w
+
+        h, w = image.shape[:2]
+        aspect_ratio = w / h
+        resized_w = min(input_w, math.ceil(input_h * aspect_ratio))
+
+        resized_image = cv2.resize(image, (resized_w, input_h))
+        resized_image = resized_image.transpose((2, 0, 1))
+        resized_image = (resized_image.astype("float32") / 255.0 - 0.5) / 0.5
+
+        padded_image = np.zeros((input_shape[0], input_h, input_w), dtype=np.float32)
+        padded_image[:, :, :resized_w] = resized_image
+
+        return padded_image
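Recognition keeps the input height fixed at 48 and widens the canvas to the largest aspect ratio in the batch, so long plates are not squashed. The width arithmetic in isolation (crop size invented):

```python
import math

input_h = 48
max_wh_ratio = 320 / 48  # the default 320-wide canvas sets the floor

# a hypothetical wide plate crop, 40x300 (h x w)
h, w = 40, 300
max_wh_ratio = max(max_wh_ratio, w / h)

input_w = int(input_h * max_wh_ratio)                   # canvas width for the batch
resized_w = min(input_w, math.ceil(input_h * (w / h)))  # this crop's resized width

print(input_w, resized_w)  # 360 360 -- the widest crop defines and fills the canvas
```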
+ """ + # fixed height of 48, dynamic width based on ratio + input_shape = [3, 48, 320] + input_h, input_w = input_shape[1], input_shape[2] + + assert image.shape[2] == input_shape[0], "Unexpected number of image channels." + + # dynamically adjust input width based on max_wh_ratio + input_w = int(input_h * max_wh_ratio) + + # check for model-specific input width + model_input_w = self.recognition_model.runner.ort.get_inputs()[0].shape[3] + if isinstance(model_input_w, int) and model_input_w > 0: + input_w = model_input_w + + h, w = image.shape[:2] + aspect_ratio = w / h + resized_w = min(input_w, math.ceil(input_h * aspect_ratio)) + + resized_image = cv2.resize(image, (resized_w, input_h)) + resized_image = resized_image.transpose((2, 0, 1)) + resized_image = (resized_image.astype("float32") / 255.0 - 0.5) / 0.5 + + padded_image = np.zeros((input_shape[0], input_h, input_w), dtype=np.float32) + padded_image[:, :, :resized_w] = resized_image + + return padded_image + + @staticmethod + def _crop_license_plate(image: np.ndarray, points: np.ndarray) -> np.ndarray: + """ + Crop the license plate from the image using four corner points. + + This method crops the region containing the license plate by using the perspective + transformation based on four corner points. If the resulting image is significantly + taller than wide, the image is rotated to the correct orientation. + + Args: + image (np.ndarray): Input image containing the license plate. + points (np.ndarray): Four corner points defining the plate's position. + + Returns: + np.ndarray: Cropped and potentially rotated license plate image. + """ + assert len(points) == 4, "shape of points must be 4*2" + points = points.astype(np.float32) + crop_width = int( + max( + np.linalg.norm(points[0] - points[1]), + np.linalg.norm(points[2] - points[3]), + ) + ) + crop_height = int( + max( + np.linalg.norm(points[0] - points[3]), + np.linalg.norm(points[1] - points[2]), + ) + ) + pts_std = np.float32( + [[0, 0], [crop_width, 0], [crop_width, crop_height], [0, crop_height]] + ) + matrix = cv2.getPerspectiveTransform(points, pts_std) + image = cv2.warpPerspective( + image, + matrix, + (crop_width, crop_height), + borderMode=cv2.BORDER_REPLICATE, + flags=cv2.INTER_CUBIC, + ) + height, width = image.shape[0:2] + if height * 1.0 / width >= 1.5: + image = np.rot90(image, k=3) + return image + + +class CTCDecoder: + """ + A decoder for interpreting the output of a CTC (Connectionist Temporal Classification) model. + + This decoder converts the model's output probabilities into readable sequences of characters + while removing duplicates and handling blank tokens. It also calculates the confidence scores + for each decoded character sequence. + """ + + def __init__(self): + """ + Initialize the CTCDecoder with a list of characters and a character map. + + The character set includes digits, letters, special characters, and a "blank" token + (used by the CTC model for decoding purposes). A character map is created to map + indices to characters. 
+ """ + self.characters = [ + "blank", + "0", + "1", + "2", + "3", + "4", + "5", + "6", + "7", + "8", + "9", + ":", + ";", + "<", + "=", + ">", + "?", + "@", + "A", + "B", + "C", + "D", + "E", + "F", + "G", + "H", + "I", + "J", + "K", + "L", + "M", + "N", + "O", + "P", + "Q", + "R", + "S", + "T", + "U", + "V", + "W", + "X", + "Y", + "Z", + "[", + "\\", + "]", + "^", + "_", + "`", + "a", + "b", + "c", + "d", + "e", + "f", + "g", + "h", + "i", + "j", + "k", + "l", + "m", + "n", + "o", + "p", + "q", + "r", + "s", + "t", + "u", + "v", + "w", + "x", + "y", + "z", + "{", + "|", + "}", + "~", + "!", + '"', + "#", + "$", + "%", + "&", + "'", + "(", + ")", + "*", + "+", + ",", + "-", + ".", + "/", + " ", + " ", + ] + self.char_map = {i: char for i, char in enumerate(self.characters)} + + def __call__( + self, outputs: List[np.ndarray] + ) -> Tuple[List[str], List[List[float]]]: + """ + Decode a batch of model outputs into character sequences and their confidence scores. + + The method takes the output probability distributions for each time step and uses + the best path decoding strategy. It then merges repeating characters and ignores + blank tokens. Confidence scores for each decoded character are also calculated. + + Args: + outputs (List[np.ndarray]): A list of model outputs, where each element is + a probability distribution for each time step. + + Returns: + Tuple[List[str], List[List[float]]]: A tuple of decoded character sequences + and confidence scores for each sequence. + """ + results = [] + confidences = [] + for output in outputs: + seq_log_probs = np.log(output + 1e-8) + best_path = np.argmax(seq_log_probs, axis=1) + + merged_path = [] + merged_probs = [] + for t, char_index in enumerate(best_path): + if char_index != 0 and (t == 0 or char_index != best_path[t - 1]): + merged_path.append(char_index) + merged_probs.append(seq_log_probs[t, char_index]) + + result = "".join(self.char_map[idx] for idx in merged_path) + results.append(result) + + confidence = np.exp(merged_probs).tolist() + confidences.append(confidence) + + return results, confidences + + +def main(): + parser = ArgumentParser() + parser.add_argument("filepath", type=str, help="image file path") + args = parser.parse_args() + + lpr_config = { + "detection_model_path": "/workspace/frigate/frigate/embeddings/weights/detection.onnx", + "classification_model_path": "/workspace/frigate/frigate/embeddings/weights/classification.onnx", + "recognition_model_path": "/workspace/frigate/frigate/embeddings/weights/recognition.onnx", + "mask_thresh": 0.8, + "box_thresh": 0.8, + "min_size": 3, + "classification_threshold": 0.98, + } + + # Initialize LPR + license_plate_recognition = LicensePlateRecognition(lpr_config, {}) + + # Read and process image + frame = cv2.imread(args.filepath) + if frame is None: + print(f"Error: Could not read image file: {args.filepath}") + return + + cv2.cvtColor(frame, cv2.COLOR_BGR2RGB, frame) + + # Process the license plate + license_plates, confidences, areas = ( + license_plate_recognition.process_license_plate(frame) + ) + + # Print debug information to ensure data structure + print(f"License plates: {license_plates}") + print(f"Confidences: {confidences}") + print(f"Areas: {areas}") + + if license_plates: + for plate, confidence, area in zip(license_plates, confidences, areas): + print( + f"Detected license plate: {plate} (average confidence: {confidence:.2f}, area: {area} pixels)" + ) + else: + print("No license plate detected") + + +if __name__ == "__main__": + main() diff --git 
diff --git a/frigate/embeddings/functions/onnx.py b/frigate/embeddings/functions/onnx.py
index 9fc71d502..b0cae62e2 100644
--- a/frigate/embeddings/functions/onnx.py
+++ b/frigate/embeddings/functions/onnx.py
@@ -38,6 +38,9 @@ class ModelTypeEnum(str, Enum):
     face = "face"
     vision = "vision"
     text = "text"
+    alpr_detect = "alpr_detect"
+    alpr_classify = "alpr_classify"
+    alpr_recognize = "alpr_recognize"
 
 
 class GenericONNXEmbedding:
@@ -139,6 +142,12 @@ class GenericONNXEmbedding:
             self.feature_extractor = self._load_feature_extractor()
         elif self.model_type == ModelTypeEnum.face:
             self.feature_extractor = []
+        elif self.model_type in (
+            ModelTypeEnum.alpr_detect,
+            ModelTypeEnum.alpr_classify,
+            ModelTypeEnum.alpr_recognize,
+        ):
+            self.feature_extractor = []
 
         self.runner = ONNXModelRunner(
             os.path.join(self.download_path, self.model_file),
@@ -214,9 +223,20 @@ class GenericONNXEmbedding:
             frame = np.expand_dims(frame, axis=0)
 
             return [{"image_input": frame}]
+        elif self.model_type == ModelTypeEnum.alpr_detect:
+            # detection receives a single preprocessed frame
+            return [{"x": raw_inputs[0]}]
+        elif self.model_type in (
+            ModelTypeEnum.alpr_classify,
+            ModelTypeEnum.alpr_recognize,
+        ):
+            # classification and recognition receive a list of plate crops
+            return [{"x": img} for img in raw_inputs]
         else:
             raise ValueError(f"Unable to preprocess inputs for {self.model_type}")
 
+
+    # for face recognition
     def _process_image(self, image, output: str = "RGB") -> Image.Image:
         if isinstance(image, str):
             if image.startswith("http"):
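For reference, a sketch of how this pipeline could be exercised once merged. The image path is a placeholder, and `None` stands in for the real `InterProcessRequestor` that Frigate wires up in production:

```python
import cv2

from frigate.embeddings.alpr.alpr import LicensePlateRecognition

# placeholder requestor; inside Frigate a real InterProcessRequestor is passed
lpr = LicensePlateRecognition({"classification_threshold": 0.98}, None)

frame = cv2.imread("/path/to/plate.jpg")  # hypothetical test image
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

plates, confidences, areas = lpr.process_license_plate(frame)
print(plates, confidences, areas)
```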