Merge pull request #1 from weitheng/0.16

0.16 Alpha 30th Oct
This commit is contained in:
Wei Theng 2024-10-30 09:46:22 +00:00 committed by GitHub
commit 20ae6099f2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 1757 additions and 82 deletions

View File

@ -2,6 +2,7 @@ aarch
absdiff
airockchip
Alloc
alpr
Amcrest
amdgpu
analyzeduration
@ -60,6 +61,7 @@ dsize
dtype
ECONNRESET
edgetpu
facenet
fastapi
faststart
fflags
@ -113,6 +115,8 @@ itemsize
Jellyfin
jetson
jetsons
jina
jinaai
joserfc
jsmpeg
jsonify
@ -186,6 +190,7 @@ openai
opencv
openvino
OWASP
paddleocr
paho
passwordless
popleft

View File

@ -1,7 +1,7 @@
default_target: local
COMMIT_HASH := $(shell git log -1 --pretty=format:"%h"|tail -1)
VERSION = 0.15.0
VERSION = 0.16.0
IMAGE_REPO ?= ghcr.io/blakeblackshear/frigate
GITHUB_REF_NAME ?= $(shell git rev-parse --abbrev-ref HEAD)
BOARDS= #Initialized empty

View File

@ -8,6 +8,8 @@ imutils == 0.5.*
joserfc == 1.0.*
pathvalidate == 3.2.*
markupsafe == 2.1.*
python-multipart == 0.0.12
# General
mypy == 1.6.1
numpy == 1.26.*
onvif_zeep == 0.2.12
@ -43,3 +45,6 @@ openai == 1.51.*
# push notifications
py-vapid == 1.9.*
pywebpush == 2.0.*
# alpr
pyclipper == 1.3.*
shapely == 2.0.*

View File

