From be1cbb958c7c48f9d92828000a59089e164c888c Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Fri, 25 Oct 2024 12:05:00 -0500 Subject: [PATCH] lpr maintainer --- frigate/embeddings/alpr/alpr.py | 63 ++---------------- frigate/embeddings/maintainer.py | 107 ++++++++++++++++++++++++++++++- 2 files changed, 112 insertions(+), 58 deletions(-) diff --git a/frigate/embeddings/alpr/alpr.py b/frigate/embeddings/alpr/alpr.py index e8c039542..88fc94a7d 100644 --- a/frigate/embeddings/alpr/alpr.py +++ b/frigate/embeddings/alpr/alpr.py @@ -1,6 +1,5 @@ import math -from argparse import ArgumentParser -from typing import Any, Dict, List, Tuple +from typing import List, Tuple import cv2 import numpy as np @@ -8,12 +7,15 @@ from pyclipper import ET_CLOSEDPOLYGON, JT_ROUND, PyclipperOffset from shapely.geometry import Polygon from frigate.comms.inter_process import InterProcessRequestor +from frigate.config.semantic_search import LicensePlateRecognitionConfig from frigate.embeddings.functions.onnx import GenericONNXEmbedding, ModelTypeEnum class LicensePlateRecognition: - def __init__(self, config: Dict[str, Any], requestor: InterProcessRequestor): - self.config = config + def __init__( + self, config: LicensePlateRecognitionConfig, requestor: InterProcessRequestor + ): + self.lpr_config = config self.requestor = requestor self.detection_model = self._create_detection_model() self.classification_model = self._create_classification_model() @@ -553,9 +555,7 @@ class LicensePlateRecognition: for j in range(len(outputs)): label, score = outputs[j] results[indices[i + j]] = [label, score] - if "180" in label and score > self.config.get( - "classification_threshold", 0.98 - ): + if "180" in label and score >= self.lpr_config.threshold: images[indices[i + j]] = cv2.rotate(images[indices[i + j]], 1) return images, results @@ -807,52 +807,3 @@ class CTCDecoder: confidences.append(confidence) return results, confidences - - -def main(): - 
parser = ArgumentParser() - parser.add_argument("filepath", type=str, help="image file path") - args = parser.parse_args() - - lpr_config = { - "detection_model_path": "/workspace/frigate/frigate/embeddings/weights/detection.onnx", - "classification_model_path": "/workspace/frigate/frigate/embeddings/weights/classification.onnx", - "recognition_model_path": "/workspace/frigate/frigate/embeddings/weights/recognition.onnx", - "mask_thresh": 0.8, - "box_thresh": 0.8, - "min_size": 3, - "classification_threshold": 0.98, - } - - # Initialize LPR - license_plate_recognition = LicensePlateRecognition(lpr_config, {}) - - # Read and process image - frame = cv2.imread(args.filepath) - if frame is None: - print(f"Error: Could not read image file: {args.filepath}") - return - - cv2.cvtColor(frame, cv2.COLOR_BGR2RGB, frame) - - # Process the license plate - license_plates, confidences, areas = ( - license_plate_recognition.process_license_plate(frame) - ) - - # Print debug information to ensure data structure - print(f"License plates: {license_plates}") - print(f"Confidences: {confidences}") - print(f"Areas: {areas}") - - if license_plates: - for plate, confidence, area in zip(license_plates, confidences, areas): - print( - f"Detected license plate: {plate} (average confidence: {confidence:.2f}, area: {area} pixels)" - ) - else: - print("No license plate detected") - - -if __name__ == "__main__": - main() diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index ca7d09238..2c7dca691 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -22,6 +22,7 @@ from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscrib from frigate.comms.inter_process import InterProcessRequestor from frigate.config import FrigateConfig from frigate.const import CLIPS_DIR, FRIGATE_LOCALHOST, UPDATE_EVENT_DESCRIPTION +from frigate.embeddings.alpr.alpr import LicensePlateRecognition from frigate.events.types import 
EventTypeEnum from frigate.genai import get_genai_client from frigate.models import Event @@ -66,6 +67,16 @@ class EmbeddingMaintainer(threading.Thread): self.requires_face_detection = "face" not in self.config.model.all_attributes self.detected_faces: dict[str, float] = {} # create communication for updating event descriptions self.requestor = InterProcessRequestor() + # set license plate recognition conditions + self.lpr_config = self.config.lpr + self.requires_license_plate_detection = ( + "license_plate" not in self.config.model.all_attributes + ) + self.detected_license_plates: dict[str, float] = {} + self.license_plate_recognition = LicensePlateRecognition( + self.lpr_config, self.requestor + ) + self.stop_event = stop_event @@ -170,8 +181,12 @@ class EmbeddingMaintainer(threading.Thread): camera_config = self.config.cameras[camera] - # no need to process updated objects if face recognition and genai are disabled - if not camera_config.genai.enabled and not self.face_recognition_enabled: + # no need to process updated objects if face recognition, lpr, genai are disabled + if ( + not camera_config.genai.enabled + and not self.face_recognition_enabled + and not self.lpr_config.enabled + ): return # Create our own thumbnail based on the bounding box and the frame time @@ -190,6 +205,9 @@ class EmbeddingMaintainer(threading.Thread): if self.face_recognition_enabled: self._process_face(data, yuv_frame) + if self.lpr_config.enabled: + self._process_license_plate(data, yuv_frame) + # no need to save our own thumbnails if genai is not enabled # or if the object has become stationary if self.genai_client is not None and not data["stationary"]: @@ -465,6 +483,91 @@ class EmbeddingMaintainer(threading.Thread): if resp.status_code == 200: self.detected_faces[id] = avg_score + def _process_license_plate( + self, obj_data: dict[str, any], frame: np.ndarray + ) -> None: + """Look for license plates in image.""" + id = obj_data["id"] + + # don't run for non car objects + if 
obj_data.get("label") != "car": + logger.debug("Not processing license plate for non car object.") + return + + # don't overwrite sub label for objects that have a sub label + # that is not a license plate + if obj_data.get("sub_label") and id not in self.detected_license_plates: + logger.debug( + f"Not processing license plate due to existing sub label: {obj_data.get('sub_label')}." + ) + return + + license_plate: Optional[dict[str, any]] = None + + # don't run for object without attributes + if not obj_data.get("current_attributes"): + logger.debug("No attributes to parse.") + return + + attributes: list[dict[str, any]] = obj_data.get("current_attributes", []) + for attr in attributes: + if attr.get("label") != "license_plate": + continue + + if license_plate is None or attr.get("score", 0.0) > license_plate.get( + "score", 0.0 + ): + license_plate = attr + + # no license plates detected in this frame + if not license_plate: + return + + license_plate_box = license_plate.get("box") + + # check that license plate box is valid + if not license_plate_box or area(license_plate_box) < self.config.lpr.min_area: + logger.debug(f"Invalid license plate box {license_plate}") + return + + license_plate_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420) + license_plate_frame = license_plate_frame[ + license_plate_box[1] : license_plate_box[3], + license_plate_box[0] : license_plate_box[2], + ] + + # run detection, returns results sorted by confidence, best first + license_plates, confidences, areas = ( + self.license_plate_recognition.process_license_plate(license_plate_frame) + ) + + logger.debug(f"Text boxes: {license_plates}") + logger.debug(f"Confidences: {confidences}") + logger.debug(f"Areas: {areas}") + + if license_plates: + for plate, confidence, text_area in zip(license_plates, confidences, areas): + logger.debug( + f"Detected text: {plate} (average confidence: {confidence:.2f}, area: {text_area} pixels)" + ) + else: + logger.debug("No text detected") + return + if confidences[0] < 
self.lpr_config.threshold: + logger.debug( + f"Recognized license plate top score {confidences[0]} is less than threshold ({self.config.lpr.threshold})." + ) + return + + resp = requests.post( + f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label", + json={ + "camera": obj_data.get("camera"), + "subLabel": license_plates[0], + "subLabelScore": confidences[0], + }, + ) + def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]: """Return jpg thumbnail of a region of the frame.""" frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)