Halfway point for fixing data processing

This commit is contained in:
Nicolas Mowen 2026-03-26 10:30:58 -06:00
parent 0cf9d7d5b1
commit 206261c322
11 changed files with 138 additions and 109 deletions

View File

@ -3,6 +3,7 @@ import os
import queue import queue
import threading import threading
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Any
import cv2 import cv2
import numpy as np import numpy as np
@ -21,7 +22,7 @@ class FaceRecognizer(ABC):
def __init__(self, config: FrigateConfig) -> None: def __init__(self, config: FrigateConfig) -> None:
self.config = config self.config = config
self.landmark_detector: cv2.face.FacemarkLBF = None self.landmark_detector: cv2.face.Facemark | None = None
self.init_landmark_detector() self.init_landmark_detector()
@abstractmethod @abstractmethod
@ -38,13 +39,14 @@ class FaceRecognizer(ABC):
def classify(self, face_image: np.ndarray) -> tuple[str, float] | None: def classify(self, face_image: np.ndarray) -> tuple[str, float] | None:
pass pass
@redirect_output_to_logger(logger, logging.DEBUG) @redirect_output_to_logger(logger, logging.DEBUG) # type: ignore[misc]
def init_landmark_detector(self) -> None: def init_landmark_detector(self) -> None:
landmark_model = os.path.join(MODEL_CACHE_DIR, "facedet/landmarkdet.yaml") landmark_model = os.path.join(MODEL_CACHE_DIR, "facedet/landmarkdet.yaml")
if os.path.exists(landmark_model): if os.path.exists(landmark_model):
self.landmark_detector = cv2.face.createFacemarkLBF() landmark_detector = cv2.face.createFacemarkLBF()
self.landmark_detector.loadModel(landmark_model) landmark_detector.loadModel(landmark_model)
self.landmark_detector = landmark_detector
def align_face( def align_face(
self, self,
@ -52,8 +54,10 @@ class FaceRecognizer(ABC):
output_width: int, output_width: int,
output_height: int, output_height: int,
) -> np.ndarray: ) -> np.ndarray:
# landmark is run on grayscale images if not self.landmark_detector:
raise ValueError("Landmark detector not initialized")
# landmark is run on grayscale images
if image.ndim == 3: if image.ndim == 3:
land_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) land_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
else: else:
@ -131,8 +135,11 @@ class FaceRecognizer(ABC):
def similarity_to_confidence( def similarity_to_confidence(
cosine_similarity: float, median=0.3, range_width=0.6, slope_factor=12 cosine_similarity: float,
): median: float = 0.3,
range_width: float = 0.6,
slope_factor: float = 12,
) -> float:
""" """
Default sigmoid function to map cosine similarity to confidence. Default sigmoid function to map cosine similarity to confidence.
@ -151,14 +158,14 @@ def similarity_to_confidence(
bias = median bias = median
# Calculate confidence # Calculate confidence
confidence = 1 / (1 + np.exp(-slope * (cosine_similarity - bias))) confidence: float = 1 / (1 + np.exp(-slope * (cosine_similarity - bias)))
return confidence return confidence
class FaceNetRecognizer(FaceRecognizer): class FaceNetRecognizer(FaceRecognizer):
def __init__(self, config: FrigateConfig): def __init__(self, config: FrigateConfig):
super().__init__(config) super().__init__(config)
self.mean_embs: dict[int, np.ndarray] = {} self.mean_embs: dict[str, np.ndarray] = {}
self.face_embedder: FaceNetEmbedding = FaceNetEmbedding() self.face_embedder: FaceNetEmbedding = FaceNetEmbedding()
self.model_builder_queue: queue.Queue | None = None self.model_builder_queue: queue.Queue | None = None
@ -168,7 +175,7 @@ class FaceNetRecognizer(FaceRecognizer):
def run_build_task(self) -> None: def run_build_task(self) -> None:
self.model_builder_queue = queue.Queue() self.model_builder_queue = queue.Queue()
def build_model(): def build_model() -> None:
face_embeddings_map: dict[str, list[np.ndarray]] = {} face_embeddings_map: dict[str, list[np.ndarray]] = {}
idx = 0 idx = 0
@ -187,20 +194,21 @@ class FaceNetRecognizer(FaceRecognizer):
img = cv2.imread(os.path.join(face_folder, image)) img = cv2.imread(os.path.join(face_folder, image))
if img is None: if img is None:
continue continue # type: ignore[unreachable]
img = self.align_face(img, img.shape[1], img.shape[0]) img = self.align_face(img, img.shape[1], img.shape[0])
emb = self.face_embedder([img])[0].squeeze() emb = self.face_embedder([img])[0].squeeze() # type: ignore[arg-type]
face_embeddings_map[name].append(emb) face_embeddings_map[name].append(emb)
idx += 1 idx += 1
assert self.model_builder_queue is not None
self.model_builder_queue.put(face_embeddings_map) self.model_builder_queue.put(face_embeddings_map)
thread = threading.Thread(target=build_model, daemon=True) thread = threading.Thread(target=build_model, daemon=True)
thread.start() thread.start()
def build(self): def build(self) -> None:
if not self.landmark_detector: if not self.landmark_detector:
self.init_landmark_detector() self.init_landmark_detector()
return None return None
@ -226,7 +234,7 @@ class FaceNetRecognizer(FaceRecognizer):
logger.debug("Finished building ArcFace model") logger.debug("Finished building ArcFace model")
def classify(self, face_image): def classify(self, face_image: np.ndarray) -> tuple[str, float] | None:
if not self.landmark_detector: if not self.landmark_detector:
return None return None
@ -245,7 +253,7 @@ class FaceNetRecognizer(FaceRecognizer):
img = self.align_face(face_image, face_image.shape[1], face_image.shape[0]) img = self.align_face(face_image, face_image.shape[1], face_image.shape[0])
embedding = self.face_embedder([img])[0].squeeze() embedding = self.face_embedder([img])[0].squeeze()
score = 0 score: float = 0
label = "" label = ""
for name, mean_emb in self.mean_embs.items(): for name, mean_emb in self.mean_embs.items():
@ -268,7 +276,7 @@ class FaceNetRecognizer(FaceRecognizer):
class ArcFaceRecognizer(FaceRecognizer): class ArcFaceRecognizer(FaceRecognizer):
def __init__(self, config: FrigateConfig): def __init__(self, config: FrigateConfig):
super().__init__(config) super().__init__(config)
self.mean_embs: dict[int, np.ndarray] = {} self.mean_embs: dict[str, np.ndarray] = {}
self.face_embedder: ArcfaceEmbedding = ArcfaceEmbedding(config.face_recognition) self.face_embedder: ArcfaceEmbedding = ArcfaceEmbedding(config.face_recognition)
self.model_builder_queue: queue.Queue | None = None self.model_builder_queue: queue.Queue | None = None
@ -278,7 +286,7 @@ class ArcFaceRecognizer(FaceRecognizer):
def run_build_task(self) -> None: def run_build_task(self) -> None:
self.model_builder_queue = queue.Queue() self.model_builder_queue = queue.Queue()
def build_model(): def build_model() -> None:
face_embeddings_map: dict[str, list[np.ndarray]] = {} face_embeddings_map: dict[str, list[np.ndarray]] = {}
idx = 0 idx = 0
@ -297,20 +305,21 @@ class ArcFaceRecognizer(FaceRecognizer):
img = cv2.imread(os.path.join(face_folder, image)) img = cv2.imread(os.path.join(face_folder, image))
if img is None: if img is None:
continue continue # type: ignore[unreachable]
img = self.align_face(img, img.shape[1], img.shape[0]) img = self.align_face(img, img.shape[1], img.shape[0])
emb = self.face_embedder([img])[0].squeeze() emb = self.face_embedder([img])[0].squeeze() # type: ignore[arg-type]
face_embeddings_map[name].append(emb) face_embeddings_map[name].append(emb)
idx += 1 idx += 1
assert self.model_builder_queue is not None
self.model_builder_queue.put(face_embeddings_map) self.model_builder_queue.put(face_embeddings_map)
thread = threading.Thread(target=build_model, daemon=True) thread = threading.Thread(target=build_model, daemon=True)
thread.start() thread.start()
def build(self): def build(self) -> None:
if not self.landmark_detector: if not self.landmark_detector:
self.init_landmark_detector() self.init_landmark_detector()
return None return None
@ -336,7 +345,7 @@ class ArcFaceRecognizer(FaceRecognizer):
logger.debug("Finished building ArcFace model") logger.debug("Finished building ArcFace model")
def classify(self, face_image): def classify(self, face_image: np.ndarray) -> tuple[str, float] | None:
if not self.landmark_detector: if not self.landmark_detector:
return None return None
@ -353,9 +362,9 @@ class ArcFaceRecognizer(FaceRecognizer):
# align face and run recognition # align face and run recognition
img = self.align_face(face_image, face_image.shape[1], face_image.shape[0]) img = self.align_face(face_image, face_image.shape[1], face_image.shape[0])
embedding = self.face_embedder([img])[0].squeeze() embedding = self.face_embedder([img])[0].squeeze() # type: ignore[arg-type]
score = 0 score: float = 0
label = "" label = ""
for name, mean_emb in self.mean_embs.items(): for name, mean_emb in self.mean_embs.items():

View File

@ -17,7 +17,7 @@ class PostProcessorApi(ABC):
self, self,
config: FrigateConfig, config: FrigateConfig,
metrics: DataProcessorMetrics, metrics: DataProcessorMetrics,
model_runner: DataProcessorModelRunner, model_runner: DataProcessorModelRunner | None,
) -> None: ) -> None:
self.config = config self.config = config
self.metrics = metrics self.metrics = metrics
@ -41,7 +41,7 @@ class PostProcessorApi(ABC):
@abstractmethod @abstractmethod
def handle_request( def handle_request(
self, topic: str, request_data: dict[str, Any] self, topic: str, request_data: dict[str, Any]
) -> dict[str, Any] | None: ) -> dict[str, Any] | str | None:
"""Handle metadata requests. """Handle metadata requests.
Args: Args:
request_data (dict): containing data about requested change to process. request_data (dict): containing data about requested change to process.

