lpr maintainer

This commit is contained in:
Josh Hawkins 2024-10-25 12:05:00 -05:00
parent e45bb2d193
commit be1cbb958c
2 changed files with 112 additions and 58 deletions

View File

@ -1,6 +1,5 @@
import math
from argparse import ArgumentParser
from typing import Any, Dict, List, Tuple
from typing import List, Tuple
import cv2
import numpy as np
@ -8,12 +7,15 @@ from pyclipper import ET_CLOSEDPOLYGON, JT_ROUND, PyclipperOffset
from shapely.geometry import Polygon
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config.semantic_search import LicensePlateRecognitionConfig
from frigate.embeddings.functions.onnx import GenericONNXEmbedding, ModelTypeEnum
class LicensePlateRecognition:
def __init__(self, config: Dict[str, Any], requestor: InterProcessRequestor):
self.config = config
def __init__(
self, config: LicensePlateRecognitionConfig, requestor: InterProcessRequestor
):
self.lpr_config = config
self.requestor = requestor
self.detection_model = self._create_detection_model()
self.classification_model = self._create_classification_model()
@ -553,9 +555,7 @@ class LicensePlateRecognition:
for j in range(len(outputs)):
label, score = outputs[j]
results[indices[i + j]] = [label, score]
if "180" in label and score > self.config.get(
"classification_threshold", 0.98
):
if "180" in label and score >= self.lpr_config.threshold:
images[indices[i + j]] = cv2.rotate(images[indices[i + j]], 1)
return images, results
@ -807,52 +807,3 @@ class CTCDecoder:
confidences.append(confidence)
return results, confidences
def main():
    """CLI harness: run license plate recognition on a single image file."""
    arg_parser = ArgumentParser()
    arg_parser.add_argument("filepath", type=str, help="image file path")
    cli_args = arg_parser.parse_args()

    config = {
        "detection_model_path": "/workspace/frigate/frigate/embeddings/weights/detection.onnx",
        "classification_model_path": "/workspace/frigate/frigate/embeddings/weights/classification.onnx",
        "recognition_model_path": "/workspace/frigate/frigate/embeddings/weights/recognition.onnx",
        "mask_thresh": 0.8,
        "box_thresh": 0.8,
        "min_size": 3,
        "classification_threshold": 0.98,
    }

    # build the recognizer; the requestor slot gets a stub since the CLI
    # path never publishes inter-process messages
    recognizer = LicensePlateRecognition(config, {})

    image = cv2.imread(cli_args.filepath)
    if image is None:
        print(f"Error: Could not read image file: {cli_args.filepath}")
        return

    # convert in place from BGR (cv2 default) to RGB
    cv2.cvtColor(image, cv2.COLOR_BGR2RGB, image)

    # run the full detect + classify + recognize pipeline
    plates, confidences, areas = recognizer.process_license_plate(image)

    # echo raw results so the data structures can be inspected
    print(f"License plates: {plates}")
    print(f"Confidences: {confidences}")
    print(f"Areas: {areas}")

    if not plates:
        print("No license plate detected")
        return

    for plate, confidence, plate_area in zip(plates, confidences, areas):
        print(
            f"Detected license plate: {plate} (average confidence: {confidence:.2f}, area: {plate_area} pixels)"
        )


if __name__ == "__main__":
    main()

View File

