Cleanup LPR mypy

This commit is contained in:
Nicolas Mowen 2026-03-26 10:50:57 -06:00
parent 9a654e9c4a
commit f5034f7c58
4 changed files with 77 additions and 62 deletions

View File

@ -197,7 +197,7 @@ class FaceNetRecognizer(FaceRecognizer):
continue # type: ignore[unreachable] continue # type: ignore[unreachable]
img = self.align_face(img, img.shape[1], img.shape[0]) img = self.align_face(img, img.shape[1], img.shape[0])
emb = self.face_embedder([img])[0].squeeze() # type: ignore[arg-type] emb = self.face_embedder([img])[0].squeeze()
face_embeddings_map[name].append(emb) face_embeddings_map[name].append(emb)
idx += 1 idx += 1

View File

@ -10,7 +10,7 @@ import random
import re import re
import string import string
from pathlib import Path from pathlib import Path
from typing import Any, List, Optional, Tuple from typing import Any, List, Tuple
import cv2 import cv2
import numpy as np import numpy as np
@ -113,7 +113,7 @@ class LicensePlateProcessingMixin:
) )
try: try:
outputs = self.model_runner.detection_model([normalized_image])[0] outputs = self.model_runner.detection_model([normalized_image])[0] # type: ignore[arg-type]
except Exception as e: except Exception as e:
logger.warning(f"Error running LPR box detection model: {e}") logger.warning(f"Error running LPR box detection model: {e}")
return [] return []
@ -121,18 +121,18 @@ class LicensePlateProcessingMixin:
outputs = outputs[0, :, :] outputs = outputs[0, :, :]
if False: if False:
current_time = int(datetime.datetime.now().timestamp()) current_time = int(datetime.datetime.now().timestamp()) # type: ignore[unreachable]
cv2.imwrite( cv2.imwrite(
f"debug/frames/probability_map_{current_time}.jpg", f"debug/frames/probability_map_{current_time}.jpg",
(outputs * 255).astype(np.uint8), (outputs * 255).astype(np.uint8),
) )
boxes, _ = self._boxes_from_bitmap(outputs, outputs > self.mask_thresh, w, h) boxes, _ = self._boxes_from_bitmap(outputs, outputs > self.mask_thresh, w, h)
return self._filter_polygon(boxes, (h, w)) return self._filter_polygon(boxes, (h, w)) # type: ignore[return-value,arg-type]
def _classify( def _classify(
self, images: List[np.ndarray] self, images: List[np.ndarray]
) -> Tuple[List[np.ndarray], List[Tuple[str, float]]]: ) -> Tuple[List[np.ndarray], List[Tuple[str, float]]] | None:
""" """
Classify the orientation or category of each detected license plate. Classify the orientation or category of each detected license plate.
@ -154,15 +154,15 @@ class LicensePlateProcessingMixin:
norm_images.append(norm_img) norm_images.append(norm_img)
try: try:
outputs = self.model_runner.classification_model(norm_images) outputs = self.model_runner.classification_model(norm_images) # type: ignore[arg-type]
except Exception as e: except Exception as e:
logger.warning(f"Error running LPR classification model: {e}") logger.warning(f"Error running LPR classification model: {e}")
return return None
return self._process_classification_output(images, outputs) return self._process_classification_output(images, outputs)
def _recognize( def _recognize(
self, camera: string, images: List[np.ndarray] self, camera: str, images: List[np.ndarray]
) -> Tuple[List[str], List[List[float]]]: ) -> Tuple[List[str], List[List[float]]]:
""" """
Recognize the characters on the detected license plates using the recognition model. Recognize the characters on the detected license plates using the recognition model.
@ -195,7 +195,7 @@ class LicensePlateProcessingMixin:
norm_images.append(norm_image) norm_images.append(norm_image)
try: try:
outputs = self.model_runner.recognition_model(norm_images) outputs = self.model_runner.recognition_model(norm_images) # type: ignore[arg-type]
except Exception as e: except Exception as e:
logger.warning(f"Error running LPR recognition model: {e}") logger.warning(f"Error running LPR recognition model: {e}")
return [], [] return [], []
@ -426,7 +426,8 @@ class LicensePlateProcessingMixin:
) )
if sorted_data: if sorted_data:
return map(list, zip(*sorted_data)) plates, confs, areas_list = zip(*sorted_data)
return list(plates), list(confs), list(areas_list)
return [], [], [] return [], [], []
@ -548,7 +549,7 @@ class LicensePlateProcessingMixin:
# Add the last box # Add the last box
merged_boxes.append(current_box) merged_boxes.append(current_box)
return np.array(merged_boxes, dtype=np.int32) return np.array(merged_boxes, dtype=np.int32) # type: ignore[return-value]
def _boxes_from_bitmap( def _boxes_from_bitmap(
self, output: np.ndarray, mask: np.ndarray, dest_width: int, dest_height: int self, output: np.ndarray, mask: np.ndarray, dest_width: int, dest_height: int
@ -576,38 +577,42 @@ class LicensePlateProcessingMixin:
boxes = [] boxes = []
scores = [] scores = []
for index in range(len(contours)): for index in range(len(contours)): # type: ignore[arg-type]
contour = contours[index] contour = contours[index] # type: ignore[index]
# get minimum bounding box (rotated rectangle) around the contour and the smallest side length. # get minimum bounding box (rotated rectangle) around the contour and the smallest side length.
points, sside = self._get_min_boxes(contour) points, sside = self._get_min_boxes(contour)
if sside < self.min_size: if sside < self.min_size:
continue continue
points = np.array(points, dtype=np.float32) points = np.array(points, dtype=np.float32) # type: ignore[assignment]
score = self._box_score(output, contour) score = self._box_score(output, contour)
if self.box_thresh > score: if self.box_thresh > score:
continue continue
points = self._expand_box(points) points = self._expand_box(points) # type: ignore[assignment]
# Get the minimum area rectangle again after expansion # Get the minimum area rectangle again after expansion
points, sside = self._get_min_boxes(points.reshape(-1, 1, 2)) points, sside = self._get_min_boxes(points.reshape(-1, 1, 2)) # type: ignore[attr-defined]
if sside < self.min_size + 2: if sside < self.min_size + 2:
continue continue
points = np.array(points, dtype=np.float32) points = np.array(points, dtype=np.float32) # type: ignore[assignment]
# normalize and clip box coordinates to fit within the destination image size. # normalize and clip box coordinates to fit within the destination image size.
points[:, 0] = np.clip( points[:, 0] = np.clip( # type: ignore[call-overload]
np.round(points[:, 0] / width * dest_width), 0, dest_width np.round(points[:, 0] / width * dest_width), # type: ignore[call-overload]
0,
dest_width,
) )
points[:, 1] = np.clip( points[:, 1] = np.clip( # type: ignore[call-overload]
np.round(points[:, 1] / height * dest_height), 0, dest_height np.round(points[:, 1] / height * dest_height), # type: ignore[call-overload]
0,
dest_height,
) )
boxes.append(points.astype("int32")) boxes.append(points.astype("int32")) # type: ignore[attr-defined]
scores.append(score) scores.append(score)
return np.array(boxes, dtype="int32"), scores return np.array(boxes, dtype="int32"), scores
@ -648,7 +653,7 @@ class LicensePlateProcessingMixin:
x1, y1 = np.clip(contour.min(axis=0), 0, [w - 1, h - 1]) x1, y1 = np.clip(contour.min(axis=0), 0, [w - 1, h - 1])
x2, y2 = np.clip(contour.max(axis=0), 0, [w - 1, h - 1]) x2, y2 = np.clip(contour.max(axis=0), 0, [w - 1, h - 1])
mask = np.zeros((y2 - y1 + 1, x2 - x1 + 1), dtype=np.uint8) mask = np.zeros((y2 - y1 + 1, x2 - x1 + 1), dtype=np.uint8)
cv2.fillPoly(mask, [contour - [x1, y1]], 1) cv2.fillPoly(mask, [contour - [x1, y1]], 1) # type: ignore[call-overload]
return cv2.mean(bitmap[y1 : y2 + 1, x1 : x2 + 1], mask)[0] return cv2.mean(bitmap[y1 : y2 + 1, x1 : x2 + 1], mask)[0]
@staticmethod @staticmethod
@ -706,7 +711,7 @@ class LicensePlateProcessingMixin:
Returns: Returns:
bool: Whether the polygon is valid or not. bool: Whether the polygon is valid or not.
""" """
return ( return bool(
point[:, 0].min() >= 0 point[:, 0].min() >= 0
and point[:, 0].max() < width and point[:, 0].max() < width
and point[:, 1].min() >= 0 and point[:, 1].min() >= 0
@ -751,7 +756,7 @@ class LicensePlateProcessingMixin:
return np.array([tl, tr, br, bl]) return np.array([tl, tr, br, bl])
@staticmethod @staticmethod
def _sort_boxes(boxes): def _sort_boxes(boxes: list[np.ndarray]) -> list[np.ndarray]:
""" """
Sort polygons based on their position in the image. If boxes are close in vertical Sort polygons based on their position in the image. If boxes are close in vertical
position (within 5 pixels), sort them by horizontal position. position (within 5 pixels), sort them by horizontal position.
@ -853,16 +858,16 @@ class LicensePlateProcessingMixin:
results = [["", 0.0]] * len(images) results = [["", 0.0]] * len(images)
indices = np.argsort(np.array([x.shape[1] / x.shape[0] for x in images])) indices = np.argsort(np.array([x.shape[1] / x.shape[0] for x in images]))
outputs = np.stack(outputs) stacked_outputs = np.stack(outputs)
outputs = [ stacked_outputs = [
(labels[idx], outputs[i, idx]) (labels[idx], stacked_outputs[i, idx])
for i, idx in enumerate(outputs.argmax(axis=1)) for i, idx in enumerate(stacked_outputs.argmax(axis=1))
] ]
for i in range(0, len(images), self.batch_size): for i in range(0, len(images), self.batch_size):
for j in range(len(outputs)): for j in range(len(stacked_outputs)):
label, score = outputs[j] label, score = stacked_outputs[j]
results[indices[i + j]] = [label, score] results[indices[i + j]] = [label, score]
# make sure we have high confidence if we need to flip a box # make sure we have high confidence if we need to flip a box
if "180" in label and score >= 0.7: if "180" in label and score >= 0.7:
@ -870,10 +875,10 @@ class LicensePlateProcessingMixin:
images[indices[i + j]], cv2.ROTATE_180 images[indices[i + j]], cv2.ROTATE_180
) )
return images, results return images, results # type: ignore[return-value]
def _preprocess_recognition_image( def _preprocess_recognition_image(
self, camera: string, image: np.ndarray, max_wh_ratio: float self, camera: str, image: np.ndarray, max_wh_ratio: float
) -> np.ndarray: ) -> np.ndarray:
""" """
Preprocess an image for recognition by dynamically adjusting its width. Preprocess an image for recognition by dynamically adjusting its width.
@ -941,7 +946,7 @@ class LicensePlateProcessingMixin:
input_w = int(input_h * max_wh_ratio) input_w = int(input_h * max_wh_ratio)
# check for model-specific input width # check for model-specific input width
model_input_w = self.model_runner.recognition_model.runner.get_input_width() model_input_w = self.model_runner.recognition_model.runner.get_input_width() # type: ignore[union-attr]
if isinstance(model_input_w, int) and model_input_w > 0: if isinstance(model_input_w, int) and model_input_w > 0:
input_w = model_input_w input_w = model_input_w
@ -961,7 +966,7 @@ class LicensePlateProcessingMixin:
padded_image[:, :, :resized_w] = resized_image padded_image[:, :, :resized_w] = resized_image
if False: if False:
current_time = int(datetime.datetime.now().timestamp() * 1000) current_time = int(datetime.datetime.now().timestamp() * 1000) # type: ignore[unreachable]
cv2.imwrite( cv2.imwrite(
f"debug/frames/preprocessed_recognition_{current_time}.jpg", f"debug/frames/preprocessed_recognition_{current_time}.jpg",
image, image,
@ -999,8 +1004,9 @@ class LicensePlateProcessingMixin:
np.linalg.norm(points[1] - points[2]), np.linalg.norm(points[1] - points[2]),
) )
) )
pts_std = np.float32( pts_std = np.array(
[[0, 0], [crop_width, 0], [crop_width, crop_height], [0, crop_height]] [[0, 0], [crop_width, 0], [crop_width, crop_height], [0, crop_height]],
dtype=np.float32,
) )
matrix = cv2.getPerspectiveTransform(points, pts_std) matrix = cv2.getPerspectiveTransform(points, pts_std)
image = cv2.warpPerspective( image = cv2.warpPerspective(
@ -1016,15 +1022,15 @@ class LicensePlateProcessingMixin:
return image return image
def _detect_license_plate( def _detect_license_plate(
self, camera: string, input: np.ndarray self, camera: str, input: np.ndarray
) -> tuple[int, int, int, int]: ) -> tuple[int, int, int, int] | None:
""" """
Use a lightweight YOLOv9 model to detect license plates for users without Frigate+ Use a lightweight YOLOv9 model to detect license plates for users without Frigate+
Return the dimensions of the detected plate as [x1, y1, x2, y2]. Return the dimensions of the detected plate as [x1, y1, x2, y2].
""" """
try: try:
predictions = self.model_runner.yolov9_detection_model(input) predictions = self.model_runner.yolov9_detection_model(input) # type: ignore[arg-type]
except Exception as e: except Exception as e:
logger.warning(f"Error running YOLOv9 license plate detection model: {e}") logger.warning(f"Error running YOLOv9 license plate detection model: {e}")
return None return None
@ -1089,7 +1095,7 @@ class LicensePlateProcessingMixin:
logger.debug( logger.debug(
f"{camera}: Found license plate. Bounding box: {expanded_box.astype(int)}" f"{camera}: Found license plate. Bounding box: {expanded_box.astype(int)}"
) )
return tuple(expanded_box.astype(int)) return tuple(expanded_box.astype(int)) # type: ignore[return-value]
else: else:
return None # No detection above the threshold return None # No detection above the threshold
@ -1113,7 +1119,7 @@ class LicensePlateProcessingMixin:
f" Variant {i + 1}: '{p['plate']}' (conf: {p['conf']:.3f}, area: {p['area']})" f" Variant {i + 1}: '{p['plate']}' (conf: {p['conf']:.3f}, area: {p['area']})"
) )
clusters = [] clusters: list[list[dict[str, Any]]] = []
for i, plate in enumerate(plates): for i, plate in enumerate(plates):
merged = False merged = False
for j, cluster in enumerate(clusters): for j, cluster in enumerate(clusters):
@ -1148,7 +1154,7 @@ class LicensePlateProcessingMixin:
) )
# Best cluster: largest size, tiebroken by max conf # Best cluster: largest size, tiebroken by max conf
def cluster_score(c): def cluster_score(c: list[dict[str, Any]]) -> tuple[int, float]:
return (len(c), max(v["conf"] for v in c)) return (len(c), max(v["conf"] for v in c))
best_cluster_idx = max( best_cluster_idx = max(
@ -1194,7 +1200,7 @@ class LicensePlateProcessingMixin:
def lpr_process( def lpr_process(
self, obj_data: dict[str, Any], frame: np.ndarray, dedicated_lpr: bool = False self, obj_data: dict[str, Any], frame: np.ndarray, dedicated_lpr: bool = False
): ) -> None:
"""Look for license plates in image.""" """Look for license plates in image."""
self.metrics.alpr_pps.value = self.plates_rec_second.eps() self.metrics.alpr_pps.value = self.plates_rec_second.eps()
self.metrics.yolov9_lpr_pps.value = self.plates_det_second.eps() self.metrics.yolov9_lpr_pps.value = self.plates_det_second.eps()
@ -1211,7 +1217,7 @@ class LicensePlateProcessingMixin:
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
# apply motion mask # apply motion mask
rgb[self.config.cameras[obj_data].motion.rasterized_mask == 0] = [0, 0, 0] rgb[self.config.cameras[camera].motion.rasterized_mask == 0] = [0, 0, 0] # type: ignore[attr-defined]
if WRITE_DEBUG_IMAGES: if WRITE_DEBUG_IMAGES:
cv2.imwrite( cv2.imwrite(
@ -1277,7 +1283,7 @@ class LicensePlateProcessingMixin:
"stationary", False "stationary", False
): ):
logger.debug( logger.debug(
f"{camera}: Skipping LPR for non-stationary {obj_data['label']} object {id} with no position changes. (Detected in {self.config.cameras[camera].detect.min_initialized + 1} concurrent frames, threshold to run is {self.config.cameras[camera].detect.min_initialized + 2} frames)" f"{camera}: Skipping LPR for non-stationary {obj_data['label']} object {id} with no position changes. (Detected in {self.config.cameras[camera].detect.min_initialized + 1} concurrent frames, threshold to run is {self.config.cameras[camera].detect.min_initialized + 2} frames)" # type: ignore[operator]
) )
return return
@ -1304,7 +1310,7 @@ class LicensePlateProcessingMixin:
if time_since_stationary > self.stationary_scan_duration: if time_since_stationary > self.stationary_scan_duration:
return return
license_plate: Optional[dict[str, Any]] = None license_plate = None
if "license_plate" not in self.config.cameras[camera].objects.track: if "license_plate" not in self.config.cameras[camera].objects.track:
logger.debug(f"{camera}: Running manual license_plate detection.") logger.debug(f"{camera}: Running manual license_plate detection.")
@ -1317,7 +1323,7 @@ class LicensePlateProcessingMixin:
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
# apply motion mask # apply motion mask
rgb[self.config.cameras[camera].motion.rasterized_mask == 0] = [0, 0, 0] rgb[self.config.cameras[camera].motion.rasterized_mask == 0] = [0, 0, 0] # type: ignore[attr-defined]
left, top, right, bottom = car_box left, top, right, bottom = car_box
car = rgb[top:bottom, left:right] car = rgb[top:bottom, left:right]
@ -1394,10 +1400,10 @@ class LicensePlateProcessingMixin:
if attr.get("label") != "license_plate": if attr.get("label") != "license_plate":
continue continue
if license_plate is None or attr.get( if license_plate is None or attr.get( # type: ignore[unreachable]
"score", 0.0 "score", 0.0
) > license_plate.get("score", 0.0): ) > license_plate.get("score", 0.0):
license_plate = attr license_plate = attr # type: ignore[assignment]
# no license plates detected in this frame # no license plates detected in this frame
if not license_plate: if not license_plate:
@ -1405,9 +1411,9 @@ class LicensePlateProcessingMixin:
# we are using dedicated lpr with frigate+ # we are using dedicated lpr with frigate+
if obj_data.get("label") == "license_plate": if obj_data.get("label") == "license_plate":
license_plate = obj_data license_plate = obj_data # type: ignore[assignment]
license_plate_box = license_plate.get("box") license_plate_box = license_plate.get("box") # type: ignore[attr-defined]
# check that license plate is valid # check that license plate is valid
if ( if (
@ -1436,7 +1442,7 @@ class LicensePlateProcessingMixin:
0, [license_plate_frame.shape[1], license_plate_frame.shape[0]] * 2 0, [license_plate_frame.shape[1], license_plate_frame.shape[0]] * 2
) )
plate_box = tuple(int(x) for x in expanded_box) plate_box = tuple(int(x) for x in expanded_box) # type: ignore[assignment]
# Crop using the expanded box # Crop using the expanded box
license_plate_frame = license_plate_frame[ license_plate_frame = license_plate_frame[
@ -1612,7 +1618,7 @@ class LicensePlateProcessingMixin:
sub_label = next( sub_label = next(
( (
label label
for label, plates_list in self.lpr_config.known_plates.items() for label, plates_list in self.lpr_config.known_plates.items() # type: ignore[union-attr]
if any( if any(
re.match(f"^{plate}$", rep_plate) re.match(f"^{plate}$", rep_plate)
or Levenshtein.distance(plate, rep_plate) or Levenshtein.distance(plate, rep_plate)
@ -1665,14 +1671,16 @@ class LicensePlateProcessingMixin:
frame_bgr = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) frame_bgr = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
_, encoded_img = cv2.imencode(".jpg", frame_bgr) _, encoded_img = cv2.imencode(".jpg", frame_bgr)
self.sub_label_publisher.publish( self.sub_label_publisher.publish(
(base64.b64encode(encoded_img).decode("ASCII"), id, camera), (base64.b64encode(encoded_img.tobytes()).decode("ASCII"), id, camera),
EventMetadataTypeEnum.save_lpr_snapshot.value, EventMetadataTypeEnum.save_lpr_snapshot.value,
) )
def handle_request(self, topic, request_data) -> dict[str, Any] | None: def handle_request(
return self, topic: str, request_data: dict[str, Any]
) -> dict[str, Any] | None:
return None
def lpr_expire(self, object_id: str, camera: str): def lpr_expire(self, object_id: str, camera: str) -> None:
if object_id in self.detected_license_plates: if object_id in self.detected_license_plates:
self.detected_license_plates.pop(object_id) self.detected_license_plates.pop(object_id)
@ -1689,7 +1697,7 @@ class CTCDecoder:
for each decoded character sequence. for each decoded character sequence.
""" """
def __init__(self, character_dict_path=None): def __init__(self, character_dict_path: str | None = None) -> None:
""" """
Initializes the CTCDecoder. Initializes the CTCDecoder.
:param character_dict_path: Path to the character dictionary file. :param character_dict_path: Path to the character dictionary file.

View File

@ -1,3 +1,5 @@
from comms.inter_process import InterProcessRequestor
from frigate.embeddings.onnx.lpr_embedding import ( from frigate.embeddings.onnx.lpr_embedding import (
LicensePlateDetector, LicensePlateDetector,
PaddleOCRClassification, PaddleOCRClassification,
@ -9,7 +11,12 @@ from ...types import DataProcessorModelRunner
class LicensePlateModelRunner(DataProcessorModelRunner): class LicensePlateModelRunner(DataProcessorModelRunner):
def __init__(self, requestor, device: str = "CPU", model_size: str = "small"): def __init__(
self,
requestor: InterProcessRequestor,
device: str = "CPU",
model_size: str = "small",
):
super().__init__(requestor, device, model_size) super().__init__(requestor, device, model_size)
self.detection_model = PaddleOCRDetection( self.detection_model = PaddleOCRDetection(
model_size=model_size, requestor=requestor, device=device model_size=model_size, requestor=requestor, device=device

View File

@ -29,7 +29,7 @@ from .api import PostProcessorApi
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi): class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi): # type: ignore[misc]
def __init__( def __init__(
self, self,
config: FrigateConfig, config: FrigateConfig,