View File

@ -71,7 +71,7 @@ class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi):
# don't run LPR post processing for now # don't run LPR post processing for now
return return
event_id = data["event_id"] event_id = data["event_id"] # type: ignore[unreachable]
camera_name = data["camera"] camera_name = data["camera"]
if data_type == PostProcessDataEnum.recording: if data_type == PostProcessDataEnum.recording:
@ -225,7 +225,7 @@ class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi):
logger.debug(f"Post processing plate: {event_id}, {frame_time}") logger.debug(f"Post processing plate: {event_id}, {frame_time}")
self.lpr_process(keyframe_obj_data, frame) self.lpr_process(keyframe_obj_data, frame)
def handle_request(self, topic, request_data) -> dict[str, Any] | None: def handle_request(self, topic: str, request_data: dict) -> dict[str, Any] | None:
if topic == EmbeddingsRequestEnum.reprocess_plate.value: if topic == EmbeddingsRequestEnum.reprocess_plate.value:
event = request_data["event"] event = request_data["event"]
@ -242,3 +242,5 @@ class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi):
"message": "Successfully requested reprocessing of license plate.", "message": "Successfully requested reprocessing of license plate.",
"success": True, "success": True,
} }
return None

View File

@ -24,7 +24,7 @@ from frigate.util.file import get_event_thumbnail_bytes, load_event_snapshot_ima
from frigate.util.image import create_thumbnail, ensure_jpeg_bytes from frigate.util.image import create_thumbnail, ensure_jpeg_bytes
if TYPE_CHECKING: if TYPE_CHECKING:
from frigate.embeddings import Embeddings from frigate.embeddings.embeddings import Embeddings
from ..post.api import PostProcessorApi from ..post.api import PostProcessorApi
from ..types import DataProcessorMetrics from ..types import DataProcessorMetrics
@ -139,7 +139,7 @@ class ObjectDescriptionProcessor(PostProcessorApi):
): ):
self._process_genai_description(event, camera_config, thumbnail) self._process_genai_description(event, camera_config, thumbnail)
else: else:
self.cleanup_event(event.id) self.cleanup_event(str(event.id))
def __regenerate_description(self, event_id: str, source: str, force: bool) -> None: def __regenerate_description(self, event_id: str, source: str, force: bool) -> None:
"""Regenerate the description for an event.""" """Regenerate the description for an event."""
@ -149,17 +149,17 @@ class ObjectDescriptionProcessor(PostProcessorApi):
logger.error(f"Event {event_id} not found for description regeneration") logger.error(f"Event {event_id} not found for description regeneration")
return return
if self.genai_client is None: camera_config = self.config.cameras[str(event.camera)]
logger.error("GenAI not enabled")
return
camera_config = self.config.cameras[event.camera]
if not camera_config.objects.genai.enabled and not force: if not camera_config.objects.genai.enabled and not force:
logger.error(f"GenAI not enabled for camera {event.camera}") logger.error(f"GenAI not enabled for camera {event.camera}")
return return
thumbnail = get_event_thumbnail_bytes(event) thumbnail = get_event_thumbnail_bytes(event)
if thumbnail is None:
logger.error("No thumbnail available for %s", event.id)
return
# ensure we have a jpeg to pass to the model # ensure we have a jpeg to pass to the model
thumbnail = ensure_jpeg_bytes(thumbnail) thumbnail = ensure_jpeg_bytes(thumbnail)
@ -187,7 +187,9 @@ class ObjectDescriptionProcessor(PostProcessorApi):
) )
) )
self._genai_embed_description(event, embed_image) self._genai_embed_description(
event, [img for img in embed_image if img is not None]
)
def process_data(self, frame_data: dict, data_type: PostProcessDataEnum) -> None: def process_data(self, frame_data: dict, data_type: PostProcessDataEnum) -> None:
"""Process a frame update.""" """Process a frame update."""
@ -241,7 +243,7 @@ class ObjectDescriptionProcessor(PostProcessorApi):
# Crop snapshot based on region # Crop snapshot based on region
# provide full image if region doesn't exist (manual events) # provide full image if region doesn't exist (manual events)
height, width = img.shape[:2] height, width = img.shape[:2]
x1_rel, y1_rel, width_rel, height_rel = event.data.get( x1_rel, y1_rel, width_rel, height_rel = event.data.get( # type: ignore[attr-defined]
"region", [0, 0, 1, 1] "region", [0, 0, 1, 1]
) )
x1, y1 = int(x1_rel * width), int(y1_rel * height) x1, y1 = int(x1_rel * width), int(y1_rel * height)
@ -258,14 +260,16 @@ class ObjectDescriptionProcessor(PostProcessorApi):
return None return None
def _process_genai_description( def _process_genai_description(
self, event: Event, camera_config: CameraConfig, thumbnail self, event: Event, camera_config: CameraConfig, thumbnail: bytes
) -> None: ) -> None:
event_id = str(event.id)
if event.has_snapshot and camera_config.objects.genai.use_snapshot: if event.has_snapshot and camera_config.objects.genai.use_snapshot:
snapshot_image = self._read_and_crop_snapshot(event) snapshot_image = self._read_and_crop_snapshot(event)
if not snapshot_image: if not snapshot_image:
return return
num_thumbnails = len(self.tracked_events.get(event.id, [])) num_thumbnails = len(self.tracked_events.get(event_id, []))
# ensure we have a jpeg to pass to the model # ensure we have a jpeg to pass to the model
thumbnail = ensure_jpeg_bytes(thumbnail) thumbnail = ensure_jpeg_bytes(thumbnail)
@ -277,7 +281,7 @@ class ObjectDescriptionProcessor(PostProcessorApi):
else ( else (
[ [
data["thumbnail"][:] if data.get("thumbnail") else None data["thumbnail"][:] if data.get("thumbnail") else None
for data in self.tracked_events[event.id] for data in self.tracked_events[event_id]
if data.get("thumbnail") if data.get("thumbnail")
] ]
if num_thumbnails > 0 if num_thumbnails > 0
@ -286,22 +290,22 @@ class ObjectDescriptionProcessor(PostProcessorApi):
) )
if camera_config.objects.genai.debug_save_thumbnails and num_thumbnails > 0: if camera_config.objects.genai.debug_save_thumbnails and num_thumbnails > 0:
logger.debug(f"Saving {num_thumbnails} thumbnails for event {event.id}") logger.debug(f"Saving {num_thumbnails} thumbnails for event {event_id}")
Path(os.path.join(CLIPS_DIR, f"genai-requests/{event.id}")).mkdir( Path(os.path.join(CLIPS_DIR, f"genai-requests/{event_id}")).mkdir(
parents=True, exist_ok=True parents=True, exist_ok=True
) )
for idx, data in enumerate(self.tracked_events[event.id], 1): for idx, data in enumerate(self.tracked_events[event_id], 1):
jpg_bytes: bytes | None = data["thumbnail"] jpg_bytes: bytes | None = data["thumbnail"]
if jpg_bytes is None: if jpg_bytes is None:
logger.warning(f"Unable to save thumbnail {idx} for {event.id}.") logger.warning(f"Unable to save thumbnail {idx} for {event_id}.")
else: else:
with open( with open(
os.path.join( os.path.join(
CLIPS_DIR, CLIPS_DIR,
f"genai-requests/{event.id}/{idx}.jpg", f"genai-requests/{event_id}/{idx}.jpg",
), ),
"wb", "wb",
) as j: ) as j:
@ -310,7 +314,7 @@ class ObjectDescriptionProcessor(PostProcessorApi):
# Generate the description. Call happens in a thread since it is network bound. # Generate the description. Call happens in a thread since it is network bound.
threading.Thread( threading.Thread(
target=self._genai_embed_description, target=self._genai_embed_description,
name=f"_genai_embed_description_{event.id}", name=f"_genai_embed_description_{event_id}",
daemon=True, daemon=True,
args=( args=(
event, event,
@ -319,12 +323,12 @@ class ObjectDescriptionProcessor(PostProcessorApi):
).start() ).start()
# Clean up tracked events and early request state # Clean up tracked events and early request state
self.cleanup_event(event.id) self.cleanup_event(event_id)
def _genai_embed_description(self, event: Event, thumbnails: list[bytes]) -> None: def _genai_embed_description(self, event: Event, thumbnails: list[bytes]) -> None:
"""Embed the description for an event.""" """Embed the description for an event."""
start = datetime.datetime.now().timestamp() start = datetime.datetime.now().timestamp()
camera_config = self.config.cameras[event.camera] camera_config = self.config.cameras[str(event.camera)]
description = self.genai_client.generate_object_description( description = self.genai_client.generate_object_description(
camera_config, thumbnails, event camera_config, thumbnails, event
) )
@ -346,7 +350,7 @@ class ObjectDescriptionProcessor(PostProcessorApi):
# Embed the description # Embed the description
if self.config.semantic_search.enabled: if self.config.semantic_search.enabled:
self.embeddings.embed_description(event.id, description) self.embeddings.embed_description(str(event.id), description)
# Check semantic trigger for this description # Check semantic trigger for this description
if self.semantic_trigger_processor is not None: if self.semantic_trigger_processor is not None:

View File

@ -48,8 +48,8 @@ class ReviewDescriptionProcessor(PostProcessorApi):
self.metrics = metrics self.metrics = metrics
self.genai_client = client self.genai_client = client
self.review_desc_speed = InferenceSpeed(self.metrics.review_desc_speed) self.review_desc_speed = InferenceSpeed(self.metrics.review_desc_speed)
self.review_descs_dps = EventsPerSecond() self.review_desc_dps = EventsPerSecond()
self.review_descs_dps.start() self.review_desc_dps.start()
def calculate_frame_count( def calculate_frame_count(
self, self,
@ -59,7 +59,7 @@ class ReviewDescriptionProcessor(PostProcessorApi):
) -> int: ) -> int:
"""Calculate optimal number of frames based on context size, image source, and resolution. """Calculate optimal number of frames based on context size, image source, and resolution.
Token usage varies by resolution: larger images (ultrawide aspect ratios) use more tokens. Token usage varies by resolution: larger images (ultra-wide aspect ratios) use more tokens.
Estimates ~1 token per 1250 pixels. Targets 98% context utilization with safety margin. Estimates ~1 token per 1250 pixels. Targets 98% context utilization with safety margin.
Capped at 20 frames. Capped at 20 frames.
""" """
@ -68,7 +68,11 @@ class ReviewDescriptionProcessor(PostProcessorApi):
detect_width = camera_config.detect.width detect_width = camera_config.detect.width
detect_height = camera_config.detect.height detect_height = camera_config.detect.height
aspect_ratio = detect_width / detect_height
if not detect_width or not detect_height:
aspect_ratio = 16 / 9
else:
aspect_ratio = detect_width / detect_height
if image_source == ImageSourceEnum.recordings: if image_source == ImageSourceEnum.recordings:
if aspect_ratio >= 1: if aspect_ratio >= 1:
@ -99,8 +103,10 @@ class ReviewDescriptionProcessor(PostProcessorApi):
return min(max(max_frames, 3), 20) return min(max(max_frames, 3), 20)
def process_data(self, data, data_type): def process_data(
self.metrics.review_desc_dps.value = self.review_descs_dps.eps() self, data: dict[str, Any], data_type: PostProcessDataEnum
) -> None:
self.metrics.review_desc_dps.value = self.review_desc_dps.eps()
if data_type != PostProcessDataEnum.review: if data_type != PostProcessDataEnum.review:
return return
@ -186,7 +192,7 @@ class ReviewDescriptionProcessor(PostProcessorApi):
) )
# kickoff analysis # kickoff analysis
self.review_descs_dps.update() self.review_desc_dps.update()
threading.Thread( threading.Thread(
target=run_analysis, target=run_analysis,
args=( args=(
@ -202,7 +208,7 @@ class ReviewDescriptionProcessor(PostProcessorApi):
), ),
).start() ).start()
def handle_request(self, topic, request_data): def handle_request(self, topic: str, request_data: dict[str, Any]) -> str | None:
if topic == EmbeddingsRequestEnum.summarize_review.value: if topic == EmbeddingsRequestEnum.summarize_review.value:
start_ts = request_data["start_ts"] start_ts = request_data["start_ts"]
end_ts = request_data["end_ts"] end_ts = request_data["end_ts"]
@ -327,7 +333,7 @@ class ReviewDescriptionProcessor(PostProcessorApi):
file_start = f"preview_{camera}-" file_start = f"preview_{camera}-"
start_file = f"{file_start}{start_time}.webp" start_file = f"{file_start}{start_time}.webp"
end_file = f"{file_start}{end_time}.webp" end_file = f"{file_start}{end_time}.webp"
all_frames = [] all_frames: list[str] = []
for file in sorted(os.listdir(preview_dir)): for file in sorted(os.listdir(preview_dir)):
if not file.startswith(file_start): if not file.startswith(file_start):
@ -465,7 +471,7 @@ class ReviewDescriptionProcessor(PostProcessorApi):
thumb_data = cv2.imread(thumb_path) thumb_data = cv2.imread(thumb_path)
if thumb_data is None: if thumb_data is None:
logger.warning( logger.warning( # type: ignore[unreachable]
"Could not read preview frame at %s, skipping", thumb_path "Could not read preview frame at %s, skipping", thumb_path
) )
continue continue
@ -488,13 +494,12 @@ class ReviewDescriptionProcessor(PostProcessorApi):
return thumbs return thumbs
@staticmethod
def run_analysis( def run_analysis(
requestor: InterProcessRequestor, requestor: InterProcessRequestor,
genai_client: GenAIClient, genai_client: GenAIClient,
review_inference_speed: InferenceSpeed, review_inference_speed: InferenceSpeed,
camera_config: CameraConfig, camera_config: CameraConfig,
final_data: dict[str, str], final_data: dict[str, Any],
thumbs: list[bytes], thumbs: list[bytes],
genai_config: GenAIReviewConfig, genai_config: GenAIReviewConfig,
labelmap_objects: list[str], labelmap_objects: list[str],

View File

@ -19,6 +19,7 @@ from frigate.config import FrigateConfig
from frigate.const import CONFIG_DIR from frigate.const import CONFIG_DIR
from frigate.data_processing.types import PostProcessDataEnum from frigate.data_processing.types import PostProcessDataEnum
from frigate.db.sqlitevecq import SqliteVecQueueDatabase from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.embeddings.embeddings import Embeddings
from frigate.embeddings.util import ZScoreNormalization from frigate.embeddings.util import ZScoreNormalization
from frigate.models import Event, Trigger from frigate.models import Event, Trigger
from frigate.util.builtin import cosine_distance from frigate.util.builtin import cosine_distance
@ -40,8 +41,8 @@ class SemanticTriggerProcessor(PostProcessorApi):
requestor: InterProcessRequestor, requestor: InterProcessRequestor,
sub_label_publisher: EventMetadataPublisher, sub_label_publisher: EventMetadataPublisher,
metrics: DataProcessorMetrics, metrics: DataProcessorMetrics,
embeddings, embeddings: Embeddings,
): ) -> None:
super().__init__(config, metrics, None) super().__init__(config, metrics, None)
self.db = db self.db = db
self.embeddings = embeddings self.embeddings = embeddings
@ -236,11 +237,14 @@ class SemanticTriggerProcessor(PostProcessorApi):
return return
# Skip the event if not an object # Skip the event if not an object
if event.data.get("type") != "object": if event.data.get("type") != "object": # type: ignore[attr-defined]
return return
thumbnail_bytes = get_event_thumbnail_bytes(event) thumbnail_bytes = get_event_thumbnail_bytes(event)
if thumbnail_bytes is None:
return
nparr = np.frombuffer(thumbnail_bytes, np.uint8) nparr = np.frombuffer(thumbnail_bytes, np.uint8)
thumbnail = cv2.imdecode(nparr, cv2.IMREAD_COLOR) thumbnail = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
@ -262,8 +266,10 @@ class SemanticTriggerProcessor(PostProcessorApi):
thumbnail, thumbnail,
) )
def handle_request(self, topic, request_data): def handle_request(
self, topic: str, request_data: dict[str, Any]
) -> dict[str, Any] | str | None:
return None return None
def expire_object(self, object_id, camera): def expire_object(self, object_id: str, camera: str) -> None:
pass pass

