diff --git a/frigate/data_processing/common/face/model.py b/frigate/data_processing/common/face/model.py index 6fda0bfa1..0f37d04ab 100644 --- a/frigate/data_processing/common/face/model.py +++ b/frigate/data_processing/common/face/model.py @@ -197,7 +197,7 @@ class FaceNetRecognizer(FaceRecognizer): continue # type: ignore[unreachable] img = self.align_face(img, img.shape[1], img.shape[0]) - emb = self.face_embedder([img])[0].squeeze() # type: ignore[arg-type] + emb = self.face_embedder([img])[0].squeeze() face_embeddings_map[name].append(emb) idx += 1 diff --git a/frigate/data_processing/common/license_plate/mixin.py b/frigate/data_processing/common/license_plate/mixin.py index d9902feb4..462a314f2 100644 --- a/frigate/data_processing/common/license_plate/mixin.py +++ b/frigate/data_processing/common/license_plate/mixin.py @@ -10,7 +10,7 @@ import random import re import string from pathlib import Path -from typing import Any, List, Optional, Tuple +from typing import Any, List, Tuple import cv2 import numpy as np @@ -113,7 +113,7 @@ class LicensePlateProcessingMixin: ) try: - outputs = self.model_runner.detection_model([normalized_image])[0] + outputs = self.model_runner.detection_model([normalized_image])[0] # type: ignore[arg-type] except Exception as e: logger.warning(f"Error running LPR box detection model: {e}") return [] @@ -121,18 +121,18 @@ class LicensePlateProcessingMixin: outputs = outputs[0, :, :] if False: - current_time = int(datetime.datetime.now().timestamp()) + current_time = int(datetime.datetime.now().timestamp()) # type: ignore[unreachable] cv2.imwrite( f"debug/frames/probability_map_{current_time}.jpg", (outputs * 255).astype(np.uint8), ) boxes, _ = self._boxes_from_bitmap(outputs, outputs > self.mask_thresh, w, h) - return self._filter_polygon(boxes, (h, w)) + return self._filter_polygon(boxes, (h, w)) # type: ignore[return-value,arg-type] def _classify( self, images: List[np.ndarray] - ) -> Tuple[List[np.ndarray], List[Tuple[str, float]]]: 
+ ) -> Tuple[List[np.ndarray], List[Tuple[str, float]]] | None: """ Classify the orientation or category of each detected license plate. @@ -154,15 +154,15 @@ class LicensePlateProcessingMixin: norm_images.append(norm_img) try: - outputs = self.model_runner.classification_model(norm_images) + outputs = self.model_runner.classification_model(norm_images) # type: ignore[arg-type] except Exception as e: logger.warning(f"Error running LPR classification model: {e}") - return + return None return self._process_classification_output(images, outputs) def _recognize( - self, camera: string, images: List[np.ndarray] + self, camera: str, images: List[np.ndarray] ) -> Tuple[List[str], List[List[float]]]: """ Recognize the characters on the detected license plates using the recognition model. @@ -195,7 +195,7 @@ class LicensePlateProcessingMixin: norm_images.append(norm_image) try: - outputs = self.model_runner.recognition_model(norm_images) + outputs = self.model_runner.recognition_model(norm_images) # type: ignore[arg-type] except Exception as e: logger.warning(f"Error running LPR recognition model: {e}") return [], [] @@ -426,7 +426,8 @@ class LicensePlateProcessingMixin: ) if sorted_data: - return map(list, zip(*sorted_data)) + plates, confs, areas_list = zip(*sorted_data) + return list(plates), list(confs), list(areas_list) return [], [], [] @@ -548,7 +549,7 @@ class LicensePlateProcessingMixin: # Add the last box merged_boxes.append(current_box) - return np.array(merged_boxes, dtype=np.int32) + return np.array(merged_boxes, dtype=np.int32) # type: ignore[return-value] def _boxes_from_bitmap( self, output: np.ndarray, mask: np.ndarray, dest_width: int, dest_height: int @@ -576,38 +577,42 @@ class LicensePlateProcessingMixin: boxes = [] scores = [] - for index in range(len(contours)): - contour = contours[index] + for index in range(len(contours)): # type: ignore[arg-type] + contour = contours[index] # type: ignore[index] # get minimum bounding box (rotated rectangle) 
around the contour and the smallest side length. points, sside = self._get_min_boxes(contour) if sside < self.min_size: continue - points = np.array(points, dtype=np.float32) + points = np.array(points, dtype=np.float32) # type: ignore[assignment] score = self._box_score(output, contour) if self.box_thresh > score: continue - points = self._expand_box(points) + points = self._expand_box(points) # type: ignore[assignment] # Get the minimum area rectangle again after expansion - points, sside = self._get_min_boxes(points.reshape(-1, 1, 2)) + points, sside = self._get_min_boxes(points.reshape(-1, 1, 2)) # type: ignore[attr-defined] if sside < self.min_size + 2: continue - points = np.array(points, dtype=np.float32) + points = np.array(points, dtype=np.float32) # type: ignore[assignment] # normalize and clip box coordinates to fit within the destination image size. - points[:, 0] = np.clip( - np.round(points[:, 0] / width * dest_width), 0, dest_width + points[:, 0] = np.clip( # type: ignore[call-overload] + np.round(points[:, 0] / width * dest_width), # type: ignore[call-overload] + 0, + dest_width, ) - points[:, 1] = np.clip( - np.round(points[:, 1] / height * dest_height), 0, dest_height + points[:, 1] = np.clip( # type: ignore[call-overload] + np.round(points[:, 1] / height * dest_height), # type: ignore[call-overload] + 0, + dest_height, ) - boxes.append(points.astype("int32")) + boxes.append(points.astype("int32")) # type: ignore[attr-defined] scores.append(score) return np.array(boxes, dtype="int32"), scores @@ -648,7 +653,7 @@ class LicensePlateProcessingMixin: x1, y1 = np.clip(contour.min(axis=0), 0, [w - 1, h - 1]) x2, y2 = np.clip(contour.max(axis=0), 0, [w - 1, h - 1]) mask = np.zeros((y2 - y1 + 1, x2 - x1 + 1), dtype=np.uint8) - cv2.fillPoly(mask, [contour - [x1, y1]], 1) + cv2.fillPoly(mask, [contour - [x1, y1]], 1) # type: ignore[call-overload] return cv2.mean(bitmap[y1 : y2 + 1, x1 : x2 + 1], mask)[0] @staticmethod @@ -706,7 +711,7 @@ class 
LicensePlateProcessingMixin: Returns: bool: Whether the polygon is valid or not. """ - return ( + return bool( point[:, 0].min() >= 0 and point[:, 0].max() < width and point[:, 1].min() >= 0 @@ -751,7 +756,7 @@ class LicensePlateProcessingMixin: return np.array([tl, tr, br, bl]) @staticmethod - def _sort_boxes(boxes): + def _sort_boxes(boxes: list[np.ndarray]) -> list[np.ndarray]: """ Sort polygons based on their position in the image. If boxes are close in vertical position (within 5 pixels), sort them by horizontal position. @@ -853,16 +858,16 @@ class LicensePlateProcessingMixin: results = [["", 0.0]] * len(images) indices = np.argsort(np.array([x.shape[1] / x.shape[0] for x in images])) - outputs = np.stack(outputs) + stacked_outputs = np.stack(outputs) - outputs = [ - (labels[idx], outputs[i, idx]) - for i, idx in enumerate(outputs.argmax(axis=1)) + stacked_outputs = [ + (labels[idx], stacked_outputs[i, idx]) + for i, idx in enumerate(stacked_outputs.argmax(axis=1)) ] for i in range(0, len(images), self.batch_size): - for j in range(len(outputs)): - label, score = outputs[j] + for j in range(len(stacked_outputs)): + label, score = stacked_outputs[j] results[indices[i + j]] = [label, score] # make sure we have high confidence if we need to flip a box if "180" in label and score >= 0.7: @@ -870,10 +875,10 @@ class LicensePlateProcessingMixin: images[indices[i + j]], cv2.ROTATE_180 ) - return images, results + return images, results # type: ignore[return-value] def _preprocess_recognition_image( - self, camera: string, image: np.ndarray, max_wh_ratio: float + self, camera: str, image: np.ndarray, max_wh_ratio: float ) -> np.ndarray: """ Preprocess an image for recognition by dynamically adjusting its width. 
@@ -941,7 +946,7 @@ class LicensePlateProcessingMixin: input_w = int(input_h * max_wh_ratio) # check for model-specific input width - model_input_w = self.model_runner.recognition_model.runner.get_input_width() + model_input_w = self.model_runner.recognition_model.runner.get_input_width() # type: ignore[union-attr] if isinstance(model_input_w, int) and model_input_w > 0: input_w = model_input_w @@ -961,7 +966,7 @@ class LicensePlateProcessingMixin: padded_image[:, :, :resized_w] = resized_image if False: - current_time = int(datetime.datetime.now().timestamp() * 1000) + current_time = int(datetime.datetime.now().timestamp() * 1000) # type: ignore[unreachable] cv2.imwrite( f"debug/frames/preprocessed_recognition_{current_time}.jpg", image, @@ -999,8 +1004,9 @@ class LicensePlateProcessingMixin: np.linalg.norm(points[1] - points[2]), ) ) - pts_std = np.float32( - [[0, 0], [crop_width, 0], [crop_width, crop_height], [0, crop_height]] + pts_std = np.array( + [[0, 0], [crop_width, 0], [crop_width, crop_height], [0, crop_height]], + dtype=np.float32, ) matrix = cv2.getPerspectiveTransform(points, pts_std) image = cv2.warpPerspective( @@ -1016,15 +1022,15 @@ class LicensePlateProcessingMixin: return image def _detect_license_plate( - self, camera: string, input: np.ndarray - ) -> tuple[int, int, int, int]: + self, camera: str, input: np.ndarray + ) -> tuple[int, int, int, int] | None: """ Use a lightweight YOLOv9 model to detect license plates for users without Frigate+ Return the dimensions of the detected plate as [x1, y1, x2, y2]. """ try: - predictions = self.model_runner.yolov9_detection_model(input) + predictions = self.model_runner.yolov9_detection_model(input) # type: ignore[arg-type] except Exception as e: logger.warning(f"Error running YOLOv9 license plate detection model: {e}") return None @@ -1089,7 +1095,7 @@ class LicensePlateProcessingMixin: logger.debug( f"{camera}: Found license plate. 
Bounding box: {expanded_box.astype(int)}" ) - return tuple(expanded_box.astype(int)) + return tuple(expanded_box.astype(int)) # type: ignore[return-value] else: return None # No detection above the threshold @@ -1113,7 +1119,7 @@ class LicensePlateProcessingMixin: f" Variant {i + 1}: '{p['plate']}' (conf: {p['conf']:.3f}, area: {p['area']})" ) - clusters = [] + clusters: list[list[dict[str, Any]]] = [] for i, plate in enumerate(plates): merged = False for j, cluster in enumerate(clusters): @@ -1148,7 +1154,7 @@ class LicensePlateProcessingMixin: ) # Best cluster: largest size, tiebroken by max conf - def cluster_score(c): + def cluster_score(c: list[dict[str, Any]]) -> tuple[int, float]: return (len(c), max(v["conf"] for v in c)) best_cluster_idx = max( @@ -1194,7 +1200,7 @@ class LicensePlateProcessingMixin: def lpr_process( self, obj_data: dict[str, Any], frame: np.ndarray, dedicated_lpr: bool = False - ): + ) -> None: """Look for license plates in image.""" self.metrics.alpr_pps.value = self.plates_rec_second.eps() self.metrics.yolov9_lpr_pps.value = self.plates_det_second.eps() @@ -1211,7 +1217,7 @@ class LicensePlateProcessingMixin: rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) # apply motion mask - rgb[self.config.cameras[obj_data].motion.rasterized_mask == 0] = [0, 0, 0] + rgb[self.config.cameras[camera].motion.rasterized_mask == 0] = [0, 0, 0] # type: ignore[attr-defined] if WRITE_DEBUG_IMAGES: cv2.imwrite( @@ -1277,7 +1283,7 @@ class LicensePlateProcessingMixin: "stationary", False ): logger.debug( - f"{camera}: Skipping LPR for non-stationary {obj_data['label']} object {id} with no position changes. (Detected in {self.config.cameras[camera].detect.min_initialized + 1} concurrent frames, threshold to run is {self.config.cameras[camera].detect.min_initialized + 2} frames)" + f"{camera}: Skipping LPR for non-stationary {obj_data['label']} object {id} with no position changes. 
(Detected in {self.config.cameras[camera].detect.min_initialized + 1} concurrent frames, threshold to run is {self.config.cameras[camera].detect.min_initialized + 2} frames)" # type: ignore[operator] ) return @@ -1304,7 +1310,7 @@ class LicensePlateProcessingMixin: if time_since_stationary > self.stationary_scan_duration: return - license_plate: Optional[dict[str, Any]] = None + license_plate = None if "license_plate" not in self.config.cameras[camera].objects.track: logger.debug(f"{camera}: Running manual license_plate detection.") @@ -1317,7 +1323,7 @@ class LicensePlateProcessingMixin: rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) # apply motion mask - rgb[self.config.cameras[camera].motion.rasterized_mask == 0] = [0, 0, 0] + rgb[self.config.cameras[camera].motion.rasterized_mask == 0] = [0, 0, 0] # type: ignore[attr-defined] left, top, right, bottom = car_box car = rgb[top:bottom, left:right] @@ -1394,10 +1400,10 @@ class LicensePlateProcessingMixin: if attr.get("label") != "license_plate": continue - if license_plate is None or attr.get( + if license_plate is None or attr.get( # type: ignore[unreachable] "score", 0.0 ) > license_plate.get("score", 0.0): - license_plate = attr + license_plate = attr # type: ignore[assignment] # no license plates detected in this frame if not license_plate: @@ -1405,9 +1411,9 @@ class LicensePlateProcessingMixin: # we are using dedicated lpr with frigate+ if obj_data.get("label") == "license_plate": - license_plate = obj_data + license_plate = obj_data # type: ignore[assignment] - license_plate_box = license_plate.get("box") + license_plate_box = license_plate.get("box") # type: ignore[attr-defined] # check that license plate is valid if ( @@ -1436,7 +1442,7 @@ class LicensePlateProcessingMixin: 0, [license_plate_frame.shape[1], license_plate_frame.shape[0]] * 2 ) - plate_box = tuple(int(x) for x in expanded_box) + plate_box = tuple(int(x) for x in expanded_box) # type: ignore[assignment] # Crop using the expanded box 
license_plate_frame = license_plate_frame[ @@ -1612,7 +1618,7 @@ class LicensePlateProcessingMixin: sub_label = next( ( label - for label, plates_list in self.lpr_config.known_plates.items() + for label, plates_list in self.lpr_config.known_plates.items() # type: ignore[union-attr] if any( re.match(f"^{plate}$", rep_plate) or Levenshtein.distance(plate, rep_plate) @@ -1665,14 +1671,16 @@ class LicensePlateProcessingMixin: frame_bgr = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) _, encoded_img = cv2.imencode(".jpg", frame_bgr) self.sub_label_publisher.publish( - (base64.b64encode(encoded_img).decode("ASCII"), id, camera), + (base64.b64encode(encoded_img.tobytes()).decode("ASCII"), id, camera), EventMetadataTypeEnum.save_lpr_snapshot.value, ) - def handle_request(self, topic, request_data) -> dict[str, Any] | None: - return + def handle_request( + self, topic: str, request_data: dict[str, Any] + ) -> dict[str, Any] | None: + return None - def lpr_expire(self, object_id: str, camera: str): + def lpr_expire(self, object_id: str, camera: str) -> None: if object_id in self.detected_license_plates: self.detected_license_plates.pop(object_id) @@ -1689,7 +1697,7 @@ class CTCDecoder: for each decoded character sequence. """ - def __init__(self, character_dict_path=None): + def __init__(self, character_dict_path: str | None = None) -> None: """ Initializes the CTCDecoder. :param character_dict_path: Path to the character dictionary file. 
diff --git a/frigate/data_processing/common/license_plate/model.py b/frigate/data_processing/common/license_plate/model.py index f53ed7d95..b5d103c51 100644 --- a/frigate/data_processing/common/license_plate/model.py +++ b/frigate/data_processing/common/license_plate/model.py @@ -1,3 +1,5 @@ +from frigate.comms.inter_process import InterProcessRequestor + +from frigate.embeddings.onnx.lpr_embedding import ( LicensePlateDetector, PaddleOCRClassification, @@ -9,7 +11,12 @@ from ...types import DataProcessorModelRunner class LicensePlateModelRunner(DataProcessorModelRunner): - def __init__(self, requestor, device: str = "CPU", model_size: str = "small"): + def __init__( + self, + requestor: InterProcessRequestor, + device: str = "CPU", + model_size: str = "small", + ): super().__init__(requestor, device, model_size) self.detection_model = PaddleOCRDetection( model_size=model_size, requestor=requestor, device=device diff --git a/frigate/data_processing/post/license_plate.py b/frigate/data_processing/post/license_plate.py index a1899684f..aa89aeb12 100644 --- a/frigate/data_processing/post/license_plate.py +++ b/frigate/data_processing/post/license_plate.py @@ -29,7 +29,7 @@ from .api import PostProcessorApi logger = logging.getLogger(__name__) -class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi): +class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi): # type: ignore[misc] def __init__( self, config: FrigateConfig,