Get stats for embeddings inferences

This commit is contained in:
Nicolas Mowen 2025-01-04 15:05:33 -07:00
parent fbcbb6b088
commit 3faadb633d
6 changed files with 89 additions and 15 deletions

View File

@ -41,6 +41,7 @@ from frigate.const import (
) )
from frigate.db.sqlitevecq import SqliteVecQueueDatabase from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.embeddings import EmbeddingsContext, manage_embeddings from frigate.embeddings import EmbeddingsContext, manage_embeddings
from frigate.embeddings.types import EmbeddingsMetrics
from frigate.events.audio import AudioProcessor from frigate.events.audio import AudioProcessor
from frigate.events.cleanup import EventCleanup from frigate.events.cleanup import EventCleanup
from frigate.events.external import ExternalEventProcessor from frigate.events.external import ExternalEventProcessor
@ -89,6 +90,9 @@ class FrigateApp:
self.detection_shms: list[mp.shared_memory.SharedMemory] = [] self.detection_shms: list[mp.shared_memory.SharedMemory] = []
self.log_queue: Queue = mp.Queue() self.log_queue: Queue = mp.Queue()
self.camera_metrics: dict[str, CameraMetrics] = {} self.camera_metrics: dict[str, CameraMetrics] = {}
self.embeddings_metrics: EmbeddingsMetrics | None = (
EmbeddingsMetrics() if config.semantic_search.enabled else None
)
self.ptz_metrics: dict[str, PTZMetrics] = {} self.ptz_metrics: dict[str, PTZMetrics] = {}
self.processes: dict[str, int] = {} self.processes: dict[str, int] = {}
self.embeddings: Optional[EmbeddingsContext] = None self.embeddings: Optional[EmbeddingsContext] = None
@ -235,7 +239,10 @@ class FrigateApp:
embedding_process = util.Process( embedding_process = util.Process(
target=manage_embeddings, target=manage_embeddings,
name="embeddings_manager", name="embeddings_manager",
args=(self.config,), args=(
self.config,
self.embeddings_metrics,
),
) )
embedding_process.daemon = True embedding_process.daemon = True
self.embedding_process = embedding_process self.embedding_process = embedding_process
@ -497,7 +504,11 @@ class FrigateApp:
self.stats_emitter = StatsEmitter( self.stats_emitter = StatsEmitter(
self.config, self.config,
stats_init( stats_init(
self.config, self.camera_metrics, self.detectors, self.processes self.config,
self.camera_metrics,
self.embeddings_metrics,
self.detectors,
self.processes,
), ),
self.stop_event, self.stop_event,
) )

View File

@ -21,12 +21,13 @@ from frigate.util.builtin import serialize
from frigate.util.services import listen from frigate.util.services import listen
from .maintainer import EmbeddingMaintainer from .maintainer import EmbeddingMaintainer
from .types import EmbeddingsMetrics
from .util import ZScoreNormalization from .util import ZScoreNormalization
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def manage_embeddings(config: FrigateConfig) -> None: def manage_embeddings(config: FrigateConfig, metrics: EmbeddingsMetrics) -> None:
# Only initialize embeddings if semantic search is enabled # Only initialize embeddings if semantic search is enabled
if not config.semantic_search.enabled: if not config.semantic_search.enabled:
return return
@ -60,6 +61,7 @@ def manage_embeddings(config: FrigateConfig) -> None:
maintainer = EmbeddingMaintainer( maintainer = EmbeddingMaintainer(
db, db,
config, config,
metrics,
stop_event, stop_event,
) )
maintainer.start() maintainer.start()

View File