@ -22,6 +22,7 @@ from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscrib
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.const import CLIPS_DIR, FRIGATE_LOCALHOST, UPDATE_EVENT_DESCRIPTION
from frigate.embeddings.alpr.alpr import LicensePlateRecognition
from frigate.events.types import EventTypeEnum
from frigate.genai import get_genai_client
from frigate.models import Event
@ -66,6 +67,16 @@ class EmbeddingMaintainer(threading.Thread):
self.requires_face_detection = "face" not in self.config.model.all_attributes
self.detected_faces: dict[str, float] = {}
# set license plate recognition conditions
self.lpr_config = self.config.lpr
self.requires_license_plate_detection = (
"license_plate" not in self.config.model.all_attributes
)
self.detected_license_plates: dict[str, float] = {}
self.license_plate_recognition = LicensePlateRecognition(
self.lpr_config, self.requestor
)
# create communication for updating event descriptions
self.requestor = InterProcessRequestor()
self.stop_event = stop_event
@ -170,8 +181,12 @@ class EmbeddingMaintainer(threading.Thread):
camera_config = self.config.cameras[camera]
# no need to process updated objects if face recognition and genai are disabled
if not camera_config.genai.enabled and not self.face_recognition_enabled:
# no need to process updated objects if face recognition, lpr, genai are disabled
if (
not camera_config.genai.enabled
and not self.face_recognition_enabled
and not self.lpr_config.enabled
):
return
# Create our own thumbnail based on the bounding box and the frame time
@ -190,6 +205,9 @@ class EmbeddingMaintainer(threading.Thread):
if self.face_recognition_enabled:
self._process_face(data, yuv_frame)
if self.lpr_config.enabled:
self._process_license_plate(data, yuv_frame)
# no need to save our own thumbnails if genai is not enabled
# or if the object has become stationary
if self.genai_client is not None and not data["stationary"]:
@ -465,6 +483,91 @@ class EmbeddingMaintainer(threading.Thread):
if resp.status_code == 200:
self.detected_faces[id] = avg_score
def _process_license_plate(
    self, obj_data: dict[str, any], frame: np.ndarray
) -> None:
    """Look for a license plate on a car, run recognition on it, and set the
    recognized text as the event's sub label.

    Args:
        obj_data: tracked object metadata (id, label, sub_label, camera,
            current_attributes).
        frame: the YUV camera frame the object was detected in.
    """
    id = obj_data["id"]

    # license plate attributes are only produced for cars
    if obj_data.get("label") != "car":
        logger.debug("Not processing license plate for non car object.")
        return

    # don't overwrite a sub label that was set by something other than a
    # previous plate recognition for this object
    if obj_data.get("sub_label") and id not in self.detected_license_plates:
        logger.debug(
            f"Not processing license plate due to existing sub label: {obj_data.get('sub_label')}."
        )
        return

    license_plate: Optional[dict[str, any]] = None

    # can't run without attributes
    if not obj_data.get("current_attributes"):
        logger.debug("No attributes to parse.")
        return

    # pick the highest scoring license_plate attribute in this frame
    attributes: list[dict[str, any]] = obj_data.get("current_attributes", [])
    for attr in attributes:
        if attr.get("label") != "license_plate":
            continue

        if license_plate is None or attr.get("score", 0.0) > license_plate.get(
            "score", 0.0
        ):
            license_plate = attr

    # no license plates detected in this frame
    if not license_plate:
        return

    license_plate_box = license_plate.get("box")

    # check that the license plate box is present and large enough to read
    if not license_plate_box or area(license_plate_box) < self.lpr_config.min_area:
        logger.debug(f"Invalid license plate box {license_plate}")
        return

    # NOTE(review): _create_thumbnail converts with COLOR_YUV2BGR_I420 —
    # confirm plain COLOR_YUV2RGB is the right conversion for this frame format
    license_plate_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB)

    # crop to the plate region (box is [x1, y1, x2, y2])
    license_plate_frame = license_plate_frame[
        license_plate_box[1] : license_plate_box[3],
        license_plate_box[0] : license_plate_box[2],
    ]

    # run detection, returns results sorted by confidence, best first
    license_plates, confidences, areas = (
        self.license_plate_recognition.process_license_plate(license_plate_frame)
    )

    logger.debug(f"Text boxes: {license_plates}")
    logger.debug(f"Confidences: {confidences}")
    logger.debug(f"Areas: {areas}")

    if license_plates:
        for plate, confidence, text_area in zip(license_plates, confidences, areas):
            logger.debug(
                f"Detected text: {plate} (average confidence: {confidence:.2f}, area: {text_area} pixels)"
            )
    else:
        # return early: indexing confidences[0] below would raise
        # IndexError on an empty result
        logger.debug("No text detected")
        return

    # compare against the LPR threshold (not face recognition's) — the
    # original compared self.config.face_recognition.threshold while
    # logging self.config.lpr.threshold
    if confidences[0] < self.lpr_config.threshold:
        logger.debug(
            f"Recognized license plate top score {confidences[0]} is less than threshold ({self.lpr_config.threshold})."
        )
        return

    # set the plate text as the event sub label through the API so the
    # normal event-update flow applies
    resp = requests.post(
        f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label",
        json={
            "camera": obj_data.get("camera"),
            "subLabel": license_plates[0],
            "subLabelScore": confidences[0],
        },
    )
def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]:
"""Return jpg thumbnail of a region of the frame."""
frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)