View File

@ -52,11 +52,11 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
self.face_config = config.face_recognition self.face_config = config.face_recognition
self.requestor = requestor self.requestor = requestor
self.sub_label_publisher = sub_label_publisher self.sub_label_publisher = sub_label_publisher
self.face_detector: cv2.FaceDetectorYN = None self.face_detector: cv2.FaceDetectorYN | None = None
self.requires_face_detection = "face" not in self.config.objects.all_objects self.requires_face_detection = "face" not in self.config.objects.all_objects
self.person_face_history: dict[str, list[tuple[str, float, int]]] = {} self.person_face_history: dict[str, list[tuple[str, float, int]]] = {}
self.camera_current_people: dict[str, list[str]] = {} self.camera_current_people: dict[str, list[str]] = {}
self.recognizer: FaceRecognizer | None = None self.recognizer: FaceRecognizer
self.faces_per_second = EventsPerSecond() self.faces_per_second = EventsPerSecond()
self.inference_speed = InferenceSpeed(self.metrics.face_rec_speed) self.inference_speed = InferenceSpeed(self.metrics.face_rec_speed)
@ -78,7 +78,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
self.downloader = ModelDownloader( self.downloader = ModelDownloader(
model_name="facedet", model_name="facedet",
download_path=download_path, download_path=download_path,
file_names=self.model_files.keys(), file_names=list(self.model_files.keys()),
download_func=self.__download_models, download_func=self.__download_models,
complete_func=self.__build_detector, complete_func=self.__build_detector,
) )
@ -134,7 +134,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
def __detect_face( def __detect_face(
self, input: np.ndarray, threshold: float self, input: np.ndarray, threshold: float
) -> tuple[int, int, int, int]: ) -> tuple[int, int, int, int] | None:
"""Detect faces in input image.""" """Detect faces in input image."""
if not self.face_detector: if not self.face_detector:
return None return None
@ -153,7 +153,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
faces = self.face_detector.detect(input) faces = self.face_detector.detect(input)
if faces is None or faces[1] is None: if faces is None or faces[1] is None:
return None return None # type: ignore[unreachable]
face = None face = None
@ -168,7 +168,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
h: int = int(raw_bbox[3] / scale_factor) h: int = int(raw_bbox[3] / scale_factor)
bbox = (x, y, x + w, y + h) bbox = (x, y, x + w, y + h)
if face is None or area(bbox) > area(face): if face is None or area(bbox) > area(face): # type: ignore[unreachable]
face = bbox face = bbox
return face return face
@ -177,7 +177,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
self.faces_per_second.update() self.faces_per_second.update()
self.inference_speed.update(duration) self.inference_speed.update(duration)
def process_frame(self, obj_data: dict[str, Any], frame: np.ndarray): def process_frame(self, obj_data: dict[str, Any], frame: np.ndarray) -> None:
"""Look for faces in image.""" """Look for faces in image."""
self.metrics.face_rec_fps.value = self.faces_per_second.eps() self.metrics.face_rec_fps.value = self.faces_per_second.eps()
camera = obj_data["camera"] camera = obj_data["camera"]
@ -349,7 +349,9 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
self.__update_metrics(datetime.datetime.now().timestamp() - start) self.__update_metrics(datetime.datetime.now().timestamp() - start)
def handle_request(self, topic, request_data) -> dict[str, Any] | None: def handle_request(
self, topic: str, request_data: dict[str, Any]
) -> dict[str, Any] | None:
if topic == EmbeddingsRequestEnum.clear_face_classifier.value: if topic == EmbeddingsRequestEnum.clear_face_classifier.value:
self.recognizer.clear() self.recognizer.clear()
return {"success": True, "message": "Face classifier cleared."} return {"success": True, "message": "Face classifier cleared."}
@ -432,7 +434,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
img = cv2.imread(current_file) img = cv2.imread(current_file)
if img is None: if img is None:
return { return { # type: ignore[unreachable]
"message": "Invalid image file.", "message": "Invalid image file.",
"success": False, "success": False,
} }
@ -469,7 +471,9 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
"score": score, "score": score,
} }
def expire_object(self, object_id: str, camera: str): return None
def expire_object(self, object_id: str, camera: str) -> None:
if object_id in self.person_face_history: if object_id in self.person_face_history:
self.person_face_history.pop(object_id) self.person_face_history.pop(object_id)
@ -478,7 +482,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
def weighted_average( def weighted_average(
self, results_list: list[tuple[str, float, int]], max_weight: int = 4000 self, results_list: list[tuple[str, float, int]], max_weight: int = 4000
): ) -> tuple[str | None, float]:
""" """
Calculates a robust weighted average, capping the area weight and giving more weight to higher scores. Calculates a robust weighted average, capping the area weight and giving more weight to higher scores.
@ -493,8 +497,8 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
return None, 0.0 return None, 0.0
counts: dict[str, int] = {} counts: dict[str, int] = {}
weighted_scores: dict[str, int] = {} weighted_scores: dict[str, float] = {}
total_weights: dict[str, int] = {} total_weights: dict[str, float] = {}
for name, score, face_area in results_list: for name, score, face_area in results_list:
if name == "unknown": if name == "unknown":
@ -509,7 +513,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
counts[name] += 1 counts[name] += 1
# Capped weight based on face area # Capped weight based on face area
weight = min(face_area, max_weight) weight: float = min(face_area, max_weight)
# Score-based weighting (higher scores get more weight) # Score-based weighting (higher scores get more weight)
weight *= (score - self.face_config.unknown_score) * 10 weight *= (score - self.face_config.unknown_score) * 10
@ -519,7 +523,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
if not weighted_scores: if not weighted_scores:
return None, 0.0 return None, 0.0
best_name = max(weighted_scores, key=weighted_scores.get) best_name = max(weighted_scores, key=lambda k: weighted_scores[k])
# If the number of faces for this person < min_faces, we are not confident it is a correct result # If the number of faces for this person < min_faces, we are not confident it is a correct result
if counts[best_name] < self.face_config.min_faces: if counts[best_name] < self.face_config.min_faces:

View File

@ -61,14 +61,16 @@ class LicensePlateRealTimeProcessor(LicensePlateProcessingMixin, RealTimeProcess
self, self,
obj_data: dict[str, Any], obj_data: dict[str, Any],
frame: np.ndarray, frame: np.ndarray,
dedicated_lpr: bool | None = False, dedicated_lpr: bool = False,
): ) -> None:
"""Look for license plates in image.""" """Look for license plates in image."""
self.lpr_process(obj_data, frame, dedicated_lpr) self.lpr_process(obj_data, frame, dedicated_lpr)
def handle_request(self, topic, request_data) -> dict[str, Any] | None: def handle_request(
return self, topic: str, request_data: dict[str, Any]
) -> dict[str, Any] | None:
return None
def expire_object(self, object_id: str, camera: str): def expire_object(self, object_id: str, camera: str) -> None:
"""Expire lpr objects.""" """Expire lpr objects."""
self.lpr_expire(object_id, camera) self.lpr_expire(object_id, camera)

View File

@ -1,8 +1,8 @@
"""Embeddings types.""" """Embeddings types."""
from enum import Enum from enum import Enum
from multiprocessing.managers import SyncManager from multiprocessing.managers import DictProxy, SyncManager, ValueProxy
from multiprocessing.sharedctypes import Synchronized from typing import Any
import sherpa_onnx import sherpa_onnx
@ -10,22 +10,22 @@ from frigate.data_processing.real_time.whisper_online import FasterWhisperASR
class DataProcessorMetrics: class DataProcessorMetrics:
image_embeddings_speed: Synchronized image_embeddings_speed: ValueProxy[float]
image_embeddings_eps: Synchronized image_embeddings_eps: ValueProxy[float]
text_embeddings_speed: Synchronized text_embeddings_speed: ValueProxy[float]
text_embeddings_eps: Synchronized text_embeddings_eps: ValueProxy[float]
face_rec_speed: Synchronized face_rec_speed: ValueProxy[float]
face_rec_fps: Synchronized face_rec_fps: ValueProxy[float]
alpr_speed: Synchronized alpr_speed: ValueProxy[float]
alpr_pps: Synchronized alpr_pps: ValueProxy[float]
yolov9_lpr_speed: Synchronized yolov9_lpr_speed: ValueProxy[float]
yolov9_lpr_pps: Synchronized yolov9_lpr_pps: ValueProxy[float]
review_desc_speed: Synchronized review_desc_speed: ValueProxy[float]
review_desc_dps: Synchronized review_desc_dps: ValueProxy[float]
object_desc_speed: Synchronized object_desc_speed: ValueProxy[float]
object_desc_dps: Synchronized object_desc_dps: ValueProxy[float]
classification_speeds: dict[str, Synchronized] classification_speeds: DictProxy[str, ValueProxy[float]]
classification_cps: dict[str, Synchronized] classification_cps: DictProxy[str, ValueProxy[float]]
def __init__(self, manager: SyncManager, custom_classification_models: list[str]): def __init__(self, manager: SyncManager, custom_classification_models: list[str]):
self.image_embeddings_speed = manager.Value("d", 0.0) self.image_embeddings_speed = manager.Value("d", 0.0)
@ -52,7 +52,7 @@ class DataProcessorMetrics:
class DataProcessorModelRunner: class DataProcessorModelRunner:
def __init__(self, requestor, device: str = "CPU", model_size: str = "large"): def __init__(self, requestor: Any, device: str = "CPU", model_size: str = "large"):
self.requestor = requestor self.requestor = requestor
self.device = device self.device = device
self.model_size = model_size self.model_size = model_size

View File

@ -30,9 +30,6 @@ ignore_errors = true
[mypy-frigate.config.*] [mypy-frigate.config.*]
ignore_errors = true ignore_errors = true
[mypy-frigate.data_processing.*]
ignore_errors = true
[mypy-frigate.db.*] [mypy-frigate.db.*]
ignore_errors = true ignore_errors = true

View File

@ -12,7 +12,7 @@ import shlex
import struct import struct
import urllib.parse import urllib.parse
from collections.abc import Mapping from collections.abc import Mapping
from multiprocessing.sharedctypes import Synchronized from multiprocessing.managers import ValueProxy
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Optional, Tuple, Union from typing import Any, Dict, Optional, Tuple, Union
@ -64,7 +64,7 @@ class EventsPerSecond:
class InferenceSpeed: class InferenceSpeed:
def __init__(self, metric: Synchronized) -> None: def __init__(self, metric: ValueProxy[float]) -> None:
self.__metric = metric self.__metric = metric
self.__initialized = False self.__initialized = False