@ -1,6 +1,7 @@
"""Maintain embeddings in SQLite-vec.""" """Maintain embeddings in SQLite-vec."""
import base64 import base64
import datetime
import logging import logging
import os import os
import random import random
@ -41,6 +42,7 @@ from frigate.util.image import SharedMemoryFrameManager, area, calculate_region
from frigate.util.model import FaceClassificationModel from frigate.util.model import FaceClassificationModel
from .embeddings import Embeddings from .embeddings import Embeddings
from .types import EmbeddingsMetrics
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -54,10 +56,12 @@ class EmbeddingMaintainer(threading.Thread):
self, self,
db: SqliteQueueDatabase, db: SqliteQueueDatabase,
config: FrigateConfig, config: FrigateConfig,
metrics: EmbeddingsMetrics,
stop_event: MpEvent, stop_event: MpEvent,
) -> None: ) -> None:
super().__init__(name="embeddings_maintainer") super().__init__(name="embeddings_maintainer")
self.config = config self.config = config
self.metrics = metrics
self.embeddings = Embeddings(config, db) self.embeddings = Embeddings(config, db)
# Check if we need to re-index events # Check if we need to re-index events
@ -219,10 +223,22 @@ class EmbeddingMaintainer(threading.Thread):
return return
if self.face_recognition_enabled: if self.face_recognition_enabled:
self._process_face(data, yuv_frame) start = datetime.datetime.now().timestamp()
processed = self._process_face(data, yuv_frame)
if processed:
duration = datetime.datetime.now().timestamp() - start
self.metrics.face_rec_fps.value = (
self.metrics.face_rec_fps.value * 9 + duration
) / 10
if self.lpr_config.enabled: if self.lpr_config.enabled:
start = datetime.datetime.now().timestamp()
self._process_license_plate(data, yuv_frame) self._process_license_plate(data, yuv_frame)
duration = datetime.datetime.now().timestamp() - start
self.metrics.alpr_pps.value = (
self.metrics.alpr_pps.value * 9 + duration
) / 10
# no need to save our own thumbnails if genai is not enabled # no need to save our own thumbnails if genai is not enabled
# or if the object has become stationary # or if the object has become stationary
@ -402,14 +418,14 @@ class EmbeddingMaintainer(threading.Thread):
return face return face
def _process_face(self, obj_data: dict[str, any], frame: np.ndarray) -> None: def _process_face(self, obj_data: dict[str, any], frame: np.ndarray) -> bool:
"""Look for faces in image.""" """Look for faces in image."""
id = obj_data["id"] id = obj_data["id"]
# don't run for non person objects # don't run for non person objects
if obj_data.get("label") != "person": if obj_data.get("label") != "person":
logger.debug("Not a processing face for non person object.") logger.debug("Not a processing face for non person object.")
return return False
# don't overwrite sub label for objects that have a sub label # don't overwrite sub label for objects that have a sub label
# that is not a face # that is not a face
@ -417,7 +433,7 @@ class EmbeddingMaintainer(threading.Thread):
logger.debug( logger.debug(
f"Not processing face due to existing sub label: {obj_data.get('sub_label')}." f"Not processing face due to existing sub label: {obj_data.get('sub_label')}."
) )
return return False
face: Optional[dict[str, any]] = None face: Optional[dict[str, any]] = None
@ -426,7 +442,7 @@ class EmbeddingMaintainer(threading.Thread):
person_box = obj_data.get("box") person_box = obj_data.get("box")
if not person_box: if not person_box:
return None return False
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420) rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
left, top, right, bottom = person_box left, top, right, bottom = person_box
@ -435,7 +451,7 @@ class EmbeddingMaintainer(threading.Thread):
if not face_box: if not face_box:
logger.debug("Detected no faces for person object.") logger.debug("Detected no faces for person object.")
return return False
margin = int((face_box[2] - face_box[0]) * 0.25) margin = int((face_box[2] - face_box[0]) * 0.25)
face_frame = person[ face_frame = person[
@ -451,7 +467,7 @@ class EmbeddingMaintainer(threading.Thread):
# don't run for object without attributes # don't run for object without attributes
if not obj_data.get("current_attributes"): if not obj_data.get("current_attributes"):
logger.debug("No attributes to parse.") logger.debug("No attributes to parse.")
return return False
attributes: list[dict[str, any]] = obj_data.get("current_attributes", []) attributes: list[dict[str, any]] = obj_data.get("current_attributes", [])
for attr in attributes: for attr in attributes:
@ -463,14 +479,14 @@ class EmbeddingMaintainer(threading.Thread):
# no faces detected in this frame # no faces detected in this frame
if not face: if not face:
return return False
face_box = face.get("box") face_box = face.get("box")
# check that face is valid # check that face is valid
if not face_box or area(face_box) < self.config.face_recognition.min_area: if not face_box or area(face_box) < self.config.face_recognition.min_area:
logger.debug(f"Invalid face box {face}") logger.debug(f"Invalid face box {face}")
return return False
face_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) face_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
margin = int((face_box[2] - face_box[0]) * 0.25) margin = int((face_box[2] - face_box[0]) * 0.25)
@ -487,7 +503,7 @@ class EmbeddingMaintainer(threading.Thread):
res = self.face_classifier.classify_face(face_frame) res = self.face_classifier.classify_face(face_frame)
if not res: if not res:
return return False
sub_label, score = res sub_label, score = res
@ -512,13 +528,13 @@ class EmbeddingMaintainer(threading.Thread):
logger.debug( logger.debug(
f"Recognized face distance {score} is less than threshold {self.config.face_recognition.threshold}" f"Recognized face distance {score} is less than threshold {self.config.face_recognition.threshold}"
) )
return return True
if id in self.detected_faces and face_score <= self.detected_faces[id]: if id in self.detected_faces and face_score <= self.detected_faces[id]:
logger.debug( logger.debug(
f"Recognized face distance {score} and overall score {face_score} is less than previous overall face score ({self.detected_faces.get(id)})." f"Recognized face distance {score} and overall score {face_score} is less than previous overall face score ({self.detected_faces.get(id)})."
) )
return return True
resp = requests.post( resp = requests.post(
f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label", f"{FRIGATE_LOCALHOST}/api/events/{id}/sub_label",
@ -532,6 +548,8 @@ class EmbeddingMaintainer(threading.Thread):
if resp.status_code == 200: if resp.status_code == 200:
self.detected_faces[id] = face_score self.detected_faces[id] = face_score
return True
def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]: def _detect_license_plate(self, input: np.ndarray) -> tuple[int, int, int, int]:
"""Return the dimensions of the input image as [x, y, width, height].""" """Return the dimensions of the input image as [x, y, width, height]."""
height, width = input.shape[:2] height, width = input.shape[:2]

View File

@ -0,0 +1,17 @@
"""Embeddings types."""
import multiprocessing as mp
from multiprocessing.sharedctypes import Synchronized
class EmbeddingsMetrics:
    """Process-shared counters tracking embeddings inference speeds.

    Each attribute is a multiprocessing ``Value`` (double) so the
    embeddings worker process can update it while the stats emitter in
    the main process reads it.
    """

    # rolling-average speeds, shared across processes
    image_embeddings_fps: Synchronized
    text_embeddings_sps: Synchronized
    face_rec_fps: Synchronized
    alpr_pps: Synchronized

    def __init__(self) -> None:
        # Seed every metric with a small nonzero value so downstream
        # rolling-average math never starts from an exact zero.
        for metric_name in (
            "image_embeddings_fps",
            "text_embeddings_sps",
            "face_rec_fps",
            "alpr_pps",
        ):
            setattr(self, metric_name, mp.Value("d", 0.01))

View File

@ -14,6 +14,7 @@ from requests.exceptions import RequestException
from frigate.camera import CameraMetrics from frigate.camera import CameraMetrics
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR
from frigate.embeddings.types import EmbeddingsMetrics
from frigate.object_detection import ObjectDetectProcess from frigate.object_detection import ObjectDetectProcess
from frigate.types import StatsTrackingTypes from frigate.types import StatsTrackingTypes
from frigate.util.services import ( from frigate.util.services import (
@ -51,11 +52,13 @@ def get_latest_version(config: FrigateConfig) -> str:
def stats_init( def stats_init(
config: FrigateConfig, config: FrigateConfig,
camera_metrics: dict[str, CameraMetrics], camera_metrics: dict[str, CameraMetrics],
embeddings_metrics: EmbeddingsMetrics | None,
detectors: dict[str, ObjectDetectProcess], detectors: dict[str, ObjectDetectProcess],
processes: dict[str, int], processes: dict[str, int],
) -> StatsTrackingTypes: ) -> StatsTrackingTypes:
stats_tracking: StatsTrackingTypes = { stats_tracking: StatsTrackingTypes = {
"camera_metrics": camera_metrics, "camera_metrics": camera_metrics,
"embeddings_metrics": embeddings_metrics,
"detectors": detectors, "detectors": detectors,
"started": int(time.time()), "started": int(time.time()),
"latest_frigate_version": get_latest_version(config), "latest_frigate_version": get_latest_version(config),
@ -279,6 +282,27 @@ def stats_snapshot(
} }
stats["detection_fps"] = round(total_detection_fps, 2) stats["detection_fps"] = round(total_detection_fps, 2)
if config.semantic_search.enabled:
embeddings_metrics = stats_tracking["embeddings_metrics"]
stats["embeddings"] = {
"image_embedding_speed": round(
embeddings_metrics.image_embeddings_fps.value * 1000, 2
),
"text_embedding_speed": round(
embeddings_metrics.text_embeddings_sps.value * 1000, 2
),
}
if config.face_recognition.enabled:
stats["embeddings"]["face_recognition_speed"] = round(
embeddings_metrics.face_rec_fps.value * 1000, 2
)
if config.lpr.enabled:
stats["embeddings"]["plate_recognition_speed"] = round(
embeddings_metrics.alpr_pps.value * 1000, 2
)
get_processing_stats(config, stats, hwaccel_errors) get_processing_stats(config, stats, hwaccel_errors)
stats["service"] = { stats["service"] = {

View File

@ -2,11 +2,13 @@ from enum import Enum
from typing import TypedDict from typing import TypedDict
from frigate.camera import CameraMetrics from frigate.camera import CameraMetrics
from frigate.embeddings.types import EmbeddingsMetrics
from frigate.object_detection import ObjectDetectProcess from frigate.object_detection import ObjectDetectProcess
class StatsTrackingTypes(TypedDict): class StatsTrackingTypes(TypedDict):
camera_metrics: dict[str, CameraMetrics] camera_metrics: dict[str, CameraMetrics]
embeddings_metrics: EmbeddingsMetrics | None
detectors: dict[str, ObjectDetectProcess] detectors: dict[str, ObjectDetectProcess]
started: int started: int
latest_frigate_version: str latest_frigate_version: str