mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-02-15 15:45:27 +03:00
initial foundation for alpr with paddleocr
This commit is contained in:
parent
c3eae3ea2b
commit
0de4fe5774
@ -2,6 +2,7 @@ aarch
|
||||
absdiff
|
||||
airockchip
|
||||
Alloc
|
||||
alpr
|
||||
Amcrest
|
||||
amdgpu
|
||||
analyzeduration
|
||||
@ -186,6 +187,7 @@ openai
|
||||
opencv
|
||||
openvino
|
||||
OWASP
|
||||
paddleocr
|
||||
paho
|
||||
passwordless
|
||||
popleft
|
||||
@ -305,4 +307,4 @@ yolo
|
||||
yolonas
|
||||
yolox
|
||||
zeep
|
||||
zerolatency
|
||||
zerolatency
|
||||
@ -45,3 +45,6 @@ openai == 1.51.*
|
||||
# push notifications
|
||||
py-vapid == 1.9.*
|
||||
pywebpush == 2.0.*
|
||||
# alpr
|
||||
pyclipper == 1.3.*
|
||||
shapely == 2.0.*
|
||||
|
||||
858
frigate/embeddings/alpr/alpr.py
Normal file
858
frigate/embeddings/alpr/alpr.py
Normal file
@ -0,0 +1,858 @@
|
||||
import math
|
||||
from argparse import ArgumentParser
|
||||
from typing import Any, Dict, List, Tuple
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from pyclipper import ET_CLOSEDPOLYGON, JT_ROUND, PyclipperOffset
|
||||
from shapely.geometry import Polygon
|
||||
|
||||
from frigate.comms.inter_process import InterProcessRequestor
|
||||
from frigate.embeddings.functions.onnx import GenericONNXEmbedding, ModelTypeEnum
|
||||
|
||||
|
||||
class LicensePlateRecognition:
|
||||
def __init__(self, config: Dict[str, Any], requestor: InterProcessRequestor):
|
||||
self.config = config
|
||||
self.requestor = requestor
|
||||
self.detection_model = self._create_detection_model()
|
||||
self.classification_model = self._create_classification_model()
|
||||
self.recognition_model = self._create_recognition_model()
|
||||
self.ctc_decoder = CTCDecoder()
|
||||
|
||||
self.batch_size = 6
|
||||
|
||||
# Detection specific parameters
|
||||
self.min_size = 3
|
||||
self.max_size = 960
|
||||
self.box_thresh = 0.8
|
||||
self.mask_thresh = 0.8
|
||||
self.mean = np.array([123.675, 116.28, 103.53]).reshape(1, -1).astype("float64")
|
||||
self.std = 1 / np.array([58.395, 57.12, 57.375]).reshape(1, -1).astype(
|
||||
"float64"
|
||||
)
|
||||
|
||||
def _create_detection_model(self) -> GenericONNXEmbedding:
|
||||
return GenericONNXEmbedding(
|
||||
model_name="paddleocr-onnx",
|
||||
model_file="detection.onnx",
|
||||
download_urls={
|
||||
"detection.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/detection.onnx"
|
||||
},
|
||||
model_size="large",
|
||||
model_type=ModelTypeEnum.alpr_detect,
|
||||
requestor=self.requestor,
|
||||
device="CPU",
|
||||
)
|
||||
|
||||
def _create_classification_model(self) -> GenericONNXEmbedding:
|
||||
return GenericONNXEmbedding(
|
||||
model_name="paddleocr-onnx",
|
||||
model_file="classification.onnx",
|
||||
download_urls={
|
||||
"classification.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/classification.onnx"
|
||||
},
|
||||
model_size="large",
|
||||
model_type=ModelTypeEnum.alpr_classify,
|
||||
requestor=self.requestor,
|
||||
device="CPU",
|
||||
)
|
||||
|
||||
def _create_recognition_model(self) -> GenericONNXEmbedding:
|
||||
return GenericONNXEmbedding(
|
||||
model_name="paddleocr-onnx",
|
||||
model_file="recognition.onnx",
|
||||
download_urls={
|
||||
"recognition.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/recognition.onnx"
|
||||
},
|
||||
model_size="large",
|
||||
model_type=ModelTypeEnum.alpr_recognize,
|
||||
requestor=self.requestor,
|
||||
device="CPU",
|
||||
)
|
||||
|
||||
def detect(self, image: np.ndarray) -> List[np.ndarray]:
|
||||
"""
|
||||
Detect possible license plates in the input image by first resizing and normalizing it,
|
||||
running a detection model, and filtering out low-probability regions.
|
||||
|
||||
Args:
|
||||
image (np.ndarray): The input image in which license plates will be detected.
|
||||
|
||||
Returns:
|
||||
List[np.ndarray]: A list of bounding box coordinates representing detected license plates.
|
||||
"""
|
||||
h, w = image.shape[:2]
|
||||
|
||||
if sum([h, w]) < 64:
|
||||
image = self.zero_pad(image)
|
||||
|
||||
resized_image = self.resize_image(image)
|
||||
normalized_image = self.normalize_image(resized_image)
|
||||
|
||||
outputs = self.detection_model([normalized_image])[0]
|
||||
outputs = outputs[0, :, :]
|
||||
|
||||
boxes, _ = self.boxes_from_bitmap(outputs, outputs > self.mask_thresh, w, h)
|
||||
return self.filter_polygon(boxes, (h, w))
|
||||
|
||||
def classify(
|
||||
self, images: List[np.ndarray]
|
||||
) -> Tuple[List[np.ndarray], List[Tuple[str, float]]]:
|
||||
"""
|
||||
Classify the orientation or category of each detected license plate.
|
||||
|
||||
Args:
|
||||
images (List[np.ndarray]): A list of images of detected license plates.
|
||||
|
||||
Returns:
|
||||
Tuple[List[np.ndarray], List[Tuple[str, float]]]: A tuple of rotated/normalized plate images
|
||||
and classification results with confidence scores.
|
||||
"""
|
||||
num_images = len(images)
|
||||
indices = np.argsort([x.shape[1] / x.shape[0] for x in images])
|
||||
|
||||
for i in range(0, num_images, self.batch_size):
|
||||
norm_images = []
|
||||
for j in range(i, min(num_images, i + self.batch_size)):
|
||||
norm_img = self._preprocess_classification_image(images[indices[j]])
|
||||
norm_img = norm_img[np.newaxis, :]
|
||||
norm_images.append(norm_img)
|
||||
|
||||
outputs = self.classification_model(norm_images)
|
||||
|
||||
return self._process_classification_output(images, outputs)
|
||||
|
||||
def recognize(
|
||||
self, images: List[np.ndarray]
|
||||
) -> Tuple[List[str], List[List[float]]]:
|
||||
"""
|
||||
Recognize the characters on the detected license plates using the recognition model.
|
||||
|
||||
Args:
|
||||
images (List[np.ndarray]): A list of images of license plates to recognize.
|
||||
|
||||
Returns:
|
||||
Tuple[List[str], List[List[float]]]: A tuple of recognized license plate texts and confidence scores.
|
||||
"""
|
||||
input_shape = [3, 48, 320]
|
||||
num_images = len(images)
|
||||
|
||||
# sort images by aspect ratio for processing
|
||||
indices = np.argsort(np.array([x.shape[1] / x.shape[0] for x in images]))
|
||||
|
||||
for index in range(0, num_images, self.batch_size):
|
||||
input_h, input_w = input_shape[1], input_shape[2]
|
||||
max_wh_ratio = input_w / input_h
|
||||
norm_images = []
|
||||
|
||||
# calculate the maximum aspect ratio in the current batch
|
||||
for i in range(index, min(num_images, index + self.batch_size)):
|
||||
h, w = images[indices[i]].shape[0:2]
|
||||
max_wh_ratio = max(max_wh_ratio, w * 1.0 / h)
|
||||
|
||||
# preprocess the images based on the max aspect ratio
|
||||
for i in range(index, min(num_images, index + self.batch_size)):
|
||||
norm_image = self._preprocess_recognition_image(
|
||||
images[indices[i]], max_wh_ratio
|
||||
)
|
||||
norm_image = norm_image[np.newaxis, :]
|
||||
norm_images.append(norm_image)
|
||||
|
||||
outputs = self.recognition_model(norm_images)
|
||||
return self.ctc_decoder(outputs)
|
||||
|
||||
def process_license_plate(
|
||||
self, image: np.ndarray
|
||||
) -> Tuple[List[str], List[float], List[int]]:
|
||||
"""
|
||||
Complete pipeline for detecting, classifying, and recognizing license plates in the input image.
|
||||
|
||||
Args:
|
||||
image (np.ndarray): The input image in which to detect, classify, and recognize license plates.
|
||||
|
||||
Returns:
|
||||
Tuple[List[str], List[float], List[int]]: Detected license plate texts, confidence scores, and areas of the plates.
|
||||
"""
|
||||
plate_points = self.detect(image)
|
||||
if len(plate_points) == 0:
|
||||
return [], [], []
|
||||
|
||||
plate_points = self.sort_polygon(list(plate_points))
|
||||
plate_images = [self._crop_license_plate(image, x) for x in plate_points]
|
||||
rotated_images, _ = self.classify(plate_images)
|
||||
|
||||
# keep track of the index of each image for correct area calc later
|
||||
sorted_indices = np.argsort([x.shape[1] / x.shape[0] for x in rotated_images])
|
||||
reverse_mapping = {
|
||||
idx: original_idx for original_idx, idx in enumerate(sorted_indices)
|
||||
}
|
||||
|
||||
results, confidences = self.recognize(rotated_images)
|
||||
|
||||
if results:
|
||||
license_plates = [""] * len(rotated_images)
|
||||
average_confidences = [0.0] * len(rotated_images)
|
||||
areas = [0] * len(rotated_images)
|
||||
|
||||
# map results back to original image order
|
||||
for i, (plate, conf) in enumerate(zip(results, confidences)):
|
||||
original_idx = reverse_mapping[i]
|
||||
|
||||
height, width = rotated_images[original_idx].shape[:2]
|
||||
area = height * width
|
||||
|
||||
average_confidence = sum(conf) / len(conf) if conf else 0
|
||||
|
||||
# TODO: remove
|
||||
if False:
|
||||
save_image = cv2.cvtColor(
|
||||
rotated_images[original_idx], cv2.COLOR_RGB2BGR
|
||||
)
|
||||
filename = f"/config/plate_{original_idx}_{plate}_{area}.jpg"
|
||||
cv2.imwrite(filename, save_image)
|
||||
|
||||
license_plates[original_idx] = plate
|
||||
average_confidences[original_idx] = average_confidence
|
||||
areas[original_idx] = area
|
||||
|
||||
return license_plates, average_confidences, areas
|
||||
|
||||
return [], [], []
|
||||
|
||||
def resize_image(self, image: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Resize the input image while maintaining the aspect ratio, ensuring dimensions are multiples of 32.
|
||||
|
||||
Args:
|
||||
image (np.ndarray): The input image to resize.
|
||||
|
||||
Returns:
|
||||
np.ndarray: The resized image.
|
||||
"""
|
||||
h, w = image.shape[:2]
|
||||
ratio = min(self.max_size / max(h, w), 1.0)
|
||||
resize_h = max(int(round(int(h * ratio) / 32) * 32), 32)
|
||||
resize_w = max(int(round(int(w * ratio) / 32) * 32), 32)
|
||||
return cv2.resize(image, (resize_w, resize_h))
|
||||
|
||||
def normalize_image(self, image: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Normalize the input image by subtracting the mean and multiplying by the standard deviation.
|
||||
|
||||
Args:
|
||||
image (np.ndarray): The input image to normalize.
|
||||
|
||||
Returns:
|
||||
np.ndarray: The normalized image, transposed to match the model's expected input format.
|
||||
"""
|
||||
image = image.astype("float32")
|
||||
cv2.subtract(image, self.mean, image)
|
||||
cv2.multiply(image, self.std, image)
|
||||
return image.transpose((2, 0, 1))[np.newaxis, ...]
|
||||
|
||||
def boxes_from_bitmap(
|
||||
self, output: np.ndarray, mask: np.ndarray, dest_width: int, dest_height: int
|
||||
) -> Tuple[np.ndarray, List[float]]:
|
||||
"""
|
||||
Process the binary mask to extract bounding boxes and associated confidence scores.
|
||||
|
||||
Args:
|
||||
output (np.ndarray): Output confidence map from the model.
|
||||
mask (np.ndarray): Binary mask of detected regions.
|
||||
dest_width (int): Target width for scaling the box coordinates.
|
||||
dest_height (int): Target height for scaling the box coordinates.
|
||||
|
||||
Returns:
|
||||
Tuple[np.ndarray, List[float]]: Array of bounding boxes and list of corresponding scores.
|
||||
"""
|
||||
|
||||
mask = (mask * 255).astype(np.uint8)
|
||||
height, width = mask.shape
|
||||
outs = cv2.findContours(mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
|
||||
|
||||
# handle different return values of findContours between OpenCV versions
|
||||
contours = outs[0] if len(outs) == 2 else outs[1]
|
||||
|
||||
boxes = []
|
||||
scores = []
|
||||
|
||||
for index in range(len(contours)):
|
||||
contour = contours[index]
|
||||
|
||||
# get minimum bounding box (rotated rectangle) around the contour and the smallest side length.
|
||||
points, min_side = self.get_min_boxes(contour)
|
||||
|
||||
if min_side < self.min_size:
|
||||
continue
|
||||
|
||||
points = np.array(points)
|
||||
|
||||
score = self.box_score(output, contour)
|
||||
if self.box_thresh > score:
|
||||
continue
|
||||
|
||||
polygon = Polygon(points)
|
||||
distance = polygon.area / polygon.length
|
||||
|
||||
# Use pyclipper to shrink the polygon slightly based on the computed distance.
|
||||
offset = PyclipperOffset()
|
||||
offset.AddPath(points, JT_ROUND, ET_CLOSEDPOLYGON)
|
||||
points = np.array(offset.Execute(distance * 1.5)).reshape((-1, 1, 2))
|
||||
|
||||
# get the minimum bounding box around the shrunken polygon.
|
||||
box, min_side = self.get_min_boxes(points)
|
||||
|
||||
if min_side < self.min_size + 2:
|
||||
continue
|
||||
|
||||
box = np.array(box)
|
||||
|
||||
# normalize and clip box coordinates to fit within the destination image size.
|
||||
box[:, 0] = np.clip(np.round(box[:, 0] / width * dest_width), 0, dest_width)
|
||||
box[:, 1] = np.clip(
|
||||
np.round(box[:, 1] / height * dest_height), 0, dest_height
|
||||
)
|
||||
|
||||
boxes.append(box.astype("int32"))
|
||||
scores.append(score)
|
||||
|
||||
return np.array(boxes, dtype="int32"), scores
|
||||
|
||||
@staticmethod
|
||||
def get_min_boxes(contour: np.ndarray) -> Tuple[List[Tuple[float, float]], float]:
|
||||
"""
|
||||
Calculate the minimum bounding box (rotated rectangle) for a given contour.
|
||||
|
||||
Args:
|
||||
contour (np.ndarray): The contour points of the detected shape.
|
||||
|
||||
Returns:
|
||||
Tuple[List[Tuple[float, float]], float]: A list of four points representing the
|
||||
corners of the bounding box, and the length of the shortest side.
|
||||
"""
|
||||
bounding_box = cv2.minAreaRect(contour)
|
||||
points = sorted(cv2.boxPoints(bounding_box), key=lambda x: x[0])
|
||||
index_1, index_4 = (0, 1) if points[1][1] > points[0][1] else (1, 0)
|
||||
index_2, index_3 = (2, 3) if points[3][1] > points[2][1] else (3, 2)
|
||||
box = [points[index_1], points[index_2], points[index_3], points[index_4]]
|
||||
return box, min(bounding_box[1])
|
||||
|
||||
@staticmethod
|
||||
def box_score(bitmap: np.ndarray, contour: np.ndarray) -> float:
|
||||
"""
|
||||
Calculate the average score within the bounding box of a contour.
|
||||
|
||||
Args:
|
||||
bitmap (np.ndarray): The output confidence map from the model.
|
||||
contour (np.ndarray): The contour of the detected shape.
|
||||
|
||||
Returns:
|
||||
float: The average score of the pixels inside the contour region.
|
||||
"""
|
||||
h, w = bitmap.shape[:2]
|
||||
contour = contour.reshape(-1, 2)
|
||||
x1, y1 = np.clip(contour.min(axis=0), 0, [w - 1, h - 1])
|
||||
x2, y2 = np.clip(contour.max(axis=0), 0, [w - 1, h - 1])
|
||||
mask = np.zeros((y2 - y1 + 1, x2 - x1 + 1), dtype=np.uint8)
|
||||
cv2.fillPoly(mask, [contour - [x1, y1]], 1)
|
||||
return cv2.mean(bitmap[y1 : y2 + 1, x1 : x2 + 1], mask)[0]
|
||||
|
||||
@staticmethod
|
||||
def expand_box(points: List[Tuple[float, float]]) -> np.ndarray:
|
||||
"""
|
||||
Expand a polygonal shape slightly by a factor determined by the area-to-perimeter ratio.
|
||||
|
||||
Args:
|
||||
points (List[Tuple[float, float]]): Points of the polygon to expand.
|
||||
|
||||
Returns:
|
||||
np.ndarray: Expanded polygon points.
|
||||
"""
|
||||
polygon = Polygon(points)
|
||||
distance = polygon.area / polygon.length
|
||||
offset = PyclipperOffset()
|
||||
offset.AddPath(points, JT_ROUND, ET_CLOSEDPOLYGON)
|
||||
expanded = np.array(offset.Execute(distance * 1.5)).reshape((-1, 2))
|
||||
return expanded
|
||||
|
||||
def filter_polygon(
|
||||
self, points: List[np.ndarray], shape: Tuple[int, int]
|
||||
) -> np.ndarray:
|
||||
"""
|
||||
Filter a set of polygons to include only valid ones that fit within an image shape
|
||||
and meet size constraints.
|
||||
|
||||
Args:
|
||||
points (List[np.ndarray]): List of polygons to filter.
|
||||
shape (Tuple[int, int]): Shape of the image (height, width).
|
||||
|
||||
Returns:
|
||||
np.ndarray: List of filtered polygons.
|
||||
"""
|
||||
height, width = shape
|
||||
return np.array(
|
||||
[
|
||||
self.clockwise_order(point)
|
||||
for point in points
|
||||
if self.is_valid_polygon(point, width, height)
|
||||
]
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def is_valid_polygon(point: np.ndarray, width: int, height: int) -> bool:
|
||||
"""
|
||||
Check if a polygon is valid, meaning it fits within the image bounds
|
||||
and has sides of a minimum length.
|
||||
|
||||
Args:
|
||||
point (np.ndarray): The polygon to validate.
|
||||
width (int): Image width.
|
||||
height (int): Image height.
|
||||
|
||||
Returns:
|
||||
bool: Whether the polygon is valid or not.
|
||||
"""
|
||||
return (
|
||||
point[:, 0].min() >= 0
|
||||
and point[:, 0].max() < width
|
||||
and point[:, 1].min() >= 0
|
||||
and point[:, 1].max() < height
|
||||
and np.linalg.norm(point[0] - point[1]) > 3
|
||||
and np.linalg.norm(point[0] - point[3]) > 3
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def clockwise_order(point: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Arrange the points of a polygon in clockwise order based on their angular positions
|
||||
around the polygon's center.
|
||||
|
||||
Args:
|
||||
point (np.ndarray): Array of points of the polygon.
|
||||
|
||||
Returns:
|
||||
np.ndarray: Points ordered in clockwise direction.
|
||||
"""
|
||||
center = point.mean(axis=0)
|
||||
return point[
|
||||
np.argsort(np.arctan2(point[:, 1] - center[1], point[:, 0] - center[0]))
|
||||
]
|
||||
|
||||
@staticmethod
|
||||
def sort_polygon(points):
|
||||
"""
|
||||
Sort polygons based on their position in the image. If polygons are close in vertical
|
||||
position (within 10 pixels), sort them by horizontal position.
|
||||
|
||||
Args:
|
||||
points: List of polygons to sort.
|
||||
|
||||
Returns:
|
||||
List: Sorted list of polygons.
|
||||
"""
|
||||
points.sort(key=lambda x: (x[0][1], x[0][0]))
|
||||
for i in range(len(points) - 1):
|
||||
for j in range(i, -1, -1):
|
||||
if abs(points[j + 1][0][1] - points[j][0][1]) < 10 and (
|
||||
points[j + 1][0][0] < points[j][0][0]
|
||||
):
|
||||
temp = points[j]
|
||||
points[j] = points[j + 1]
|
||||
points[j + 1] = temp
|
||||
else:
|
||||
break
|
||||
return points
|
||||
|
||||
@staticmethod
|
||||
def zero_pad(image: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Apply zero-padding to an image, ensuring its dimensions are at least 32x32.
|
||||
The padding is added only if needed.
|
||||
|
||||
Args:
|
||||
image (np.ndarray): Input image.
|
||||
|
||||
Returns:
|
||||
np.ndarray: Zero-padded image.
|
||||
"""
|
||||
h, w, c = image.shape
|
||||
pad = np.zeros((max(32, h), max(32, w), c), np.uint8)
|
||||
pad[:h, :w, :] = image
|
||||
return pad
|
||||
|
||||
@staticmethod
|
||||
def _preprocess_classification_image(image: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Preprocess a single image for classification by resizing, normalizing, and padding.
|
||||
|
||||
This method resizes the input image to a fixed height of 48 pixels while adjusting
|
||||
the width dynamically up to a maximum of 192 pixels. The image is then normalized and
|
||||
padded to fit the required input dimensions for classification.
|
||||
|
||||
Args:
|
||||
image (np.ndarray): Input image to preprocess.
|
||||
|
||||
Returns:
|
||||
np.ndarray: Preprocessed and padded image.
|
||||
"""
|
||||
# fixed height of 48, dynamic width up to 192
|
||||
input_shape = (3, 48, 192)
|
||||
input_c, input_h, input_w = input_shape
|
||||
|
||||
h, w = image.shape[:2]
|
||||
ratio = w / h
|
||||
resized_w = min(input_w, math.ceil(input_h * ratio))
|
||||
|
||||
resized_image = cv2.resize(image, (resized_w, input_h))
|
||||
|
||||
# handle single-channel images (grayscale) if needed
|
||||
if input_c == 1 and resized_image.ndim == 2:
|
||||
resized_image = resized_image[np.newaxis, :, :]
|
||||
else:
|
||||
resized_image = resized_image.transpose((2, 0, 1))
|
||||
|
||||
# normalize
|
||||
resized_image = (resized_image.astype("float32") / 255.0 - 0.5) / 0.5
|
||||
|
||||
padded_image = np.zeros((input_c, input_h, input_w), dtype=np.float32)
|
||||
padded_image[:, :, :resized_w] = resized_image
|
||||
|
||||
return padded_image
|
||||
|
||||
def _process_classification_output(
|
||||
self, images: List[np.ndarray], outputs: List[np.ndarray]
|
||||
) -> Tuple[List[np.ndarray], List[Tuple[str, float]]]:
|
||||
"""
|
||||
Process the classification model output by matching labels with confidence scores.
|
||||
|
||||
This method processes the outputs from the classification model and rotates images
|
||||
with high confidence of being labeled "180". It ensures that results are mapped to
|
||||
the original image order.
|
||||
|
||||
Args:
|
||||
images (List[np.ndarray]): List of input images.
|
||||
outputs (List[np.ndarray]): Corresponding model outputs.
|
||||
|
||||
Returns:
|
||||
Tuple[List[np.ndarray], List[Tuple[str, float]]]: A tuple of processed images and
|
||||
classification results (label and confidence score).
|
||||
"""
|
||||
labels = ["0", "180"]
|
||||
results = [["", 0.0]] * len(images)
|
||||
indices = np.argsort(np.array([x.shape[1] / x.shape[0] for x in images]))
|
||||
|
||||
outputs = np.stack(outputs)
|
||||
|
||||
outputs = [
|
||||
(labels[idx], outputs[i, idx])
|
||||
for i, idx in enumerate(outputs.argmax(axis=1))
|
||||
]
|
||||
|
||||
for i in range(0, len(images), self.batch_size):
|
||||
for j in range(len(outputs)):
|
||||
label, score = outputs[j]
|
||||
results[indices[i + j]] = [label, score]
|
||||
if "180" in label and score > self.config.get(
|
||||
"classification_threshold", 0.98
|
||||
):
|
||||
images[indices[i + j]] = cv2.rotate(images[indices[i + j]], 1)
|
||||
|
||||
return images, results
|
||||
|
||||
def _preprocess_recognition_image(
|
||||
self, image: np.ndarray, max_wh_ratio: float
|
||||
) -> np.ndarray:
|
||||
"""
|
||||
Preprocess an image for recognition by dynamically adjusting its width.
|
||||
|
||||
This method adjusts the width of the image based on the maximum width-to-height ratio
|
||||
while keeping the height fixed at 48 pixels. The image is then normalized and padded
|
||||
to fit the required input dimensions for recognition.
|
||||
|
||||
Args:
|
||||
image (np.ndarray): Input image to preprocess.
|
||||
max_wh_ratio (float): Maximum width-to-height ratio for resizing.
|
||||
|
||||
Returns:
|
||||
np.ndarray: Preprocessed and padded image.
|
||||
"""
|
||||
# fixed height of 48, dynamic width based on ratio
|
||||
input_shape = [3, 48, 320]
|
||||
input_h, input_w = input_shape[1], input_shape[2]
|
||||
|
||||
assert image.shape[2] == input_shape[0], "Unexpected number of image channels."
|
||||
|
||||
# dynamically adjust input width based on max_wh_ratio
|
||||
input_w = int(input_h * max_wh_ratio)
|
||||
|
||||
# check for model-specific input width
|
||||
model_input_w = self.recognition_model.runner.ort.get_inputs()[0].shape[3]
|
||||
if isinstance(model_input_w, int) and model_input_w > 0:
|
||||
input_w = model_input_w
|
||||
|
||||
h, w = image.shape[:2]
|
||||
aspect_ratio = w / h
|
||||
resized_w = min(input_w, math.ceil(input_h * aspect_ratio))
|
||||
|
||||
resized_image = cv2.resize(image, (resized_w, input_h))
|
||||
resized_image = resized_image.transpose((2, 0, 1))
|
||||
resized_image = (resized_image.astype("float32") / 255.0 - 0.5) / 0.5
|
||||
|
||||
padded_image = np.zeros((input_shape[0], input_h, input_w), dtype=np.float32)
|
||||
padded_image[:, :, :resized_w] = resized_image
|
||||
|
||||
return padded_image
|
||||
|
||||
@staticmethod
|
||||
def _crop_license_plate(image: np.ndarray, points: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Crop the license plate from the image using four corner points.
|
||||
|
||||
This method crops the region containing the license plate by using the perspective
|
||||
transformation based on four corner points. If the resulting image is significantly
|
||||
taller than wide, the image is rotated to the correct orientation.
|
||||
|
||||
Args:
|
||||
image (np.ndarray): Input image containing the license plate.
|
||||
points (np.ndarray): Four corner points defining the plate's position.
|
||||
|
||||
Returns:
|
||||
np.ndarray: Cropped and potentially rotated license plate image.
|
||||
"""
|
||||
assert len(points) == 4, "shape of points must be 4*2"
|
||||
points = points.astype(np.float32)
|
||||
crop_width = int(
|
||||
max(
|
||||
np.linalg.norm(points[0] - points[1]),
|
||||
np.linalg.norm(points[2] - points[3]),
|
||||
)
|
||||
)
|
||||
crop_height = int(
|
||||
max(
|
||||
np.linalg.norm(points[0] - points[3]),
|
||||
np.linalg.norm(points[1] - points[2]),
|
||||
)
|
||||
)
|
||||
pts_std = np.float32(
|
||||
[[0, 0], [crop_width, 0], [crop_width, crop_height], [0, crop_height]]
|
||||
)
|
||||
matrix = cv2.getPerspectiveTransform(points, pts_std)
|
||||
image = cv2.warpPerspective(
|
||||
image,
|
||||
matrix,
|
||||
(crop_width, crop_height),
|
||||
borderMode=cv2.BORDER_REPLICATE,
|
||||
flags=cv2.INTER_CUBIC,
|
||||
)
|
||||
height, width = image.shape[0:2]
|
||||
if height * 1.0 / width >= 1.5:
|
||||
image = np.rot90(image, k=3)
|
||||
return image
|
||||
|
||||
|
||||
class CTCDecoder:
|
||||
"""
|
||||
A decoder for interpreting the output of a CTC (Connectionist Temporal Classification) model.
|
||||
|
||||
This decoder converts the model's output probabilities into readable sequences of characters
|
||||
while removing duplicates and handling blank tokens. It also calculates the confidence scores
|
||||
for each decoded character sequence.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Initialize the CTCDecoder with a list of characters and a character map.
|
||||
|
||||
The character set includes digits, letters, special characters, and a "blank" token
|
||||
(used by the CTC model for decoding purposes). A character map is created to map
|
||||
indices to characters.
|
||||
"""
|
||||
self.characters = [
|
||||
"blank",
|
||||
"0",
|
||||
"1",
|
||||
"2",
|
||||
"3",
|
||||
"4",
|
||||
"5",
|
||||
"6",
|
||||
"7",
|
||||
"8",
|
||||
"9",
|
||||
":",
|
||||
";",
|
||||
"<",
|
||||
"=",
|
||||
">",
|
||||
"?",
|
||||
"@",
|
||||
"A",
|
||||
"B",
|
||||
"C",
|
||||
"D",
|
||||
"E",
|
||||
"F",
|
||||
"G",
|
||||
"H",
|
||||
"I",
|
||||
"J",
|
||||
"K",
|
||||
"L",
|
||||
"M",
|
||||
"N",
|
||||
"O",
|
||||
"P",
|
||||
"Q",
|
||||
"R",
|
||||
"S",
|
||||
"T",
|
||||
"U",
|
||||
"V",
|
||||
"W",
|
||||
"X",
|
||||
"Y",
|
||||
"Z",
|
||||
"[",
|
||||
"\\",
|
||||
"]",
|
||||
"^",
|
||||
"_",
|
||||
"`",
|
||||
"a",
|
||||
"b",
|
||||
"c",
|
||||
"d",
|
||||
"e",
|
||||
"f",
|
||||
"g",
|
||||
"h",
|
||||
"i",
|
||||
"j",
|
||||
"k",
|
||||
"l",
|
||||
"m",
|
||||
"n",
|
||||
"o",
|
||||
"p",
|
||||
"q",
|
||||
"r",
|
||||
"s",
|
||||
"t",
|
||||
"u",
|
||||
"v",
|
||||
"w",
|
||||
"x",
|
||||
"y",
|
||||
"z",
|
||||
"{",
|
||||
"|",
|
||||
"}",
|
||||
"~",
|
||||
"!",
|
||||
'"',
|
||||
"#",
|
||||
"$",
|
||||
"%",
|
||||
"&",
|
||||
"'",
|
||||
"(",
|
||||
")",
|
||||
"*",
|
||||
"+",
|
||||
",",
|
||||
"-",
|
||||
".",
|
||||
"/",
|
||||
" ",
|
||||
" ",
|
||||
]
|
||||
self.char_map = {i: char for i, char in enumerate(self.characters)}
|
||||
|
||||
def __call__(
|
||||
self, outputs: List[np.ndarray]
|
||||
) -> Tuple[List[str], List[List[float]]]:
|
||||
"""
|
||||
Decode a batch of model outputs into character sequences and their confidence scores.
|
||||
|
||||
The method takes the output probability distributions for each time step and uses
|
||||
the best path decoding strategy. It then merges repeating characters and ignores
|
||||
blank tokens. Confidence scores for each decoded character are also calculated.
|
||||
|
||||
Args:
|
||||
outputs (List[np.ndarray]): A list of model outputs, where each element is
|
||||
a probability distribution for each time step.
|
||||
|
||||
Returns:
|
||||
Tuple[List[str], List[List[float]]]: A tuple of decoded character sequences
|
||||
and confidence scores for each sequence.
|
||||
"""
|
||||
results = []
|
||||
confidences = []
|
||||
for output in outputs:
|
||||
seq_log_probs = np.log(output + 1e-8)
|
||||
best_path = np.argmax(seq_log_probs, axis=1)
|
||||
|
||||
merged_path = []
|
||||
merged_probs = []
|
||||
for t, char_index in enumerate(best_path):
|
||||
if char_index != 0 and (t == 0 or char_index != best_path[t - 1]):
|
||||
merged_path.append(char_index)
|
||||
merged_probs.append(seq_log_probs[t, char_index])
|
||||
|
||||
result = "".join(self.char_map[idx] for idx in merged_path)
|
||||
results.append(result)
|
||||
|
||||
confidence = np.exp(merged_probs).tolist()
|
||||
confidences.append(confidence)
|
||||
|
||||
return results, confidences
|
||||
|
||||
|
||||
def main():
|
||||
parser = ArgumentParser()
|
||||
parser.add_argument("filepath", type=str, help="image file path")
|
||||
args = parser.parse_args()
|
||||
|
||||
lpr_config = {
|
||||
"detection_model_path": "/workspace/frigate/frigate/embeddings/weights/detection.onnx",
|
||||
"classification_model_path": "/workspace/frigate/frigate/embeddings/weights/classification.onnx",
|
||||
"recognition_model_path": "/workspace/frigate/frigate/embeddings/weights/recognition.onnx",
|
||||
"mask_thresh": 0.8,
|
||||
"box_thresh": 0.8,
|
||||
"min_size": 3,
|
||||
"classification_threshold": 0.98,
|
||||
}
|
||||
|
||||
# Initialize LPR
|
||||
license_plate_recognition = LicensePlateRecognition(lpr_config, {})
|
||||
|
||||
# Read and process image
|
||||
frame = cv2.imread(args.filepath)
|
||||
if frame is None:
|
||||
print(f"Error: Could not read image file: {args.filepath}")
|
||||
return
|
||||
|
||||
cv2.cvtColor(frame, cv2.COLOR_BGR2RGB, frame)
|
||||
|
||||
# Process the license plate
|
||||
license_plates, confidences, areas = (
|
||||
license_plate_recognition.process_license_plate(frame)
|
||||
)
|
||||
|
||||
# Print debug information to ensure data structure
|
||||
print(f"License plates: {license_plates}")
|
||||
print(f"Confidences: {confidences}")
|
||||
print(f"Areas: {areas}")
|
||||
|
||||
if license_plates:
|
||||
for plate, confidence, area in zip(license_plates, confidences, areas):
|
||||
print(
|
||||
f"Detected license plate: {plate} (average confidence: {confidence:.2f}, area: {area} pixels)"
|
||||
)
|
||||
else:
|
||||
print("No license plate detected")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@ -38,6 +38,9 @@ class ModelTypeEnum(str, Enum):
|
||||
face = "face"
|
||||
vision = "vision"
|
||||
text = "text"
|
||||
alpr_detect = "alpr_detect"
|
||||
alpr_classify = "alpr_classify"
|
||||
alpr_recognize = "alpr_recognize"
|
||||
|
||||
|
||||
class GenericONNXEmbedding:
|
||||
@ -139,6 +142,12 @@ class GenericONNXEmbedding:
|
||||
self.feature_extractor = self._load_feature_extractor()
|
||||
elif self.model_type == ModelTypeEnum.face:
|
||||
self.feature_extractor = []
|
||||
elif self.model_type == ModelTypeEnum.alpr_detect:
|
||||
self.feature_extractor = []
|
||||
elif self.model_type == ModelTypeEnum.alpr_classify:
|
||||
self.feature_extractor = []
|
||||
elif self.model_type == ModelTypeEnum.alpr_recognize:
|
||||
self.feature_extractor = []
|
||||
|
||||
self.runner = ONNXModelRunner(
|
||||
os.path.join(self.download_path, self.model_file),
|
||||
@ -214,9 +223,26 @@ class GenericONNXEmbedding:
|
||||
|
||||
frame = np.expand_dims(frame, axis=0)
|
||||
return [{"image_input": frame}]
|
||||
elif self.model_type == ModelTypeEnum.alpr_detect:
|
||||
preprocessed = []
|
||||
for x in raw_inputs:
|
||||
preprocessed.append(x)
|
||||
return [{"x": preprocessed[0]}]
|
||||
elif self.model_type == ModelTypeEnum.alpr_classify:
|
||||
processed = []
|
||||
for img in raw_inputs:
|
||||
processed.append({"x": img})
|
||||
return processed
|
||||
elif self.model_type == ModelTypeEnum.alpr_recognize:
|
||||
processed = []
|
||||
for img in raw_inputs:
|
||||
processed.append({"x": img})
|
||||
return processed
|
||||
else:
|
||||
raise ValueError(f"Unable to preprocess inputs for {self.model_type}")
|
||||
|
||||
# for face recognition
|
||||
|
||||
def _process_image(self, image, output: str = "RGB") -> Image.Image:
|
||||
if isinstance(image, str):
|
||||
if image.startswith("http"):
|
||||
|
||||
Loading…
Reference in New Issue
Block a user