@ -246,6 +246,8 @@ http {
proxy_no_cache $should_not_cache;
add_header X-Cache-Status $upstream_cache_status;
client_max_body_size 10M;
location /api/vod/ {
include auth_request.conf;
proxy_pass http://frigate_api/vod/;

View File

@ -0,0 +1,21 @@
---
id: face_recognition
title: Face Recognition
---
Face recognition allows people to be assigned names and when their face is recognized Frigate will assign the person's name as a sub label. This information is included in the UI, filters, as well as in notifications.
Frigate has support for FaceNet to create face embeddings, which runs locally. Embeddings are then saved to Frigate's database.
## Minimum System Requirements
Face recognition works by running a large AI model locally on your system. Systems without a GPU will not run Face Recognition reliably or at all.
## Configuration
Face recognition is disabled by default and requires semantic search to be enabled, face recognition must be enabled in your config file before it can be used. Semantic Search and face recognition are global configuration settings.
```yaml
face_recognition:
enabled: true
```

View File

@ -0,0 +1,48 @@
---
id: license_plate_recognition
title: License Plate Recognition (LPR)
---
Frigate can recognize license plates on vehicles and automatically add the detected characters as a `sub_label` to objects that are of type `car`. A common use case may be to read the license plates of cars pulling into a driveway or cars passing by on a street with a dedicated LPR camera.
Users running a Frigate+ model should ensure that `license_plate` is added to the [list of objects to track](https://docs.frigate.video/plus/#available-label-types) either globally or for a specific camera. This will improve the accuracy and performance of the LPR model.
LPR is most effective when the vehicles license plate is fully visible to the camera. For moving vehicles, Frigate will attempt to read the plate continuously, refining its detection and keeping the most confident result. LPR will not run on stationary vehicles.
## Minimum System Requirements
License plate recognition works by running AI models locally on your system. The models are relatively lightweight and run on your CPU. At least 4GB of RAM is required.
## Configuration
License plate recognition is disabled by default. Enable it in your config file:
```yaml
lpr:
enabled: true
```
## Advanced Configuration
Several options are available to fine-tune the LPR feature. For example, you can adjust the `min_area` setting, which defines the minimum size in pixels a license plate must be before LPR runs. The default is 500 pixels.
Additionally, you can define `known_plates`, allowing Frigate to label tracked vehicles with custom sub_labels when a recognized plate is detected. This information is then accessible in the UI, filters, and notifications.
```yaml
lpr:
enabled: true
min_area: 500
known_plates:
Wife's Car:
- "ABC-1234"
- "ABC-I234"
Johnny:
- "JHN-1234"
- "JMN-1234"
- "JHN-I234"
Sally:
- "SLL-1234"
- "5LL-1234"
```
In this example, "Wife's Car" will appear as the label for any vehicle matching the plate "ABC-1234." The model might occasionally interpret the digit 1 as a capital I (e.g., "ABC-I234"), so both variations are listed. Similarly, multiple possible variations are specified for Johnny and Sally.

View File

@ -522,6 +522,14 @@ semantic_search:
# NOTE: small model runs on CPU and large model runs on GPU
model_size: "small"
# Optional: Configuration for face recognition capability
face_recognition:
# Optional: Enable semantic search (default: shown below)
enabled: False
# Optional: Set the model size used for embeddings. (default: shown below)
# NOTE: small model runs on CPU and large model runs on GPU
model_size: "small"
# Optional: Configuration for AI generated tracked object descriptions
# NOTE: Semantic Search must be enabled for this to do anything.
# WARNING: Depending on the provider, this will send thumbnails over the internet

View File

@ -36,6 +36,8 @@ const sidebars: SidebarsConfig = {
'Semantic Search': [
'configuration/semantic_search',
'configuration/genai',
'configuration/face_recognition',
'configuration/license_plate_recognition',
],
Cameras: [
'configuration/cameras',

View File

@ -0,0 +1,56 @@
"""Object classification APIs."""
import logging
from fastapi import APIRouter, Request, UploadFile
from fastapi.responses import JSONResponse
from frigate.api.defs.tags import Tags
from frigate.embeddings import EmbeddingsContext
logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.events])
@router.get("/faces")
def get_faces():
return JSONResponse(content={"message": "there are faces"})
@router.post("/faces/{name}")
async def register_face(request: Request, name: str, file: UploadFile):
# if not file.content_type.startswith("image"):
# return JSONResponse(
# status_code=400,
# content={
# "success": False,
# "message": "Only an image can be used to register a face.",
# },
# )
context: EmbeddingsContext = request.app.embeddings
context.register_face(name, await file.read())
return JSONResponse(
status_code=200,
content={"success": True, "message": "Successfully registered face."},
)
@router.delete("/faces")
def deregister_faces(request: Request, body: dict = None):
json: dict[str, any] = body or {}
list_of_ids = json.get("ids", "")
if not list_of_ids or len(list_of_ids) == 0:
return JSONResponse(
content=({"success": False, "message": "Not a valid list of ids"}),
status_code=404,
)
context: EmbeddingsContext = request.app.embeddings
context.delete_face_ids(list_of_ids)
return JSONResponse(
content=({"success": True, "message": "Successfully deleted faces."}),
status_code=200,
)

View File

@ -8,6 +8,9 @@ class EventsSubLabelBody(BaseModel):
subLabelScore: Optional[float] = Field(
title="Score for sub label", default=None, gt=0.0, le=1.0
)
camera: Optional[str] = Field(
title="Camera this object is detected on.", default=None
)
class EventsDescriptionBody(BaseModel):

View File

@ -10,4 +10,5 @@ class Tags(Enum):
review = "Review"
export = "Export"
events = "Events"
classification = "classification"
auth = "Auth"

View File

@ -890,38 +890,59 @@ def set_sub_label(
try:
event: Event = Event.get(Event.id == event_id)
except DoesNotExist:
if not body.camera:
return JSONResponse(
content=(
{
"success": False,
"message": "Event "
+ event_id
+ " not found and camera is not provided.",
}
),
status_code=404,
)
event = None
if request.app.detected_frames_processor:
tracked_obj: TrackedObject = (
request.app.detected_frames_processor.camera_states[
event.camera if event else body.camera
].tracked_objects.get(event_id)
)
else:
tracked_obj = None
if not event and not tracked_obj:
return JSONResponse(
content=({"success": False, "message": "Event " + event_id + " not found"}),
content=(
{"success": False, "message": "Event " + event_id + " not found."}
),
status_code=404,
)
new_sub_label = body.subLabel
new_score = body.subLabelScore
if not event.end_time:
# update tracked object
tracked_obj: TrackedObject = (
request.app.detected_frames_processor.camera_states[
event.camera
].tracked_objects.get(event.id)
)
if tracked_obj:
tracked_obj.obj_data["sub_label"] = (new_sub_label, new_score)
if tracked_obj:
tracked_obj.obj_data["sub_label"] = (new_sub_label, new_score)
# update timeline items
Timeline.update(
data=Timeline.data.update({"sub_label": (new_sub_label, new_score)})
).where(Timeline.source_id == event_id).execute()
event.sub_label = new_sub_label
if event:
event.sub_label = new_sub_label
if new_score:
data = event.data
data["sub_label_score"] = new_score
event.data = data
if new_score:
data = event.data
data["sub_label_score"] = new_score
event.data = data
event.save()
event.save()
return JSONResponse(
content=(
{

View File

@ -11,7 +11,16 @@ from starlette_context import middleware, plugins
from starlette_context.plugins import Plugin
from frigate.api import app as main_app
from frigate.api import auth, event, export, media, notification, preview, review
from frigate.api import (
auth,
classification,
event,
export,
media,
notification,
preview,
review,
)
from frigate.api.auth import get_jwt_secret, limiter
from frigate.comms.event_metadata_updater import (
EventMetadataPublisher,
@ -95,6 +104,7 @@ def create_fastapi_app(
# Routes
# Order of include_router matters: https://fastapi.tiangolo.com/tutorial/path-params/#order-matters
app.include_router(auth.router)
app.include_router(classification.router)
app.include_router(review.router)
app.include_router(main_app.router)
app.include_router(preview.router)

View File

@ -12,6 +12,7 @@ class EmbeddingsRequestEnum(Enum):
embed_description = "embed_description"
embed_thumbnail = "embed_thumbnail"
generate_search = "generate_search"
register_face = "register_face"
class EmbeddingsResponder:
@ -22,7 +23,7 @@ class EmbeddingsResponder:
def check_for_request(self, process: Callable) -> None:
while True: # load all messages that are queued
has_message, _, _ = zmq.select([self.socket], [], [], 0.1)
has_message, _, _ = zmq.select([self.socket], [], [], 0.01)
if not has_message:
break

View File

@ -1,6 +1,6 @@
from typing import Any, Optional, Union
from pydantic import Field, field_serializer
from pydantic import Field, PrivateAttr, field_serializer
from ..base import FrigateBaseModel
@ -53,3 +53,20 @@ class ObjectConfig(FrigateBaseModel):
default_factory=dict, title="Object filters."
)
mask: Union[str, list[str]] = Field(default="", title="Object mask.")
_all_objects: list[str] = PrivateAttr()
@property
def all_objects(self) -> list[str]:
return self._all_objects
def parse_all_objects(self, cameras):
if "_all_objects" in self:
return
# get list of unique enabled labels for tracking
enabled_labels = set(self.track)
for camera in cameras.values():
enabled_labels.update(camera.objects.track)
self._all_objects = list(enabled_labels)

View File

@ -56,7 +56,11 @@ from .logger import LoggerConfig
from .mqtt import MqttConfig
from .notification import NotificationConfig
from .proxy import ProxyConfig
from .semantic_search import SemanticSearchConfig
from .semantic_search import (
FaceRecognitionConfig,
LicensePlateRecognitionConfig,
SemanticSearchConfig,
)
from .telemetry import TelemetryConfig
from .tls import TlsConfig
from .ui import UIConfig
@ -159,6 +163,16 @@ class RestreamConfig(BaseModel):
model_config = ConfigDict(extra="allow")
def verify_semantic_search_dependent_configs(config: FrigateConfig) -> None:
"""Verify that semantic search is enabled if required features are enabled."""
if not config.semantic_search.enabled:
if config.genai.enabled:
raise ValueError("Genai requires semantic search to be enabled.")
if config.face_recognition.enabled:
raise ValueError("Face recognition requires semantic to be enabled.")
def verify_config_roles(camera_config: CameraConfig) -> None:
"""Verify that roles are setup in the config correctly."""
assigned_roles = list(
@ -316,6 +330,13 @@ class FrigateConfig(FrigateBaseModel):
semantic_search: SemanticSearchConfig = Field(
default_factory=SemanticSearchConfig, title="Semantic search configuration."
)
face_recognition: FaceRecognitionConfig = Field(
default_factory=FaceRecognitionConfig, title="Face recognition config."
)
lpr: LicensePlateRecognitionConfig = Field(
default_factory=LicensePlateRecognitionConfig,
title="License Plate recognition config.",
)
ui: UIConfig = Field(default_factory=UIConfig, title="UI configuration.")
# Detector config
@ -574,13 +595,8 @@ class FrigateConfig(FrigateBaseModel):
verify_autotrack_zones(camera_config)
verify_motion_and_detect(camera_config)
# get list of unique enabled labels for tracking
enabled_labels = set(self.objects.track)
for camera in self.cameras.values():
enabled_labels.update(camera.objects.track)
self.model.create_colormap(sorted(enabled_labels))
self.objects.parse_all_objects(self.cameras)
self.model.create_colormap(sorted(self.objects.all_objects))
self.model.check_and_load_plus_model(self.plus_api)
for key, detector in self.detectors.items():
@ -621,6 +637,7 @@ class FrigateConfig(FrigateBaseModel):
detector_config.model.compute_model_hash()
self.detectors[key] = detector_config
verify_semantic_search_dependent_configs(self)
return self
@field_validator("cameras")

View File

@ -1,10 +1,14 @@
from typing import Optional
from typing import Dict, List, Optional
from pydantic import Field
from .base import FrigateBaseModel
__all__ = ["SemanticSearchConfig"]
__all__ = [
"FaceRecognitionConfig",
"SemanticSearchConfig",
"LicensePlateRecognitionConfig",
]
class SemanticSearchConfig(FrigateBaseModel):
@ -15,3 +19,28 @@ class SemanticSearchConfig(FrigateBaseModel):
model_size: str = Field(
default="small", title="The size of the embeddings model used."
)
class FaceRecognitionConfig(FrigateBaseModel):
enabled: bool = Field(default=False, title="Enable face recognition.")
threshold: float = Field(
default=0.9, title="Face similarity score required to be considered a match."
)
min_area: int = Field(
default=500, title="Min area of face box to consider running face recognition."
)
class LicensePlateRecognitionConfig(FrigateBaseModel):
enabled: bool = Field(default=False, title="Enable license plate recognition.")
threshold: float = Field(
default=0.9,
title="License plate confidence score required to be added to the object as a sub label.",
)
min_area: int = Field(
default=500,
title="Min area of license plate to consider running license plate recognition.",
)
known_plates: Optional[Dict[str, List[str]]] = Field(
default={}, title="Known plates to track."
)

View File

@ -5,8 +5,9 @@ DEFAULT_DB_PATH = f"{CONFIG_DIR}/frigate.db"
MODEL_CACHE_DIR = f"{CONFIG_DIR}/model_cache"
BASE_DIR = "/media/frigate"
CLIPS_DIR = f"{BASE_DIR}/clips"
RECORD_DIR = f"{BASE_DIR}/recordings"
EXPORT_DIR = f"{BASE_DIR}/exports"
FACE_DIR = f"{CLIPS_DIR}/faces"
RECORD_DIR = f"{BASE_DIR}/recordings"
BIRDSEYE_PIPE = "/tmp/cache/birdseye"
CACHE_DIR = "/tmp/cache"
FRIGATE_LOCALHOST = "http://127.0.0.1:5000"

View File

@ -29,6 +29,10 @@ class SqliteVecQueueDatabase(SqliteQueueDatabase):
ids = ",".join(["?" for _ in event_ids])
self.execute_sql(f"DELETE FROM vec_descriptions WHERE id IN ({ids})", event_ids)
def delete_embeddings_face(self, face_ids: list[str]) -> None:
ids = ",".join(["?" for _ in face_ids])
self.execute_sql(f"DELETE FROM vec_faces WHERE id IN ({ids})", face_ids)
def drop_embeddings_tables(self) -> None:
self.execute_sql("""
DROP TABLE vec_descriptions;
@ -36,8 +40,11 @@ class SqliteVecQueueDatabase(SqliteQueueDatabase):
self.execute_sql("""
DROP TABLE vec_thumbnails;
""")
self.execute_sql("""
DROP TABLE vec_faces;
""")
def create_embeddings_tables(self) -> None:
def create_embeddings_tables(self, face_recognition: bool) -> None:
"""Create vec0 virtual table for embeddings"""
self.execute_sql("""
CREATE VIRTUAL TABLE IF NOT EXISTS vec_thumbnails USING vec0(
@ -51,3 +58,11 @@ class SqliteVecQueueDatabase(SqliteQueueDatabase):
description_embedding FLOAT[768] distance_metric=cosine
);
""")
if face_recognition:
self.execute_sql("""
CREATE VIRTUAL TABLE IF NOT EXISTS vec_faces USING vec0(
id TEXT PRIMARY KEY,
face_embedding FLOAT[512] distance_metric=cosine
);
""")

View File

@ -1,5 +1,6 @@
"""SQLite-vec embeddings database."""
import base64
import json
import logging
import multiprocessing as mp
@ -189,6 +190,28 @@ class EmbeddingsContext:
return results
def register_face(self, face_name: str, image_data: bytes) -> None:
self.requestor.send_data(
EmbeddingsRequestEnum.register_face.value,
{
"face_name": face_name,
"image": base64.b64encode(image_data).decode("ASCII"),
},
)
def get_face_ids(self, name: str) -> list[str]:
sql_query = f"""
SELECT
id
FROM vec_descriptions
WHERE id LIKE '%{name}%'
"""
return self.db.execute_sql(sql_query).fetchall()
def delete_face_ids(self, ids: list[str]) -> None:
self.db.delete_embeddings_face(ids)
def update_description(self, event_id: str, description: str) -> None:
self.requestor.send_data(
EmbeddingsRequestEnum.embed_description.value,

View File

@ -0,0 +1,808 @@
import logging
import math
from typing import List, Tuple
import cv2
import numpy as np
from pyclipper import ET_CLOSEDPOLYGON, JT_ROUND, PyclipperOffset
from shapely.geometry import Polygon
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config.semantic_search import LicensePlateRecognitionConfig
from frigate.embeddings.embeddings import Embeddings
logger = logging.getLogger(__name__)
MIN_PLATE_LENGTH = 3
class LicensePlateRecognition:
def __init__(
self,
config: LicensePlateRecognitionConfig,
requestor: InterProcessRequestor,
embeddings: Embeddings,
):
self.lpr_config = config
self.requestor = requestor
self.embeddings = embeddings
self.detection_model = self.embeddings.lpr_detection_model
self.classification_model = self.embeddings.lpr_classification_model
self.recognition_model = self.embeddings.lpr_recognition_model
self.ctc_decoder = CTCDecoder()
self.batch_size = 6
# Detection specific parameters
self.min_size = 3
self.max_size = 960
self.box_thresh = 0.8
self.mask_thresh = 0.8
if self.lpr_config.enabled:
# all models need to be loaded to run LPR
self.detection_model._load_model_and_utils()
self.classification_model._load_model_and_utils()
self.recognition_model._load_model_and_utils()
def detect(self, image: np.ndarray) -> List[np.ndarray]:
"""
Detect possible license plates in the input image by first resizing and normalizing it,
running a detection model, and filtering out low-probability regions.
Args:
image (np.ndarray): The input image in which license plates will be detected.
Returns:
List[np.ndarray]: A list of bounding box coordinates representing detected license plates.
"""
h, w = image.shape[:2]
if sum([h, w]) < 64:
image = self.zero_pad(image)
resized_image = self.resize_image(image)
normalized_image = self.normalize_image(resized_image)
outputs = self.detection_model([normalized_image])[0]
outputs = outputs[0, :, :]
boxes, _ = self.boxes_from_bitmap(outputs, outputs > self.mask_thresh, w, h)
return self.filter_polygon(boxes, (h, w))
def classify(
self, images: List[np.ndarray]
) -> Tuple[List[np.ndarray], List[Tuple[str, float]]]:
"""
Classify the orientation or category of each detected license plate.
Args:
images (List[np.ndarray]): A list of images of detected license plates.
Returns:
Tuple[List[np.ndarray], List[Tuple[str, float]]]: A tuple of rotated/normalized plate images
and classification results with confidence scores.
"""
num_images = len(images)
indices = np.argsort([x.shape[1] / x.shape[0] for x in images])
for i in range(0, num_images, self.batch_size):
norm_images = []
for j in range(i, min(num_images, i + self.batch_size)):
norm_img = self._preprocess_classification_image(images[indices[j]])
norm_img = norm_img[np.newaxis, :]
norm_images.append(norm_img)
outputs = self.classification_model(norm_images)
return self._process_classification_output(images, outputs)
def recognize(
self, images: List[np.ndarray]
) -> Tuple[List[str], List[List[float]]]:
"""
Recognize the characters on the detected license plates using the recognition model.
Args:
images (List[np.ndarray]): A list of images of license plates to recognize.
Returns:
Tuple[List[str], List[List[float]]]: A tuple of recognized license plate texts and confidence scores.
"""
input_shape = [3, 48, 320]
num_images = len(images)
# sort images by aspect ratio for processing
indices = np.argsort(np.array([x.shape[1] / x.shape[0] for x in images]))
for index in range(0, num_images, self.batch_size):
input_h, input_w = input_shape[1], input_shape[2]
max_wh_ratio = input_w / input_h
norm_images = []
# calculate the maximum aspect ratio in the current batch
for i in range(index, min(num_images, index + self.batch_size)):
h, w = images[indices[i]].shape[0:2]
max_wh_ratio = max(max_wh_ratio, w * 1.0 / h)
# preprocess the images based on the max aspect ratio
for i in range(index, min(num_images, index + self.batch_size)):
norm_image = self._preprocess_recognition_image(
images[indices[i]], max_wh_ratio
)
norm_image = norm_image[np.newaxis, :]
norm_images.append(norm_image)
outputs = self.recognition_model(norm_images)
return self.ctc_decoder(outputs)
def process_license_plate(
self, image: np.ndarray
) -> Tuple[List[str], List[float], List[int]]:
"""
Complete pipeline for detecting, classifying, and recognizing license plates in the input image.
Args:
image (np.ndarray): The input image in which to detect, classify, and recognize license plates.
Returns:
Tuple[List[str], List[float], List[int]]: Detected license plate texts, confidence scores, and areas of the plates.
"""
if (
self.detection_model.runner is None
or self.classification_model.runner is None
or self.recognition_model.runner is None
):
# we might still be downloading the models
logger.debug("Model runners not loaded")
return [], [], []
plate_points = self.detect(image)
if len(plate_points) == 0:
return [], [], []
plate_points = self.sort_polygon(list(plate_points))
plate_images = [self._crop_license_plate(image, x) for x in plate_points]
rotated_images, _ = self.classify(plate_images)
# keep track of the index of each image for correct area calc later
sorted_indices = np.argsort([x.shape[1] / x.shape[0] for x in rotated_images])
reverse_mapping = {
idx: original_idx for original_idx, idx in enumerate(sorted_indices)
}
results, confidences = self.recognize(rotated_images)
if results:
license_plates = [""] * len(rotated_images)
average_confidences = [[0.0]] * len(rotated_images)
areas = [0] * len(rotated_images)
# map results back to original image order
for i, (plate, conf) in enumerate(zip(results, confidences)):
original_idx = reverse_mapping[i]
height, width = rotated_images[original_idx].shape[:2]
area = height * width
average_confidence = conf
# set to True to write each cropped image for debugging
if False:
save_image = cv2.cvtColor(
rotated_images[original_idx], cv2.COLOR_RGB2BGR
)
filename = f"/config/plate_{original_idx}_{plate}_{area}.jpg"
cv2.imwrite(filename, save_image)
license_plates[original_idx] = plate
average_confidences[original_idx] = average_confidence
areas[original_idx] = area
# Filter out plates that have a length of less than 3 characters
# Sort by area, then by plate length, then by confidence all desc
sorted_data = sorted(
[
(plate, conf, area)
for plate, conf, area in zip(
license_plates, average_confidences, areas
)
if len(plate) >= MIN_PLATE_LENGTH
],
key=lambda x: (x[2], len(x[0]), x[1]),
reverse=True,
)
if sorted_data:
return map(list, zip(*sorted_data))
return [], [], []
def resize_image(self, image: np.ndarray) -> np.ndarray:
"""
Resize the input image while maintaining the aspect ratio, ensuring dimensions are multiples of 32.
Args:
image (np.ndarray): The input image to resize.
Returns:
np.ndarray: The resized image.
"""
h, w = image.shape[:2]
ratio = min(self.max_size / max(h, w), 1.0)
resize_h = max(int(round(int(h * ratio) / 32) * 32), 32)
resize_w = max(int(round(int(w * ratio) / 32) * 32), 32)
return cv2.resize(image, (resize_w, resize_h))
def normalize_image(self, image: np.ndarray) -> np.ndarray:
"""
Normalize the input image by subtracting the mean and multiplying by the standard deviation.
Args:
image (np.ndarray): The input image to normalize.
Returns:
np.ndarray: The normalized image, transposed to match the model's expected input format.
"""
mean = np.array([123.675, 116.28, 103.53]).reshape(1, -1).astype("float64")
std = 1 / np.array([58.395, 57.12, 57.375]).reshape(1, -1).astype("float64")
image = image.astype("float32")
cv2.subtract(image, mean, image)
cv2.multiply(image, std, image)
return image.transpose((2, 0, 1))[np.newaxis, ...]
def boxes_from_bitmap(
self, output: np.ndarray, mask: np.ndarray, dest_width: int, dest_height: int
) -> Tuple[np.ndarray, List[float]]:
"""
Process the binary mask to extract bounding boxes and associated confidence scores.
Args:
output (np.ndarray): Output confidence map from the model.
mask (np.ndarray): Binary mask of detected regions.
dest_width (int): Target width for scaling the box coordinates.
dest_height (int): Target height for scaling the box coordinates.
Returns:
Tuple[np.ndarray, List[float]]: Array of bounding boxes and list of corresponding scores.
"""
mask = (mask * 255).astype(np.uint8)
height, width = mask.shape
outs = cv2.findContours(mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
# handle different return values of findContours between OpenCV versions
contours = outs[0] if len(outs) == 2 else outs[1]
boxes = []
scores = []
for index in range(len(contours)):
contour = contours[index]
# get minimum bounding box (rotated rectangle) around the contour and the smallest side length.
points, min_side = self.get_min_boxes(contour)
if min_side < self.min_size:
continue
points = np.array(points)
score = self.box_score(output, contour)
if self.box_thresh > score:
continue
polygon = Polygon(points)
distance = polygon.area / polygon.length
# Use pyclipper to shrink the polygon slightly based on the computed distance.
offset = PyclipperOffset()
offset.AddPath(points, JT_ROUND, ET_CLOSEDPOLYGON)
points = np.array(offset.Execute(distance * 1.5)).reshape((-1, 1, 2))
# get the minimum bounding box around the shrunken polygon.
box, min_side = self.get_min_boxes(points)
if min_side < self.min_size + 2:
continue
box = np.array(box)
# normalize and clip box coordinates to fit within the destination image size.
box[:, 0] = np.clip(np.round(box[:, 0] / width * dest_width), 0, dest_width)
box[:, 1] = np.clip(
np.round(box[:, 1] / height * dest_height), 0, dest_height
)
boxes.append(box.astype("int32"))
scores.append(score)
return np.array(boxes, dtype="int32"), scores
@staticmethod
def get_min_boxes(contour: np.ndarray) -> Tuple[List[Tuple[float, float]], float]:
"""
Calculate the minimum bounding box (rotated rectangle) for a given contour.
Args:
contour (np.ndarray): The contour points of the detected shape.
Returns:
Tuple[List[Tuple[float, float]], float]: A list of four points representing the
corners of the bounding box, and the length of the shortest side.
"""
bounding_box = cv2.minAreaRect(contour)
points = sorted(cv2.boxPoints(bounding_box), key=lambda x: x[0])
index_1, index_4 = (0, 1) if points[1][1] > points[0][1] else (1, 0)
index_2, index_3 = (2, 3) if points[3][1] > points[2][1] else (3, 2)
box = [points[index_1], points[index_2], points[index_3], points[index_4]]
return box, min(bounding_box[1])
@staticmethod
def box_score(bitmap: np.ndarray, contour: np.ndarray) -> float:
"""
Calculate the average score within the bounding box of a contour.
Args:
bitmap (np.ndarray): The output confidence map from the model.
contour (np.ndarray): The contour of the detected shape.
Returns:
float: The average score of the pixels inside the contour region.
"""
h, w = bitmap.shape[:2]
contour = contour.reshape(-1, 2)
x1, y1 = np.clip(contour.min(axis=0), 0, [w - 1, h - 1])
x2, y2 = np.clip(contour.max(axis=0), 0, [w - 1, h - 1])
mask = np.zeros((y2 - y1 + 1, x2 - x1 + 1), dtype=np.uint8)
cv2.fillPoly(mask, [contour - [x1, y1]], 1)
return cv2.mean(bitmap[y1 : y2 + 1, x1 : x2 + 1], mask)[0]
@staticmethod
def expand_box(points: List[Tuple[float, float]]) -> np.ndarray:
"""
Expand a polygonal shape slightly by a factor determined by the area-to-perimeter ratio.
Args:
points (List[Tuple[float, float]]): Points of the polygon to expand.
Returns:
np.ndarray: Expanded polygon points.
"""
polygon = Polygon(points)
distance = polygon.area / polygon.length
offset = PyclipperOffset()
offset.AddPath(points, JT_ROUND, ET_CLOSEDPOLYGON)
expanded = np.array(offset.Execute(distance * 1.5)).reshape((-1, 2))
return expanded
def filter_polygon(
self, points: List[np.ndarray], shape: Tuple[int, int]
) -> np.ndarray:
"""
Filter a set of polygons to include only valid ones that fit within an image shape
and meet size constraints.
Args:
points (List[np.ndarray]): List of polygons to filter.
shape (Tuple[int, int]): Shape of the image (height, width).
Returns:
np.ndarray: List of filtered polygons.
"""
height, width = shape
return np.array(
[
self.clockwise_order(point)
for point in points
if self.is_valid_polygon(point, width, height)
]
)
@staticmethod
def is_valid_polygon(point: np.ndarray, width: int, height: int) -> bool:
"""
Check if a polygon is valid, meaning it fits within the image bounds
and has sides of a minimum length.
Args:
point (np.ndarray): The polygon to validate.
width (int): Image width.
height (int): Image height.
Returns:
bool: Whether the polygon is valid or not.
"""
return (
point[:, 0].min() >= 0
and point[:, 0].max() < width
and point[:, 1].min() >= 0
and point[:, 1].max() < height
and np.linalg.norm(point[0] - point[1]) > 3
and np.linalg.norm(point[0] - point[3]) > 3
)
@staticmethod
def clockwise_order(point: np.ndarray) -> np.ndarray:
"""
Arrange the points of a polygon in clockwise order based on their angular positions
around the polygon's center.
Args:
point (np.ndarray): Array of points of the polygon.
Returns:
np.ndarray: Points ordered in clockwise direction.
"""
center = point.mean(axis=0)
return point[
np.argsort(np.arctan2(point[:, 1] - center[1], point[:, 0] - center[0]))
]
@staticmethod
def sort_polygon(points):
"""
Sort polygons based on their position in the image. If polygons are close in vertical
position (within 10 pixels), sort them by horizontal position.
Args:
points: List of polygons to sort.
Returns:
List: Sorted list of polygons.
"""
points.sort(key=lambda x: (x[0][1], x[0][0]))
for i in range(len(points) - 1):
for j in range(i, -1, -1):
if abs(points[j + 1][0][1] - points[j][0][1]) < 10 and (
points[j + 1][0][0] < points[j][0][0]
):
temp = points[j]
points[j] = points[j + 1]
points[j + 1] = temp
else:
break
return points
@staticmethod
def zero_pad(image: np.ndarray) -> np.ndarray:
"""
Apply zero-padding to an image, ensuring its dimensions are at least 32x32.
The padding is added only if needed.
Args:
image (np.ndarray): Input image.
Returns:
np.ndarray: Zero-padded image.
"""
h, w, c = image.shape
pad = np.zeros((max(32, h), max(32, w), c), np.uint8)
pad[:h, :w, :] = image
return pad
@staticmethod
def _preprocess_classification_image(image: np.ndarray) -> np.ndarray:
"""
Preprocess a single image for classification by resizing, normalizing, and padding.
This method resizes the input image to a fixed height of 48 pixels while adjusting
the width dynamically up to a maximum of 192 pixels. The image is then normalized and
padded to fit the required input dimensions for classification.
Args:
image (np.ndarray): Input image to preprocess.
Returns:
np.ndarray: Preprocessed and padded image.
"""
# fixed height of 48, dynamic width up to 192
input_shape = (3, 48, 192)
input_c, input_h, input_w = input_shape
h, w = image.shape[:2]
ratio = w / h
resized_w = min(input_w, math.ceil(input_h * ratio))
resized_image = cv2.resize(image, (resized_w, input_h))
# handle single-channel images (grayscale) if needed
if input_c == 1 and resized_image.ndim == 2:
resized_image = resized_image[np.newaxis, :, :]
else:
resized_image = resized_image.transpose((2, 0, 1))
# normalize
resized_image = (resized_image.astype("float32") / 255.0 - 0.5) / 0.5
padded_image = np.zeros((input_c, input_h, input_w), dtype=np.float32)
padded_image[:, :, :resized_w] = resized_image
return padded_image
def _process_classification_output(
self, images: List[np.ndarray], outputs: List[np.ndarray]
) -> Tuple[List[np.ndarray], List[Tuple[str, float]]]:
"""
Process the classification model output by matching labels with confidence scores.
This method processes the outputs from the classification model and rotates images
with high confidence of being labeled "180". It ensures that results are mapped to
the original image order.
Args:
images (List[np.ndarray]): List of input images.
outputs (List[np.ndarray]): Corresponding model outputs.
Returns:
Tuple[List[np.ndarray], List[Tuple[str, float]]]: A tuple of processed images and
classification results (label and confidence score).
"""
labels = ["0", "180"]
results = [["", 0.0]] * len(images)
indices = np.argsort(np.array([x.shape[1] / x.shape[0] for x in images]))
outputs = np.stack(outputs)
outputs = [
(labels[idx], outputs[i, idx])
for i, idx in enumerate(outputs.argmax(axis=1))
]
for i in range(0, len(images), self.batch_size):
for j in range(len(outputs)):
label, score = outputs[j]
results[indices[i + j]] = [label, score]
if "180" in label and score >= self.lpr_config.threshold:
images[indices[i + j]] = cv2.rotate(images[indices[i + j]], 1)
return images, results
def _preprocess_recognition_image(
self, image: np.ndarray, max_wh_ratio: float
) -> np.ndarray:
"""
Preprocess an image for recognition by dynamically adjusting its width.
This method adjusts the width of the image based on the maximum width-to-height ratio
while keeping the height fixed at 48 pixels. The image is then normalized and padded
to fit the required input dimensions for recognition.
Args:
image (np.ndarray): Input image to preprocess.
max_wh_ratio (float): Maximum width-to-height ratio for resizing.
Returns:
np.ndarray: Preprocessed and padded image.
"""
# fixed height of 48, dynamic width based on ratio
input_shape = [3, 48, 320]
input_h, input_w = input_shape[1], input_shape[2]
assert image.shape[2] == input_shape[0], "Unexpected number of image channels."
# dynamically adjust input width based on max_wh_ratio
input_w = int(input_h * max_wh_ratio)
# check for model-specific input width
model_input_w = self.recognition_model.runner.ort.get_inputs()[0].shape[3]
if isinstance(model_input_w, int) and model_input_w > 0:
input_w = model_input_w
h, w = image.shape[:2]
aspect_ratio = w / h
resized_w = min(input_w, math.ceil(input_h * aspect_ratio))
resized_image = cv2.resize(image, (resized_w, input_h))
resized_image = resized_image.transpose((2, 0, 1))
resized_image = (resized_image.astype("float32") / 255.0 - 0.5) / 0.5
padded_image = np.zeros((input_shape[0], input_h, input_w), dtype=np.float32)
padded_image[:, :, :resized_w] = resized_image
return padded_image
@staticmethod
def _crop_license_plate(image: np.ndarray, points: np.ndarray) -> np.ndarray:
"""
Crop the license plate from the image using four corner points.
This method crops the region containing the license plate by using the perspective
transformation based on four corner points. If the resulting image is significantly
taller than wide, the image is rotated to the correct orientation.
Args:
image (np.ndarray): Input image containing the license plate.
points (np.ndarray): Four corner points defining the plate's position.
Returns:
np.ndarray: Cropped and potentially rotated license plate image.
"""
assert len(points) == 4, "shape of points must be 4*2"
points = points.astype(np.float32)
crop_width = int(
max(
np.linalg.norm(points[0] - points[1]),
np.linalg.norm(points[2] - points[3]),
)
)
crop_height = int(
max(
np.linalg.norm(points[0] - points[3]),
np.linalg.norm(points[1] - points[2]),
)
)
pts_std = np.float32(
[[0, 0], [crop_width, 0], [crop_width, crop_height], [0, crop_height]]
)
matrix = cv2.getPerspectiveTransform(points, pts_std)
image = cv2.warpPerspective(
image,
matrix,
(crop_width, crop_height),
borderMode=cv2.BORDER_REPLICATE,
flags=cv2.INTER_CUBIC,
)
height, width = image.shape[0:2]
if height * 1.0 / width >= 1.5:
image = np.rot90(image, k=3)
return image
class CTCDecoder:
"""
A decoder for interpreting the output of a CTC (Connectionist Temporal Classification) model.
This decoder converts the model's output probabilities into readable sequences of characters
while removing duplicates and handling blank tokens. It also calculates the confidence scores
for each decoded character sequence.
"""
def __init__(self):
"""
Initialize the CTCDecoder with a list of characters and a character map.
The character set includes digits, letters, special characters, and a "blank" token
(used by the CTC model for decoding purposes). A character map is created to map
indices to characters.
"""
self.characters = [
"blank",
"0",
"1",
"2",
"3",
"4",
"5",
"6",
"7",
"8",
"9",
":",
";",
"<",
"=",
">",
"?",
"@",
"A",
"B",
"C",
"D",
"E",
"F",
"G",
"H",
"I",
"J",
"K",
"L",
"M",
"N",
"O",
"P",
"Q",
"R",
"S",
"T",
"U",
"V",
"W",
"X",
"Y",
"Z",
"[",
"\\",
"]",
"^",
"_",
"`",
"a",
"b",
"c",
"d",
"e",
"f",
"g",
"h",
"i",
"j",
"k",
"l",
"m",
"n",
"o",
"p",
"q",
"r",
"s",
"t",
"u",
"v",
"w",
"x",
"y",
"z",
"{",
"|",
"}",
"~",
"!",
'"',
"#",
"$",
"%",
"&",
"'",
"(",
")",
"*",
"+",
",",
"-",
".",
"/",
" ",
" ",
]
self.char_map = {i: char for i, char in enumerate(self.characters)}
def __call__(
self, outputs: List[np.ndarray]
) -> Tuple[List[str], List[List[float]]]:
"""
Decode a batch of model outputs into character sequences and their confidence scores.
The method takes the output probability distributions for each time step and uses
the best path decoding strategy. It then merges repeating characters and ignores
blank tokens. Confidence scores for each decoded character are also calculated.
Args:
outputs (List[np.ndarray]): A list of model outputs, where each element is
a probability distribution for each time step.
Returns:
Tuple[List[str], List[List[float]]]: A tuple of decoded character sequences
and confidence scores for each sequence.
"""
results = []
confidences = []
for output in outputs:
seq_log_probs = np.log(output + 1e-8)
best_path = np.argmax(seq_log_probs, axis=1)
merged_path = []
merged_probs = []
for t, char_index in enumerate(best_path):
if char_index != 0 and (t == 0 or char_index != best_path[t - 1]):
merged_path.append(char_index)
merged_probs.append(seq_log_probs[t, char_index])
result = "".join(self.char_map[idx] for idx in merged_path)
results.append(result)
confidence = np.exp(merged_probs).tolist()
confidences.append(confidence)
return results, confidences

View File

@ -3,15 +3,18 @@
import base64
import logging
import os
import random
import string
import time
from numpy import ndarray
from playhouse.shortcuts import model_to_dict
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config.semantic_search import SemanticSearchConfig
from frigate.config import FrigateConfig
from frigate.const import (
CONFIG_DIR,
FACE_DIR,
UPDATE_EMBEDDINGS_REINDEX_PROGRESS,
UPDATE_MODEL_STATE,
)
@ -59,23 +62,25 @@ def get_metadata(event: Event) -> dict:
class Embeddings:
"""SQLite-vec embeddings database."""
def __init__(
self, config: SemanticSearchConfig, db: SqliteVecQueueDatabase
) -> None:
def __init__(self, config: FrigateConfig, db: SqliteVecQueueDatabase) -> None:
self.config = config
self.db = db
self.requestor = InterProcessRequestor()
# Create tables if they don't exist
self.db.create_embeddings_tables()
self.db.create_embeddings_tables(self.config.face_recognition.enabled)
models = [
"jinaai/jina-clip-v1-text_model_fp16.onnx",
"jinaai/jina-clip-v1-tokenizer",
"jinaai/jina-clip-v1-vision_model_fp16.onnx"
if config.model_size == "large"
if config.semantic_search.model_size == "large"
else "jinaai/jina-clip-v1-vision_model_quantized.onnx",
"jinaai/jina-clip-v1-preprocessor_config.json",
"facenet-facenet.onnx",
"paddleocr-onnx-detection.onnx",
"paddleocr-onnx-classification.onnx",
"paddleocr-onnx-recognition.onnx",
]
for model in models:
@ -94,7 +99,7 @@ class Embeddings:
download_urls={
"text_model_fp16.onnx": "https://huggingface.co/jinaai/jina-clip-v1/resolve/main/onnx/text_model_fp16.onnx",
},
model_size=config.model_size,
model_size=config.semantic_search.model_size,
model_type=ModelTypeEnum.text,
requestor=self.requestor,
device="CPU",
@ -102,7 +107,7 @@ class Embeddings:
model_file = (
"vision_model_fp16.onnx"
if self.config.model_size == "large"
if self.config.semantic_search.model_size == "large"
else "vision_model_quantized.onnx"
)
@ -115,12 +120,69 @@ class Embeddings:
model_name="jinaai/jina-clip-v1",
model_file=model_file,
download_urls=download_urls,
model_size=config.model_size,
model_size=config.semantic_search.model_size,
model_type=ModelTypeEnum.vision,
requestor=self.requestor,
device="GPU" if config.model_size == "large" else "CPU",
device="GPU" if config.semantic_search.model_size == "large" else "CPU",
)
self.face_embedding = None
if self.config.face_recognition.enabled:
self.face_embedding = GenericONNXEmbedding(
model_name="facenet",
model_file="facenet.onnx",
download_urls={
"facenet.onnx": "https://github.com/NickM-27/facenet-onnx/releases/download/v1.0/facenet.onnx",
"facedet.onnx": "https://github.com/opencv/opencv_zoo/raw/refs/heads/main/models/face_detection_yunet/face_detection_yunet_2023mar_int8.onnx",
},
model_size="large",
model_type=ModelTypeEnum.face,
requestor=self.requestor,
device="GPU",
)
self.lpr_detection_model = None
self.lpr_classification_model = None
self.lpr_recognition_model = None
if self.config.lpr.enabled:
self.lpr_detection_model = GenericONNXEmbedding(
model_name="paddleocr-onnx",
model_file="detection.onnx",
download_urls={
"detection.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/detection.onnx"
},
model_size="large",
model_type=ModelTypeEnum.alpr_detect,
requestor=self.requestor,
device="CPU",
)
self.lpr_classification_model = GenericONNXEmbedding(
model_name="paddleocr-onnx",
model_file="classification.onnx",
download_urls={
"classification.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/classification.onnx"
},
model_size="large",
model_type=ModelTypeEnum.alpr_classify,
requestor=self.requestor,
device="CPU",
)
self.lpr_recognition_model = GenericONNXEmbedding(
model_name="paddleocr-onnx",
model_file="recognition.onnx",
download_urls={
"recognition.onnx": "https://github.com/hawkeye217/paddleocr-onnx/raw/refs/heads/master/models/recognition.onnx"
},
model_size="large",
model_type=ModelTypeEnum.alpr_recognize,
requestor=self.requestor,
device="CPU",
)
def embed_thumbnail(
self, event_id: str, thumbnail: bytes, upsert: bool = True
) -> ndarray:
@ -215,12 +277,40 @@ class Embeddings:
return embeddings
def embed_face(self, label: str, thumbnail: bytes, upsert: bool = False) -> ndarray:
embedding = self.face_embedding(thumbnail)[0]
if upsert:
rand_id = "".join(
random.choices(string.ascii_lowercase + string.digits, k=6)
)
id = f"{label}-{rand_id}"
# write face to library
folder = os.path.join(FACE_DIR, label)
file = os.path.join(folder, f"{id}.webp")
os.makedirs(folder, exist_ok=True)
# save face image
with open(file, "wb") as output:
output.write(thumbnail)
self.db.execute_sql(
"""
INSERT OR REPLACE INTO vec_faces(id, face_embedding)
VALUES(?, ?)
""",
(id, serialize(embedding)),
)
return embedding
def reindex(self) -> None:
logger.info("Indexing tracked object embeddings...")
self.db.drop_embeddings_tables()
logger.debug("Dropped embeddings tables.")
self.db.create_embeddings_tables()
self.db.create_embeddings_tables(self.config.face_recognition.enabled)
logger.debug("Created embeddings tables.")
# Delete the saved stats file

View File

@ -31,11 +31,16 @@ warnings.filterwarnings(
disable_progress_bar()
logger = logging.getLogger(__name__)
FACE_EMBEDDING_SIZE = 160
class ModelTypeEnum(str, Enum):
face = "face"
vision = "vision"
text = "text"
alpr_detect = "alpr_detect"
alpr_classify = "alpr_classify"
alpr_recognize = "alpr_recognize"
class GenericONNXEmbedding:
@ -47,7 +52,7 @@ class GenericONNXEmbedding:
model_file: str,
download_urls: Dict[str, str],
model_size: str,
model_type: str,
model_type: ModelTypeEnum,
requestor: InterProcessRequestor,
tokenizer_file: Optional[str] = None,
device: str = "AUTO",
@ -57,7 +62,7 @@ class GenericONNXEmbedding:
self.tokenizer_file = tokenizer_file
self.requestor = requestor
self.download_urls = download_urls
self.model_type = model_type # 'text' or 'vision'
self.model_type = model_type
self.model_size = model_size
self.device = device
self.download_path = os.path.join(MODEL_CACHE_DIR, self.model_name)
@ -87,12 +92,13 @@ class GenericONNXEmbedding:
files_names,
ModelStatusTypesEnum.downloaded,
)
self._load_model_and_tokenizer()
self._load_model_and_utils()
logger.debug(f"models are already downloaded for {self.model_name}")
def _download_model(self, path: str):
try:
file_name = os.path.basename(path)
if file_name in self.download_urls:
ModelDownloader.download_from_url(self.download_urls[file_name], path)
elif (
@ -101,6 +107,7 @@ class GenericONNXEmbedding:
):
if not os.path.exists(path + "/" + self.model_name):
logger.info(f"Downloading {self.model_name} tokenizer")
tokenizer = AutoTokenizer.from_pretrained(
self.model_name,
trust_remote_code=True,
@ -125,14 +132,23 @@ class GenericONNXEmbedding:
},
)
def _load_model_and_tokenizer(self):
def _load_model_and_utils(self):
if self.runner is None:
if self.downloader:
self.downloader.wait_for_download()
if self.model_type == ModelTypeEnum.text:
self.tokenizer = self._load_tokenizer()
else:
elif self.model_type == ModelTypeEnum.vision:
self.feature_extractor = self._load_feature_extractor()
elif self.model_type == ModelTypeEnum.face:
self.feature_extractor = []
elif self.model_type == ModelTypeEnum.alpr_detect:
self.feature_extractor = []
elif self.model_type == ModelTypeEnum.alpr_classify:
self.feature_extractor = []
elif self.model_type == ModelTypeEnum.alpr_recognize:
self.feature_extractor = []
self.runner = ONNXModelRunner(
os.path.join(self.download_path, self.model_file),
self.device,
@ -172,23 +188,73 @@ class GenericONNXEmbedding:
self.feature_extractor(images=image, return_tensors="np")
for image in processed_images
]
elif self.model_type == ModelTypeEnum.face:
if isinstance(raw_inputs, list):
raise ValueError("Face embedding does not support batch inputs.")
pil = self._process_image(raw_inputs)
# handle images larger than input size
width, height = pil.size
if width != FACE_EMBEDDING_SIZE or height != FACE_EMBEDDING_SIZE:
if width > height:
new_height = int(((height / width) * FACE_EMBEDDING_SIZE) // 4 * 4)
pil = pil.resize((FACE_EMBEDDING_SIZE, new_height))
else:
new_width = int(((width / height) * FACE_EMBEDDING_SIZE) // 4 * 4)
pil = pil.resize((new_width, FACE_EMBEDDING_SIZE))
og = np.array(pil).astype(np.float32)
# Image must be FACE_EMBEDDING_SIZExFACE_EMBEDDING_SIZE
og_h, og_w, channels = og.shape
frame = np.full(
(FACE_EMBEDDING_SIZE, FACE_EMBEDDING_SIZE, channels),
(0, 0, 0),
dtype=np.float32,
)
# compute center offset
x_center = (FACE_EMBEDDING_SIZE - og_w) // 2
y_center = (FACE_EMBEDDING_SIZE - og_h) // 2
# copy img image into center of result image
frame[y_center : y_center + og_h, x_center : x_center + og_w] = og
frame = np.expand_dims(frame, axis=0)
return [{"input_2": frame}]
elif self.model_type == ModelTypeEnum.alpr_detect:
preprocessed = []
for x in raw_inputs:
preprocessed.append(x)
return [{"x": preprocessed[0]}]
elif self.model_type == ModelTypeEnum.alpr_classify:
processed = []
for img in raw_inputs:
processed.append({"x": img})
return processed
elif self.model_type == ModelTypeEnum.alpr_recognize:
processed = []
for img in raw_inputs:
processed.append({"x": img})
return processed
else:
raise ValueError(f"Unable to preprocess inputs for {self.model_type}")
def _process_image(self, image):
def _process_image(self, image, output: str = "RGB") -> Image.Image:
if isinstance(image, str):
if image.startswith("http"):
response = requests.get(image)
image = Image.open(BytesIO(response.content)).convert("RGB")
image = Image.open(BytesIO(response.content)).convert(output)
elif isinstance(image, bytes):
image = Image.open(BytesIO(image)).convert("RGB")
image = Image.open(BytesIO(image)).convert(output)
return image
def __call__(
self, inputs: Union[List[str], List[Image.Image], List[str]]
) -> List[np.ndarray]:
self._load_model_and_tokenizer()
self._load_model_and_utils()
if self.runner is None or (
self.tokenizer is None and self.feature_extractor is None
):

View File

@ -9,6 +9,7 @@ from typing import Optional
import cv2
import numpy as np
import requests
from peewee import DoesNotExist
from playhouse.sqliteq import SqliteQueueDatabase
@ -20,17 +21,19 @@ from frigate.comms.event_metadata_updater import (
from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscriber
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.const import CLIPS_DIR, UPDATE_EVENT_DESCRIPTION
from frigate.const import CLIPS_DIR, FRIGATE_LOCALHOST, UPDATE_EVENT_DESCRIPTION
from frigate.embeddings.alpr.alpr import LicensePlateRecognition
from frigate.events.types import EventTypeEnum
from frigate.genai import get_genai_client
from frigate.models import Event
from frigate.util.builtin import serialize
from frigate.util.image import SharedMemoryFrameManager, calculate_region
from frigate.util.image import SharedMemoryFrameManager, area, calculate_region
from .embeddings import Embeddings
logger = logging.getLogger(__name__)
REQUIRED_FACES = 2
MAX_THUMBNAILS = 10
@ -45,7 +48,7 @@ class EmbeddingMaintainer(threading.Thread):
) -> None:
super().__init__(name="embeddings_maintainer")
self.config = config
self.embeddings = Embeddings(config.semantic_search, db)
self.embeddings = Embeddings(config, db)
# Check if we need to re-index events
if config.semantic_search.reindex:
@ -58,12 +61,43 @@ class EmbeddingMaintainer(threading.Thread):
)
self.embeddings_responder = EmbeddingsResponder()
self.frame_manager = SharedMemoryFrameManager()
# set face recognition conditions
self.face_recognition_enabled = self.config.face_recognition.enabled
self.requires_face_detection = "face" not in self.config.objects.all_objects
self.detected_faces: dict[str, float] = {}
# create communication for updating event descriptions
self.requestor = InterProcessRequestor()
self.stop_event = stop_event
self.tracked_events = {}
self.tracked_events: dict[str, list[any]] = {}
self.genai_client = get_genai_client(config.genai)
# set license plate recognition conditions
self.lpr_config = self.config.lpr
self.requires_license_plate_detection = (
"license_plate" not in self.config.objects.all_objects
)
self.detected_license_plates: dict[str, dict[str, any]] = {}
if self.lpr_config.enabled:
self.license_plate_recognition = LicensePlateRecognition(
self.lpr_config, self.requestor, self.embeddings
)
@property
def face_detector(self) -> cv2.FaceDetectorYN:
# Lazily create the classifier.
if "face_detector" not in self.__dict__:
self.__dict__["face_detector"] = cv2.FaceDetectorYN.create(
"/config/model_cache/facenet/facedet.onnx",
config="",
input_size=(320, 320),
score_threshold=0.8,
nms_threshold=0.3,
)
return self.__dict__["face_detector"]
def run(self) -> None:
"""Maintain a SQLite-vec database for semantic search."""
while not self.stop_event.is_set():
@ -82,7 +116,7 @@ class EmbeddingMaintainer(threading.Thread):
def _process_requests(self) -> None:
"""Process embeddings requests"""
def _handle_request(topic: str, data: str) -> str:
def _handle_request(topic: str, data: dict[str, any]) -> str:
try:
if topic == EmbeddingsRequestEnum.embed_description.value:
return serialize(
@ -101,6 +135,35 @@ class EmbeddingMaintainer(threading.Thread):
return serialize(
self.embeddings.text_embedding([data])[0], pack=False
)
elif topic == EmbeddingsRequestEnum.register_face.value:
if data.get("cropped"):
self.embeddings.embed_face(
data["face_name"],
base64.b64decode(data["image"]),
upsert=True,
)
return True
else:
img = cv2.imdecode(
np.frombuffer(
base64.b64decode(data["image"]), dtype=np.uint8
),
cv2.IMREAD_COLOR,
)
face_box = self._detect_face(img)
if not face_box:
return False
face = img[face_box[1] : face_box[3], face_box[0] : face_box[2]]
ret, webp = cv2.imencode(
".webp", face, [int(cv2.IMWRITE_WEBP_QUALITY), 100]
)
self.embeddings.embed_face(
data["face_name"], webp.tobytes(), upsert=True
)
return False
except Exception as e:
logger.error(f"Unable to handle embeddings request {e}")
@ -108,7 +171,7 @@ class EmbeddingMaintainer(threading.Thread):
def _process_updates(self) -> None:
"""Process event updates"""
update = self.event_subscriber.check_for_update(timeout=0.1)
update = self.event_subscriber.check_for_update(timeout=0.01)
if update is None:
return
@ -119,41 +182,55 @@ class EmbeddingMaintainer(threading.Thread):
return
camera_config = self.config.cameras[camera]
# no need to save our own thumbnails if genai is not enabled
# or if the object has become stationary
# no need to process updated objects if face recognition, lpr, genai are disabled
if (
not camera_config.genai.enabled
or self.genai_client is None
or data["stationary"]
and not self.face_recognition_enabled
and not self.lpr_config.enabled
):
return
if data["id"] not in self.tracked_events:
self.tracked_events[data["id"]] = []
# Create our own thumbnail based on the bounding box and the frame time
try:
frame_id = f"{camera}{data['frame_time']}"
yuv_frame = self.frame_manager.get(frame_id, camera_config.frame_shape_yuv)
if yuv_frame is not None:
data["thumbnail"] = self._create_thumbnail(yuv_frame, data["box"])
# Limit the number of thumbnails saved
if len(self.tracked_events[data["id"]]) >= MAX_THUMBNAILS:
# Always keep the first thumbnail for the event
self.tracked_events[data["id"]].pop(1)
self.tracked_events[data["id"]].append(data)
self.frame_manager.close(frame_id)
except FileNotFoundError:
pass
if yuv_frame is None:
logger.debug(
"Unable to process object update because frame is unavailable."
)
return
if self.face_recognition_enabled:
self._process_face(data, yuv_frame)
if self.lpr_config.enabled:
self._process_license_plate(data, yuv_frame)
# no need to save our own thumbnails if genai is not enabled
# or if the object has become stationary
if self.genai_client is not None and not data["stationary"]:
if data["id"] not in self.tracked_events:
self.tracked_events[data["id"]] = []
data["thumbnail"] = self._create_thumbnail(yuv_frame, data["box"])
# Limit the number of thumbnails saved
if len(self.tracked_events[data["id"]]) >= MAX_THUMBNAILS:
# Always keep the first thumbnail for the event
self.tracked_events[data["id"]].pop(1)
self.tracked_events[data["id"]].append(data)
self.frame_manager.close(frame_id)
def _process_finalized(self) -> None:
"""Process the end of an event."""
while True:
ended = self.event_end_subscriber.check_for_update(timeout=0.1)
ended = self.event_end_subscriber.check_for_update(timeout=0.01)
if ended == None:
break
@ -161,6 +238,12 @@ class EmbeddingMaintainer(threading.Thread):
event_id, camera, updated_db = ended
camera_config = self.config.cameras[camera]
if event_id in self.detected_faces:
self.detected_faces.pop(event_id)
if event_id in self.detected_license_plates:
self.detected_license_plates.pop(event_id)
if updated_db:
try:
event: Event = Event.get(Event.id == event_id)
@ -243,7 +326,7 @@ class EmbeddingMaintainer(threading.Thread):
def _process_event_metadata(self):
# Check for regenerate description requests
(topic, event_id, source) = self.event_metadata_subscriber.check_for_update(
timeout=0.1
timeout=0.01
)
if topic is None:
@ -252,6 +335,347 @@ class EmbeddingMaintainer(threading.Thread):
if event_id:
self.handle_regenerate_description(event_id, source)
def _search_face(self, query_embedding: bytes) -> list[tuple[str, float]]:
"""Search for the face most closely matching the embedding."""
sql_query = f"""
SELECT
id,
distance
FROM vec_faces
WHERE face_embedding MATCH ?
AND k = {REQUIRED_FACES} ORDER BY distance
"""
return self.embeddings.db.execute_sql(sql_query, [query_embedding]).fetchall()
def _detect_face(self, input: np.ndarray) -> tuple[int, int, int, int]:
"""Detect faces in input image."""
self.face_detector.setInputSize((input.shape[1], input.shape[0]))
faces = self.face_detector.detect(input)
if faces[1] is None:
return None
face = None
for _, potential_face in enumerate(faces[1]):
raw_bbox = potential_face[0:4].astype(np.uint16)
x: int = max(raw_bbox[0], 0)
y: int = max(raw_bbox[1], 0)
w: int = raw_bbox[2]
h: int = raw_bbox[3]
bbox = (x, y, x + w, y + h)
if face is None or area(bbox) > area(face):
face = bbox
return face
def _process_face(self, obj_data: dict[str, any], frame: np.ndarray) -> None:
"""Look for faces in image."""
id = obj_data["id"]
# don't run for non person objects
if obj_data.get("label") != "person":
logger.debug("Not a processing face for non person object.")
return
# don't overwrite sub label for objects that have a sub label
# that is not a face
if obj_data.get("sub_label") and id not in self.detected_faces:
logger.debug(
f"Not processing face due to existing sub label: {obj_data.get('sub_label')}."
)
return
face: Optional[dict[str, any]] = None
if self.requires_face_detection:
logger.debug("Running manual face detection.")
person_box = obj_data.get("box")
if not person_box:
return None
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
left, top, right, bottom = person_box
person = rgb[top:bottom, left:right]
face = self._detect_face(person)
if not face:
logger.debug("Detected no faces for person object.")
return
face_frame = person[face[1] : face[3], face[0] : face[2]]
face_frame = cv2.cvtColor(face_frame, cv2.COLOR_RGB2BGR)
else:
# don't run for object without attributes
if not obj_data.get("current_attributes"):
logger.debug("No attributes to parse.")
return
attributes: list[dict[str, any]] = obj_data.get("current_attributes", [])
for attr in attributes:
if attr.get("label") != "face":
continue
if face is None or attr.get("score", 0.0) > face.get("score", 0.0):
face = attr
# no faces detected in this frame
if not face:
return
face_box = face.get("box")
# check that face is valid
if not face_box or area(face_box) < self.config.face_recognition.min_area:
logger.debug(f"Invalid face box {face}")
return
face_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
face_frame = face_frame[
face_box[1] : face_box[3], face_box[0] : face_box[2]
]
ret, webp = cv2.imencode(
".webp", face_frame, [int(cv2.IMWRITE_WEBP_QUALITY), 100]
)
if not ret:
logger.debug("Not processing face due to error creating cropped image.")
return
embedding = self.embeddings.embed_face("unknown", webp.tobytes(), upsert=False)
query_embedding = serialize(embedding)
best_faces = self._search_face(query_embedding)
logger.debug(f"Detected best faces for person as: {best_faces}")
if not best_faces or len(best_faces) < REQUIRED_FACES:
logger.debug(f"{len(best_faces)} < {REQUIRED_FACES} min required faces.")
return
sub_label = str(best_faces[0][0]).split("-")[0]
avg_score = 0
for face in best_faces:
score = 1.0 - face[1]
if face[0].split("-")[0] != sub_label:
logger.debug("Detected multiple faces, result is not valid.")
return
avg_score += score
avg_score = round(avg_score / REQUIRED_FACES, 2)
if avg_score < self.config.face_recognition.threshold or (
id in self.detected_faces and avg_score <= self.detected_faces[id]
):
logger.debug(
f"Recognized face score {avg_score} is less than threshold ({self.config.face_recognition.threshold}) / previous face score ({self.detected_faces.get(id)})."
)
return
resp = requests.post(
f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label",
json={
"camera": obj_data.get("camera"),
"subLabel": sub_label,
"subLabelScore": avg_score,
},
)
if resp.status_code == 200:
self.detected_faces[id] = avg_score
def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]:
"""Return the dimensions of the input image as [x, y, width, height]."""
height, width = input.shape[:2]
return (0, 0, width, height)
def _process_license_plate(
self, obj_data: dict[str, any], frame: np.ndarray
) -> None:
"""Look for license plates in image."""
id = obj_data["id"]
# don't run for non car objects
if obj_data.get("label") != "car":
logger.debug("Not a processing license plate for non car object.")
return
# don't run for stationary car objects
if obj_data.get("stationary") == True:
logger.debug("Not a processing license plate for a stationary car object.")
return
# don't overwrite sub label for objects that have a sub label
# that is not a license plate
if obj_data.get("sub_label") and id not in self.detected_license_plates:
logger.debug(
f"Not processing license plate due to existing sub label: {obj_data.get('sub_label')}."
)
return
license_plate: Optional[dict[str, any]] = None
if self.requires_license_plate_detection:
logger.debug("Running manual license_plate detection.")
car_box = obj_data.get("box")
if not car_box:
return None
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
left, top, right, bottom = car_box
car = rgb[top:bottom, left:right]
license_plate = self._detect_license_plate(car)
if not license_plate:
logger.debug("Detected no license plates for car object.")
return
license_plate_frame = car[
license_plate[1] : license_plate[3], license_plate[0] : license_plate[2]
]
license_plate_frame = cv2.cvtColor(license_plate_frame, cv2.COLOR_RGB2BGR)
else:
# don't run for object without attributes
if not obj_data.get("current_attributes"):
logger.debug("No attributes to parse.")
return
attributes: list[dict[str, any]] = obj_data.get("current_attributes", [])
for attr in attributes:
if attr.get("label") != "license_plate":
continue
if license_plate is None or attr.get("score", 0.0) > license_plate.get(
"score", 0.0
):
license_plate = attr
# no license plates detected in this frame
if not license_plate:
return
license_plate_box = license_plate.get("box")
# check that license plate is valid
if (
not license_plate_box
or area(license_plate_box) < self.config.lpr.min_area
):
logger.debug(f"Invalid license plate box {license_plate}")
return
license_plate_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
license_plate_frame = license_plate_frame[
license_plate_box[1] : license_plate_box[3],
license_plate_box[0] : license_plate_box[2],
]
# run detection, returns results sorted by confidence, best first
license_plates, confidences, areas = (
self.license_plate_recognition.process_license_plate(license_plate_frame)
)
logger.debug(f"Text boxes: {license_plates}")
logger.debug(f"Confidences: {confidences}")
logger.debug(f"Areas: {areas}")
if license_plates:
for plate, confidence, text_area in zip(license_plates, confidences, areas):
avg_confidence = (
(sum(confidence) / len(confidence)) if confidence else 0
)
logger.debug(
f"Detected text: {plate} (average confidence: {avg_confidence:.2f}, area: {text_area} pixels)"
)
else:
# no plates found
logger.debug("No text detected")
return
top_plate, top_char_confidences, top_area = (
license_plates[0],
confidences[0],
areas[0],
)
avg_confidence = (
(sum(top_char_confidences) / len(top_char_confidences))
if top_char_confidences
else 0
)
# Check if we have a previously detected plate for this ID
if id in self.detected_license_plates:
prev_plate = self.detected_license_plates[id]["plate"]
prev_char_confidences = self.detected_license_plates[id]["char_confidences"]
prev_area = self.detected_license_plates[id]["area"]
prev_avg_confidence = (
(sum(prev_char_confidences) / len(prev_char_confidences))
if prev_char_confidences
else 0
)
# Define conditions for keeping the previous plate
shorter_than_previous = len(top_plate) < len(prev_plate)
lower_avg_confidence = avg_confidence <= prev_avg_confidence
smaller_area = top_area < prev_area
# Compare character-by-character confidence where possible
min_length = min(len(top_plate), len(prev_plate))
char_confidence_comparison = sum(
1
for i in range(min_length)
if top_char_confidences[i] <= prev_char_confidences[i]
)
worse_char_confidences = char_confidence_comparison >= min_length / 2
if (shorter_than_previous or smaller_area) and (
lower_avg_confidence and worse_char_confidences
):
logger.debug(
f"Keeping previous plate. New plate stats: "
f"length={len(top_plate)}, avg_conf={avg_confidence:.2f}, area={top_area} "
f"vs Previous: length={len(prev_plate)}, avg_conf={prev_avg_confidence:.2f}, area={prev_area}"
)
return
# Check against minimum confidence threshold
if avg_confidence < self.lpr_config.threshold:
logger.debug(
f"Average confidence {avg_confidence} is less than threshold ({self.lpr_config.threshold})"
)
return
# Determine subLabel based on known plates
# Default to the detected plate, use label name if there's a match
sub_label = top_plate
for label, plates in self.lpr_config.known_plates.items():
if top_plate in plates:
sub_label = label
break
# Send the result to the API
resp = requests.post(
f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label",
json={
"camera": obj_data.get("camera"),
"subLabel": sub_label,
"subLabelScore": avg_confidence,
},
)
if resp.status_code == 200:
self.detected_license_plates[id] = {
"plate": top_plate,
"char_confidences": top_char_confidences,
"area": top_area,
}
def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]:
"""Return jpg thumbnail of a region of the frame."""
frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)

View File

@ -101,7 +101,7 @@ class ModelDownloader:
self.download_complete.set()
@staticmethod
def download_from_url(url: str, save_path: str, silent: bool = False):
def download_from_url(url: str, save_path: str, silent: bool = False) -> Path:
temporary_filename = Path(save_path).with_name(
os.path.basename(save_path) + ".part"
)
@ -125,6 +125,8 @@ class ModelDownloader:
if not silent:
logger.info(f"Downloading complete: {url}")
return Path(save_path)
@staticmethod
def mark_files_state(
requestor: InterProcessRequestor,