Use YOLOv9 for plate detection for users without Frigate+ and update the plate retention algorithm

Josh Hawkins 2025-02-13 10:55:11 -06:00
parent 0dcfcb2cf7
commit 793d0fb129


@@ -24,6 +24,7 @@ from .api import RealTimeProcessorApi
 logger = logging.getLogger(__name__)
 MIN_PLATE_LENGTH = 3
+WRITE_DEBUG_IMAGES = False
 class LicensePlateProcessor(RealTimeProcessorApi):
@@ -86,12 +87,24 @@ class LicensePlateProcessor(RealTimeProcessorApi):
             requestor=self.requestor,
             device="CPU",
         )
+        self.yolov9_detection_model = GenericONNXEmbedding(
+            model_name="yolov9_license_plate",
+            model_file="yolov9-256-license-plates.onnx",
+            download_urls={
+                "yolov9-256-license-plates.onnx": "https://github.com/hawkeye217/yolov9-license-plates/raw/refs/heads/master/models/yolov9-256-license-plates.onnx"
+            },
+            model_size="large",
+            model_type=ModelTypeEnum.yolov9_lpr_detect,
+            requestor=self.requestor,
+            device="CPU",
+        )
         if self.lpr_config.enabled:
             # all models need to be loaded to run LPR
             self.detection_model._load_model_and_utils()
             self.classification_model._load_model_and_utils()
             self.recognition_model._load_model_and_utils()
+            self.yolov9_detection_model._load_model_and_utils()
     def _detect(self, image: np.ndarray) -> List[np.ndarray]:
         """
@@ -112,6 +125,13 @@ class LicensePlateProcessor(RealTimeProcessorApi):
         resized_image = self._resize_image(image)
         normalized_image = self._normalize_image(resized_image)
+        if WRITE_DEBUG_IMAGES:
+            current_time = int(datetime.datetime.now().timestamp())
+            cv2.imwrite(
+                f"debug/frames/license_plate_resized_{current_time}.jpg",
+                resized_image,
+            )
         outputs = self.detection_model([normalized_image])[0]
         outputs = outputs[0, :, :]
@@ -207,12 +227,27 @@ class LicensePlateProcessor(RealTimeProcessorApi):
         plate_points = self._detect(image)
         if len(plate_points) == 0:
+            logger.debug("No points found by OCR detector model")
             return [], [], []
         plate_points = self._sort_polygon(list(plate_points))
         plate_images = [self._crop_license_plate(image, x) for x in plate_points]
         rotated_images, _ = self._classify(plate_images)
+        # debug rotated and classification result
+        if WRITE_DEBUG_IMAGES:
+            current_time = int(datetime.datetime.now().timestamp())
+            for i, img in enumerate(plate_images):
+                cv2.imwrite(
+                    f"debug/frames/license_plate_rotated_{current_time}_{i + 1}.jpg",
+                    img,
+                )
+            for i, img in enumerate(rotated_images):
+                cv2.imwrite(
+                    f"debug/frames/license_plate_classified_{current_time}_{i + 1}.jpg",
+                    img,
+                )
         # keep track of the index of each image for correct area calc later
         sorted_indices = np.argsort([x.shape[1] / x.shape[0] for x in rotated_images])
         reverse_mapping = {
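The `reverse_mapping` dict (truncated by this hunk) exists because crops are reordered by aspect ratio before recognition and per-crop results must be mapped back afterward. A small illustration of the pattern, with hypothetical crop shapes:

```python
import numpy as np

# Hypothetical crops as (height, width); wider crops have larger ratios.
shapes = [(48, 240), (48, 96), (48, 144)]
sorted_indices = np.argsort([w / h for h, w in shapes])  # array([1, 2, 0])

# position_of[i] is where original crop i landed in the sorted batch, so
# per-crop results (labels, confidences, areas) can be restored afterward.
position_of = {orig: pos for pos, orig in enumerate(sorted_indices)}
# {1: 0, 2: 1, 0: 2}
```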
@@ -331,6 +366,7 @@ class LicensePlateProcessor(RealTimeProcessorApi):
             # get minimum bounding box (rotated rectangle) around the contour and the smallest side length.
             points, min_side = self._get_min_boxes(contour)
+            logger.debug(f"min side {index}, {min_side}")
             if min_side < self.min_size:
                 continue
@@ -338,6 +374,7 @@ class LicensePlateProcessor(RealTimeProcessorApi):
             points = np.array(points)
             score = self._box_score(output, contour)
+            logger.debug(f"box score {index}, {score}")
             if self.box_thresh > score:
                 continue
@@ -492,7 +529,7 @@ class LicensePlateProcessor(RealTimeProcessorApi):
     def _sort_polygon(points):
         """
         Sort polygons based on their position in the image. If polygons are close in vertical
-        position (within 10 pixels), sort them by horizontal position.
+        position (within 5 pixels), sort them by horizontal position.
         Args:
             points: List of polygons to sort.
@@ -503,7 +540,7 @@ class LicensePlateProcessor(RealTimeProcessorApi):
         points.sort(key=lambda x: (x[0][1], x[0][0]))
         for i in range(len(points) - 1):
             for j in range(i, -1, -1):
-                if abs(points[j + 1][0][1] - points[j][0][1]) < 10 and (
+                if abs(points[j + 1][0][1] - points[j][0][1]) < 5 and (
                     points[j + 1][0][0] < points[j][0][0]
                 ):
                     temp = points[j]
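A condensed standalone version of this pass, using the 5 px tolerance introduced here, shows the intended reading order (the early break is a simplification of the original loop):

```python
def sort_reading_order(corners, tol=5):
    """Order box corners top-to-bottom, then left-to-right within a row."""
    corners = sorted(corners, key=lambda p: (p[1], p[0]))  # by y, then x
    for i in range(len(corners) - 1):
        for j in range(i, -1, -1):
            # Corners within `tol` px vertically belong to the same text row.
            if abs(corners[j + 1][1] - corners[j][1]) < tol and corners[j + 1][0] < corners[j][0]:
                corners[j], corners[j + 1] = corners[j + 1], corners[j]
            else:
                break
    return corners

# (120, 100) and (30, 103) differ by 3 px vertically, so they form one row:
print(sort_reading_order([(120, 100), (30, 103), (200, 140)]))
# [(30, 103), (120, 100), (200, 140)]
```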
@@ -602,7 +639,8 @@ class LicensePlateProcessor(RealTimeProcessorApi):
             for j in range(len(outputs)):
                 label, score = outputs[j]
                 results[indices[i + j]] = [label, score]
-                if "180" in label and score >= self.lpr_config.threshold:
+                # make sure we have high confidence if we need to flip a box, this will be rare in lpr
+                if "180" in label and score >= 0.9:
                     images[indices[i + j]] = cv2.rotate(images[indices[i + j]], 1)
         return images, results
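A small note on the rotation flag: the integer 1 passed to cv2.rotate is the value of cv2.ROTATE_180, matching the "180" orientation label from the classifier.

```python
import cv2
assert cv2.ROTATE_180 == 1  # the explicit constant for the flag used above
```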
@@ -701,10 +739,122 @@ class LicensePlateProcessor(RealTimeProcessorApi):
         self.metrics.alpr_pps.value = (self.metrics.alpr_pps.value * 9 + duration) / 10
     def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]:
-        """Return the dimensions of the input image as [x, y, width, height]."""
-        # TODO: use a small model here to detect plates
-        height, width = input.shape[:2]
-        return (0, 0, width, height)
+        """
+        Use a lightweight YOLOv9 model to detect license plates for users without Frigate+.
+
+        Return the dimensions of the detected plate as [x1, y1, x2, y2].
+        """
+        predictions = self.yolov9_detection_model(input)
+        confidence_threshold = self.lpr_config.threshold
+        top_score = -1
+        top_box = None
+        # Loop over predictions
+        for prediction in predictions:
+            score = prediction[6]
+            if score >= confidence_threshold:
+                bbox = prediction[1:5]
+                # Scale boxes back to original image size
+                scale_x = input.shape[1] / 256
+                scale_y = input.shape[0] / 256
+                bbox[0] *= scale_x
+                bbox[1] *= scale_y
+                bbox[2] *= scale_x
+                bbox[3] *= scale_y
+                if score > top_score:
+                    top_score = score
+                    top_box = bbox
+        # Return the top scoring bounding box if found
+        if top_box is not None:
+            logger.debug("Found license plate: {}".format(top_box.astype(int)))
+            return tuple(top_box.astype(int))
+        else:
+            return None  # No detection above the threshold
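Since the detector runs at a fixed 256×256 input, returned coordinates live in that space and are scaled back to the car crop's size. A worked example, assuming the [batch, x1, y1, x2, y2, class, score] prediction layout implied above:

```python
import numpy as np

# Hypothetical 512x384 (width x height) car crop and one raw prediction.
crop_w, crop_h = 512, 384
prediction = np.array([0, 64.0, 96.0, 192.0, 160.0, 0, 0.87])

scale_x = crop_w / 256  # 2.0
scale_y = crop_h / 256  # 1.5
bbox = prediction[1:5] * np.array([scale_x, scale_y, scale_x, scale_y])
print(bbox.astype(int))  # [128 144 384 240]
```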
+    def _should_keep_previous_plate(
+        self, id, top_plate, top_char_confidences, top_area, avg_confidence
+    ):
+        if id not in self.detected_license_plates:
+            return False
+        prev_data = self.detected_license_plates[id]
+        prev_plate = prev_data["plate"]
+        prev_char_confidences = prev_data["char_confidences"]
+        prev_area = prev_data["area"]
+        prev_avg_confidence = (
+            sum(prev_char_confidences) / len(prev_char_confidences)
+            if prev_char_confidences
+            else 0
+        )
+        # 1. Normalize metrics
+        # Length score - use relative comparison
+        # If lengths are equal, score is 0.5 for both
+        # If one is longer, it gets a higher score up to 1.0
+        max_length_diff = 4  # Maximum expected difference in plate lengths
+        length_diff = len(top_plate) - len(prev_plate)
+        curr_length_score = 0.5 + (
+            length_diff / (2 * max_length_diff)
+        )  # Normalize to 0-1
+        curr_length_score = max(0, min(1, curr_length_score))  # Clamp to 0-1
+        prev_length_score = 1 - curr_length_score  # Inverse relationship
+        # Area score (normalize based on max of current and previous)
+        max_area = max(top_area, prev_area)
+        curr_area_score = top_area / max_area
+        prev_area_score = prev_area / max_area
+        # Average confidence score (already normalized 0-1)
+        curr_conf_score = avg_confidence
+        prev_conf_score = prev_avg_confidence
+        # Character confidence comparison score
+        min_length = min(len(top_plate), len(prev_plate))
+        if min_length > 0:
+            curr_char_conf = sum(top_char_confidences[:min_length]) / min_length
+            prev_char_conf = sum(prev_char_confidences[:min_length]) / min_length
+        else:
+            curr_char_conf = 0
+            prev_char_conf = 0
+        # 2. Define weights
+        weights = {
+            "length": 0.4,
+            "area": 0.3,
+            "avg_confidence": 0.2,
+            "char_confidence": 0.1,
+        }
+        # 3. Calculate weighted scores
+        curr_score = (
+            curr_length_score * weights["length"]
+            + curr_area_score * weights["area"]
+            + curr_conf_score * weights["avg_confidence"]
+            + curr_char_conf * weights["char_confidence"]
+        )
+        prev_score = (
+            prev_length_score * weights["length"]
+            + prev_area_score * weights["area"]
+            + prev_conf_score * weights["avg_confidence"]
+            + prev_char_conf * weights["char_confidence"]
+        )
+        # 4. Log the comparison for debugging
+        logger.debug(
+            f"Plate comparison - Current plate: {top_plate} (score: {curr_score:.3f}) vs "
+            f"Previous plate: {prev_plate} (score: {prev_score:.3f})\n"
+            f"Metrics - Length: {len(top_plate)} vs {len(prev_plate)} (scores: {curr_length_score:.2f} vs {prev_length_score:.2f}), "
+            f"Area: {top_area} vs {prev_area}, "
+            f"Avg Conf: {avg_confidence:.2f} vs {prev_avg_confidence:.2f}"
+        )
+        # 5. Return True if we should keep the previous plate (i.e., if it scores higher)
+        return prev_score > curr_score
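To make the new weighting concrete, a worked example with hypothetical numbers: a shorter, higher-confidence new read compared against a stored longer, larger plate:

```python
weights = {"length": 0.4, "area": 0.3, "avg_confidence": 0.2, "char_confidence": 0.1}

# Current read "AB1234" (6 chars, area 4000) vs previous "AB12345" (7 chars, area 5000).
curr_length_score = max(0, min(1, 0.5 + (6 - 7) / 8))  # 0.375
prev_length_score = 1 - curr_length_score               # 0.625
curr_area_score, prev_area_score = 4000 / 5000, 1.0     # 0.8, 1.0
curr_conf, prev_conf = 0.92, 0.85                       # average confidences
curr_char, prev_char = 0.93, 0.84                       # first-6-character means

curr_score = 0.375 * 0.4 + 0.8 * 0.3 + 0.92 * 0.2 + 0.93 * 0.1  # 0.667
prev_score = 0.625 * 0.4 + 1.0 * 0.3 + 0.85 * 0.2 + 0.84 * 0.1  # 0.804
print(prev_score > curr_score)  # True -> keep the previous plate
```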
     def process_frame(self, obj_data: dict[str, any], frame: np.ndarray):
         """Look for license plates in image."""
@@ -739,19 +889,41 @@ class LicensePlateProcessor(RealTimeProcessorApi):
             if not car_box:
                 return
-            rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
+            rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
             left, top, right, bottom = car_box
             car = rgb[top:bottom, left:right]
+            # double the size of the car for better box detection
+            car = cv2.resize(car, (int(2 * car.shape[1]), int(2 * car.shape[0])))
+            if WRITE_DEBUG_IMAGES:
+                current_time = int(datetime.datetime.now().timestamp())
+                cv2.imwrite(
+                    f"debug/frames/car_frame_{current_time}.jpg",
+                    car,
+                )
             license_plate = self._detect_license_plate(car)
             if not license_plate:
                 logger.debug("Detected no license plates for car object.")
                 return
+            # compute the area of the detected license plate
+            license_plate_area = max(
+                0,
+                (license_plate[2] - license_plate[0])
+                * (license_plate[3] - license_plate[1]),
+            )
+            # check that license plate is valid
+            if license_plate_area < self.config.lpr.min_area:
+                logger.debug("License plate is less than min_area")
+                return
             license_plate_frame = car[
                 license_plate[1] : license_plate[3], license_plate[0] : license_plate[2]
             ]
+            license_plate_frame = cv2.cvtColor(license_plate_frame, cv2.COLOR_RGB2BGR)
         else:
             # don't run for object without attributes
             if not obj_data.get("current_attributes"):
@@ -788,6 +960,22 @@ class LicensePlateProcessor(RealTimeProcessorApi):
                 license_plate_box[0] : license_plate_box[2],
             ]
+        # double the size of the license plate frame for better OCR
+        license_plate_frame = cv2.resize(
+            license_plate_frame,
+            (
+                int(2 * license_plate_frame.shape[1]),
+                int(2 * license_plate_frame.shape[0]),
+            ),
+        )
+        if WRITE_DEBUG_IMAGES:
+            current_time = int(datetime.datetime.now().timestamp())
+            cv2.imwrite(
+                f"debug/frames/license_plate_frame_{current_time}.jpg",
+                license_plate_frame,
+            )
         # run detection, returns results sorted by confidence, best first
         license_plates, confidences, areas = self._process_license_plate(
             license_plate_frame
@@ -824,38 +1012,11 @@ class LicensePlateProcessor(RealTimeProcessorApi):
         # Check if we have a previously detected plate for this ID
         if id in self.detected_license_plates:
-            prev_plate = self.detected_license_plates[id]["plate"]
-            prev_char_confidences = self.detected_license_plates[id]["char_confidences"]
-            prev_area = self.detected_license_plates[id]["area"]
-            prev_avg_confidence = (
-                (sum(prev_char_confidences) / len(prev_char_confidences))
-                if prev_char_confidences
-                else 0
-            )
-            # Define conditions for keeping the previous plate
-            shorter_than_previous = len(top_plate) < len(prev_plate)
-            lower_avg_confidence = avg_confidence <= prev_avg_confidence
-            smaller_area = top_area < prev_area
-            # Compare character-by-character confidence where possible
-            min_length = min(len(top_plate), len(prev_plate))
-            char_confidence_comparison = sum(
-                1
-                for i in range(min_length)
-                if top_char_confidences[i] <= prev_char_confidences[i]
-            )
-            worse_char_confidences = char_confidence_comparison >= min_length / 2
-            if (shorter_than_previous or smaller_area) and (
-                lower_avg_confidence and worse_char_confidences
-            ):
-                logger.debug(
-                    f"Keeping previous plate. New plate stats: "
-                    f"length={len(top_plate)}, avg_conf={avg_confidence:.2f}, area={top_area} "
-                    f"vs Previous: length={len(prev_plate)}, avg_conf={prev_avg_confidence:.2f}, area={prev_area}"
-                )
-                return True
+            if self._should_keep_previous_plate(
+                id, top_plate, top_char_confidences, top_area, avg_confidence
+            ):
+                logger.debug("Keeping previous plate")
+                return
         # Check against minimum confidence threshold
         if avg_confidence < self.lpr_config.threshold: