frigate/frigate/embeddings/maintainer.py

821 lines
30 KiB
Python
Raw Normal View History

Use sqlite-vec extension instead of chromadb for embeddings (#14163) * swap sqlite_vec for chroma in requirements * load sqlite_vec in embeddings manager * remove chroma and revamp Embeddings class for sqlite_vec * manual minilm onnx inference * remove chroma in clip model * migrate api from chroma to sqlite_vec * migrate event cleanup from chroma to sqlite_vec * migrate embedding maintainer from chroma to sqlite_vec * genai description for sqlite_vec * load sqlite_vec in main thread db * extend the SqliteQueueDatabase class and use peewee db.execute_sql * search with Event type for similarity * fix similarity search * install and add comment about transformers * fix normalization * add id filter * clean up * clean up * fully remove chroma and add transformers env var * readd uvicorn for fastapi * readd tokenizer parallelism env var * remove chroma from docs * remove chroma from UI * try removing custom pysqlite3 build * hard code limit * optimize queries * revert explore query * fix query * keep building pysqlite3 * single pass fetch and process * remove unnecessary re-embed * update deps * move SqliteVecQueueDatabase to db directory * make search thumbnail take up full size of results box * improve typing * improve model downloading and add status screen * daemon downloading thread * catch case when semantic search is disabled * fix typing * build sqlite_vec from source * resolve conflict * file permissions * try build deps * remove sources * sources * fix thread start * include git in build * reorder embeddings after detectors are started * build with sqlite amalgamation * non-platform specific * use wget instead of curl * remove unzip -d * remove sqlite_vec from requirements and load the compiled version * fix build * avoid race in db connection * add scale_factor and bias to description zscore normalization
2024-10-07 23:30:45 +03:00
"""Maintain embeddings in SQLite-vec."""
import base64
import datetime
import logging
import os
import threading
from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path
from typing import Any, Optional
import cv2
import numpy as np
from peewee import DoesNotExist
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.embeddings_updater import (
EmbeddingsRequestEnum,
EmbeddingsResponder,
)
from frigate.comms.event_metadata_updater import (
EventMetadataPublisher,
EventMetadataSubscriber,
EventMetadataTypeEnum,
)
from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscriber
from frigate.comms.inter_process import InterProcessRequestor
from frigate.comms.recordings_updater import (
RecordingsDataSubscriber,
RecordingsDataTypeEnum,
)
from frigate.comms.review_updater import ReviewDataSubscriber
from frigate.config import CameraConfig, FrigateConfig
from frigate.config.camera.camera import CameraTypeEnum
from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber,
)
from frigate.const import (
CLIPS_DIR,
UPDATE_EVENT_DESCRIPTION,
)
from frigate.data_processing.common.license_plate.model import (
LicensePlateModelRunner,
)
from frigate.data_processing.post.api import PostProcessorApi
from frigate.data_processing.post.audio_transcription import (
AudioTranscriptionPostProcessor,
)
from frigate.data_processing.post.license_plate import (
LicensePlatePostProcessor,
)
from frigate.data_processing.post.review_descriptions import ReviewDescriptionProcessor
from frigate.data_processing.post.semantic_trigger import SemanticTriggerProcessor
from frigate.data_processing.real_time.api import RealTimeProcessorApi
from frigate.data_processing.real_time.bird import BirdRealTimeProcessor
from frigate.data_processing.real_time.custom_classification import (
CustomObjectClassificationProcessor,
CustomStateClassificationProcessor,
)
from frigate.data_processing.real_time.face import FaceRealTimeProcessor
from frigate.data_processing.real_time.license_plate import (
LicensePlateRealTimeProcessor,
)
from frigate.data_processing.types import DataProcessorMetrics, PostProcessDataEnum
Use Fork-Server As Spawn Method (#18682) * Set runtime * Use count correctly * Don't assume camera sizes * Use separate zmq proxy for object detection * Correct order * Use forkserver * Only store PID instead of entire process reference * Cleanup * Catch correct errors * Fix typing * Remove before_run from process util The before_run never actually ran because: You're right to suspect an issue with before_run not being called and a potential deadlock. The way you've implemented the run_wrapper using __getattribute__ for the run method of BaseProcess is a common pitfall in Python's multiprocessing, especially when combined with how multiprocessing.Process works internally. Here's a breakdown of why before_run isn't being called and why you might be experiencing a deadlock: The Problem: __getattribute__ and Process Serialization When you create a multiprocessing.Process object and call start(), the multiprocessing module needs to serialize the process object (or at least enough of it to re-create the process in the new interpreter). It then pickles this serialized object and sends it to the newly spawned process. The issue with your __getattribute__ implementation for run is that: run is retrieved during serialization: When multiprocessing tries to pickle your Process object to send to the new process, it will likely access the run attribute. This triggers your __getattribute__ wrapper, which then tries to bind run_wrapper to self. run_wrapper is bound to the parent process's self: The run_wrapper closure, when created in the parent process, captures the self (the Process instance) from the parent's memory space. Deserialization creates a new object: In the child process, a new Process object is created by deserializing the pickled data. However, the run_wrapper method that was pickled still holds a reference to the self from the parent process. This is a subtle but critical distinction. 
The child's run is not your wrapped run: When the child process starts, it internally calls its own run method. Because of the serialization and deserialization process, the run method that's ultimately executed in the child process is the original multiprocessing.Process.run or the Process.run if you had directly overridden it. Your __getattribute__ magic, which wraps run, isn't correctly applied to the Process object within the child's context. * Cleanup * Logging bugfix (#18465) * use mp Manager to handle logging queues A Python bug (https://github.com/python/cpython/issues/91555) was preventing logs from the embeddings maintainer process from printing. The bug is fixed in Python 3.14, but a viable workaround is to use the multiprocessing Manager, which better manages mp queues and causes the logging to work correctly. * consolidate * fix typing * Fix typing * Use global log queue * Move to using process for logging * Convert camera tracking to process * Add more processes * Finalize process * Cleanup * Cleanup typing * Formatting * Remove daemon --------- Co-authored-by: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
2025-06-12 21:12:34 +03:00
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.events.types import EventTypeEnum, RegenerateDescriptionEnum
from frigate.genai import get_genai_client
from frigate.models import Event, Recordings, Trigger
from frigate.types import TrackedObjectUpdateTypesEnum
from frigate.util.builtin import serialize
from frigate.util.image import (
SharedMemoryFrameManager,
calculate_region,
ensure_jpeg_bytes,
)
from frigate.util.path import get_event_thumbnail_bytes
Use sqlite-vec extension instead of chromadb for embeddings (#14163) * swap sqlite_vec for chroma in requirements * load sqlite_vec in embeddings manager * remove chroma and revamp Embeddings class for sqlite_vec * manual minilm onnx inference * remove chroma in clip model * migrate api from chroma to sqlite_vec * migrate event cleanup from chroma to sqlite_vec * migrate embedding maintainer from chroma to sqlite_vec * genai description for sqlite_vec * load sqlite_vec in main thread db * extend the SqliteQueueDatabase class and use peewee db.execute_sql * search with Event type for similarity * fix similarity search * install and add comment about transformers * fix normalization * add id filter * clean up * clean up * fully remove chroma and add transformers env var * readd uvicorn for fastapi * readd tokenizer parallelism env var * remove chroma from docs * remove chroma from UI * try removing custom pysqlite3 build * hard code limit * optimize queries * revert explore query * fix query * keep building pysqlite3 * single pass fetch and process * remove unnecessary re-embed * update deps * move SqliteVecQueueDatabase to db directory * make search thumbnail take up full size of results box * improve typing * improve model downloading and add status screen * daemon downloading thread * catch case when semantic search is disabled * fix typing * build sqlite_vec from source * resolve conflict * file permissions * try build deps * remove sources * sources * fix thread start * include git in build * reorder embeddings after detectors are started * build with sqlite amalgamation * non-platform specific * use wget instead of curl * remove unzip -d * remove sqlite_vec from requirements and load the compiled version * fix build * avoid race in db connection * add scale_factor and bias to description zscore normalization
2024-10-07 23:30:45 +03:00
from .embeddings import Embeddings
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Upper bound on the number of per-event update entries (each carrying a
# thumbnail) kept in EmbeddingMaintainer.tracked_events. Once an event has
# this many entries, the entry at index 1 is dropped before a new one is
# appended, so the first entry is always retained.
MAX_THUMBNAILS = 10
class EmbeddingMaintainer(threading.Thread):
"""Handle embedding queue and post event updates."""
def __init__(
    self,
    config: FrigateConfig,
    metrics: DataProcessorMetrics | None,
    stop_event: MpEvent,
) -> None:
    """Create the maintainer: database, comms channels and processors.

    Args:
        config: Frigate configuration; its feature flags decide which
            realtime and post processors are instantiated.
        metrics: Metrics object forwarded to the embeddings helper and to
            the processors (may be None).
        stop_event: Event checked by run(); setting it ends the main loop.
    """
    super().__init__(name="embeddings_maintainer")
    self.config = config
    self.metrics = metrics
    # stays None unless semantic search is enabled below
    self.embeddings = None
    self.config_updater = CameraConfigUpdateSubscriber(
        self.config,
        self.config.cameras,
        [
            CameraConfigUpdateEnum.add,
            CameraConfigUpdateEnum.remove,
            CameraConfigUpdateEnum.object_genai,
            CameraConfigUpdateEnum.review_genai,
            CameraConfigUpdateEnum.semantic_search,
        ],
    )

    # Configure Frigate DB
    db = SqliteVecQueueDatabase(
        config.database.path,
        pragmas={
            "auto_vacuum": "FULL",  # Does not defragment database
            "cache_size": -512 * 1000,  # 512MB of cache
            "synchronous": "NORMAL",  # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
        },
        # scale the timeout with the number of enabled cameras, minimum 60
        timeout=max(
            60, 10 * len([c for c in config.cameras.values() if c.enabled])
        ),
        load_vec_extension=True,
    )
    models = [Event, Recordings, Trigger]
    db.bind(models)

    if config.semantic_search.enabled:
        self.embeddings = Embeddings(config, db, metrics)

        # Check if we need to re-index events
        if config.semantic_search.reindex:
            self.embeddings.reindex()

        # Sync semantic search triggers in db with config
        self.embeddings.sync_triggers()

    # create communication for updating event descriptions
    self.requestor = InterProcessRequestor()
    self.event_subscriber = EventUpdateSubscriber()
    self.event_end_subscriber = EventEndSubscriber()
    self.event_metadata_publisher = EventMetadataPublisher()
    self.event_metadata_subscriber = EventMetadataSubscriber(
        EventMetadataTypeEnum.regenerate_description
    )
    self.recordings_subscriber = RecordingsDataSubscriber(
        RecordingsDataTypeEnum.recordings_available_through
    )
    self.review_subscriber = ReviewDataSubscriber("")
    self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video.value)
    self.embeddings_responder = EmbeddingsResponder()
    self.frame_manager = SharedMemoryFrameManager()

    # shared between the realtime and post license plate processors
    self.detected_license_plates: dict[str, dict[str, Any]] = {}
    self.genai_client = get_genai_client(config)

    # model runners to share between realtime and post processors
    # (lpr_model_runner is only bound, and only used, when lpr is enabled)
    if self.config.lpr.enabled:
        lpr_model_runner = LicensePlateModelRunner(
            self.requestor,
            device=self.config.lpr.device,
            model_size=self.config.lpr.model_size,
        )

    # realtime processors
    self.realtime_processors: list[RealTimeProcessorApi] = []

    if self.config.face_recognition.enabled:
        self.realtime_processors.append(
            FaceRealTimeProcessor(
                self.config, self.requestor, self.event_metadata_publisher, metrics
            )
        )

    if self.config.classification.bird.enabled:
        self.realtime_processors.append(
            BirdRealTimeProcessor(
                self.config, self.event_metadata_publisher, metrics
            )
        )

    if self.config.lpr.enabled:
        self.realtime_processors.append(
            LicensePlateRealTimeProcessor(
                self.config,
                self.requestor,
                self.event_metadata_publisher,
                metrics,
                lpr_model_runner,
                self.detected_license_plates,
            )
        )

    # one processor per custom classification model: a state processor when
    # the model has a state_config, otherwise an object processor
    for model_config in self.config.classification.custom.values():
        self.realtime_processors.append(
            CustomStateClassificationProcessor(
                self.config, model_config, self.requestor, self.metrics
            )
            if model_config.state_config is not None
            else CustomObjectClassificationProcessor(
                self.config,
                model_config,
                self.event_metadata_publisher,
                self.metrics,
            )
        )

    # post processors
    self.post_processors: list[PostProcessorApi] = []

    if any(c.review.genai.enabled_in_config for c in self.config.cameras.values()):
        self.post_processors.append(
            ReviewDescriptionProcessor(
                self.config, self.requestor, self.metrics, self.genai_client
            )
        )

    if self.config.lpr.enabled:
        self.post_processors.append(
            LicensePlatePostProcessor(
                self.config,
                self.requestor,
                self.event_metadata_publisher,
                metrics,
                lpr_model_runner,
                self.detected_license_plates,
            )
        )

    if any(
        c.enabled_in_config and c.audio_transcription.enabled
        for c in self.config.cameras.values()
    ):
        self.post_processors.append(
            AudioTranscriptionPostProcessor(self.config, self.requestor, metrics)
        )

    if self.config.semantic_search.enabled:
        self.post_processors.append(
            SemanticTriggerProcessor(
                db,
                self.config,
                self.requestor,
                metrics,
                self.embeddings,
            )
        )

    self.stop_event = stop_event
    # event id -> list of update payloads (each with a "thumbnail" entry)
    self.tracked_events: dict[str, list[Any]] = {}
    # event ids for which an early GenAI request was already dispatched
    self.early_request_sent: dict[str, bool] = {}

    # recordings data
    self.recordings_available_through: dict[str, float] = {}
def run(self) -> None:
    """Maintain a SQLite-vec database for semantic search.

    Polls every input channel in turn until ``stop_event`` is set, then
    stops the communication objects created in ``__init__``.
    """
    while not self.stop_event.is_set():
        self.config_updater.check_for_updates()
        self._process_requests()
        self._process_updates()
        self._process_recordings_updates()
        self._process_review_updates()
        self._process_frame_updates()
        self._expire_dedicated_lpr()
        self._process_finalized()
        self._process_event_metadata()

    # shut down every subscriber / publisher opened in __init__
    self.config_updater.stop()
    self.event_subscriber.stop()
    self.event_end_subscriber.stop()
    self.recordings_subscriber.stop()
    # previously left open on shutdown, unlike the other subscribers
    self.review_subscriber.stop()
    self.detection_subscriber.stop()
    self.event_metadata_publisher.stop()
    self.event_metadata_subscriber.stop()
    self.embeddings_responder.stop()
    self.requestor.stop()
    logger.info("Exiting embeddings maintenance...")
def _process_requests(self) -> None:
    """Process embeddings requests.

    Hands a handler to the embeddings responder. The handler serves the
    embedding topics itself when semantic search is enabled and otherwise
    offers the request to each realtime and post processor in turn.
    """

    def _handle_request(topic: str, data: dict[str, Any]) -> str | None:
        """Return the response for ``topic``, or None if nothing handled it."""
        try:
            # First handle the embedding-specific topics when semantic search is enabled
            if self.config.semantic_search.enabled:
                if topic == EmbeddingsRequestEnum.embed_description.value:
                    return serialize(
                        self.embeddings.embed_description(
                            data["id"], data["description"]
                        ),
                        pack=False,
                    )
                elif topic == EmbeddingsRequestEnum.embed_thumbnail.value:
                    thumbnail = base64.b64decode(data["thumbnail"])
                    return serialize(
                        self.embeddings.embed_thumbnail(data["id"], thumbnail),
                        pack=False,
                    )
                elif topic == EmbeddingsRequestEnum.generate_search.value:
                    # here ``data`` is passed straight through as the text to embed
                    return serialize(
                        self.embeddings.embed_description("", data, upsert=False),
                        pack=False,
                    )
                elif topic == EmbeddingsRequestEnum.reindex.value:
                    response = self.embeddings.start_reindex()
                    return "started" if response else "in_progress"

            # fall back to the processors; the first non-None answer wins
            for processor_list in (self.realtime_processors, self.post_processors):
                for processor in processor_list:
                    resp = processor.handle_request(topic, data)

                    if resp is not None:
                        return resp

            return None
        except Exception as e:
            # best effort: log the failure and answer with None
            logger.error(f"Unable to handle embeddings request {e}", exc_info=True)
            return None

    self.embeddings_responder.check_for_request(_handle_request)
def _process_updates(self) -> None:
    """Process event updates.

    Takes at most one update from the event subscriber. For tracked-object
    updates it runs every realtime processor on the frame, stores a
    thumbnail for GenAI when a client is configured and the object is not
    stationary, and may start an early GenAI description request once
    enough updates have been collected.
    """
    update = self.event_subscriber.check_for_update()

    if update is None:
        return

    source_type, _, camera, frame_name, data = update

    if not camera or source_type != EventTypeEnum.tracked_object:
        return

    if self.config.semantic_search.enabled:
        self.embeddings.update_stats()

    camera_config = self.config.cameras[camera]

    # no need to process updated objects if face recognition, lpr, genai are disabled
    if (
        not camera_config.objects.genai.enabled
        and len(self.realtime_processors) == 0
    ):
        return

    # Create our own thumbnail based on the bounding box and the frame time.
    # Pre-initialise so a missing frame takes the "unavailable" path below
    # instead of raising UnboundLocalError.
    yuv_frame = None

    try:
        yuv_frame = self.frame_manager.get(
            frame_name, camera_config.frame_shape_yuv
        )
    except FileNotFoundError:
        pass

    if yuv_frame is None:
        logger.debug(
            "Unable to process object update because frame is unavailable."
        )
        return

    for processor in self.realtime_processors:
        processor.process_frame(data, yuv_frame)

    # no need to save our own thumbnails if genai is not enabled
    # or if the object has become stationary
    if self.genai_client is not None and not data["stationary"]:
        if data["id"] not in self.tracked_events:
            self.tracked_events[data["id"]] = []

        data["thumbnail"] = self._create_thumbnail(yuv_frame, data["box"])

        # Limit the number of thumbnails saved
        if len(self.tracked_events[data["id"]]) >= MAX_THUMBNAILS:
            # Always keep the first thumbnail for the event
            self.tracked_events[data["id"]].pop(1)

        self.tracked_events[data["id"]].append(data)

    # check if we're configured to send an early request after a minimum number of updates received
    if (
        self.genai_client is not None
        and camera_config.objects.genai.send_triggers.after_significant_updates
    ):
        if (
            len(self.tracked_events.get(data["id"], []))
            >= camera_config.objects.genai.send_triggers.after_significant_updates
            and data["id"] not in self.early_request_sent
        ):
            if data["has_clip"] and data["has_snapshot"]:
                event: Event = Event.get(Event.id == data["id"])

                # the label and zone filters only apply when configured
                if (
                    not camera_config.objects.genai.objects
                    or event.label in camera_config.objects.genai.objects
                ) and (
                    not camera_config.objects.genai.required_zones
                    or set(data["entered_zones"])
                    & set(camera_config.objects.genai.required_zones)
                ):
                    logger.debug(f"{camera} sending early request to GenAI")

                    self.early_request_sent[data["id"]] = True
                    threading.Thread(
                        target=self._genai_embed_description,
                        name=f"_genai_embed_description_{event.id}",
                        daemon=True,
                        args=(
                            event,
                            [
                                tracked["thumbnail"]
                                for tracked in self.tracked_events[data["id"]]
                            ],
                        ),
                    ).start()

    self.frame_manager.close(frame_name)
def _process_finalized(self) -> None:
    """Process the end of an event.

    Drains ``self.event_end_subscriber`` until it reports ``None``. Each update
    is unpacked as ``(event_id, camera, updated_db)`` and handled in order:

    1. every realtime processor is told to expire the object;
    2. when ``updated_db`` is truthy, the event is loaded from the database,
       its thumbnail is embedded and, if the camera's GenAI settings allow
       it, a description request is started;
    3. every post processor is invoked for the event;
    4. the entries cached for the event in ``self.tracked_events`` are dropped.

    Updates whose event is missing from the database, or whose type is not
    ``"object"``, skip the remainder of steps 2-3 but still release their
    cached entries.
    """
    while True:
        ended = self.event_end_subscriber.check_for_update()

        # identity check: only a real None means there is nothing left to drain
        if ended is None:
            break

        event_id, camera, updated_db = ended
        camera_config = self.config.cameras[camera]

        # expire in realtime processors
        for processor in self.realtime_processors:
            processor.expire_object(event_id, camera)

        if updated_db:
            try:
                event: Event = Event.get(Event.id == event_id)
            except DoesNotExist:
                # nothing to embed; release cached thumbnails instead of leaking them
                self.tracked_events.pop(event_id, None)
                continue

            # Skip the event if not an object
            if event.data.get("type") != "object":
                self.tracked_events.pop(event_id, None)
                continue

            # Extract valid thumbnail
            thumbnail = get_event_thumbnail_bytes(event)

            # Embed the thumbnail
            self._embed_thumbnail(event_id, thumbnail)

            # Run GenAI
            genai_config = camera_config.objects.genai
            if (
                genai_config.enabled
                and genai_config.send_triggers.tracked_object_end
                and self.genai_client is not None
                and (not genai_config.objects or event.label in genai_config.objects)
                and (
                    not genai_config.required_zones
                    or set(event.zones) & set(genai_config.required_zones)
                )
            ):
                self._process_genai_description(event, camera_config, thumbnail)

        # call any defined post processors
        for processor in self.post_processors:
            if isinstance(processor, LicensePlatePostProcessor):
                recordings_available = self.recordings_available_through.get(camera)

                if (
                    recordings_available is not None
                    and event_id in self.detected_license_plates
                    and self.config.cameras[camera].type != "lpr"
                ):
                    processor.process_data(
                        {
                            "event_id": event_id,
                            "camera": camera,
                            # same value that was just fetched for the check above
                            "recordings_available": recordings_available,
                            "obj_data": self.detected_license_plates[event_id][
                                "obj_data"
                            ],
                        },
                        PostProcessDataEnum.recording,
                    )
            elif isinstance(processor, AudioTranscriptionPostProcessor):
                continue
            elif isinstance(processor, SemanticTriggerProcessor):
                processor.process_data(
                    {"event_id": event_id, "camera": camera, "type": "image"},
                    PostProcessDataEnum.tracked_object,
                )
            else:
                processor.process_data(
                    {"event_id": event_id, "camera": camera},
                    PostProcessDataEnum.tracked_object,
                )

        # Delete tracked events based on the event_id
        self.tracked_events.pop(event_id, None)
def _expire_dedicated_lpr(self) -> None:
    """Remove plates not seen for longer than expiration timeout for dedicated lpr cameras.

    Entries of ``self.detected_license_plates`` without a truthy
    ``"last_seen"`` value are never expired. For each expired plate a
    manual-event-end message carrying ``(plate_id, now)`` is published before
    the entry is removed.
    """
    current_ts = datetime.datetime.now().timestamp()

    # collect first so the dict is not mutated while it is being iterated
    expired_ids = []
    for plate_id, plate in self.detected_license_plates.items():
        seen_at = plate.get("last_seen", 0)
        if seen_at and (
            current_ts - seen_at
            > self.config.cameras[plate["camera"]].lpr.expire_time
        ):
            expired_ids.append(plate_id)

    for plate_id in expired_ids:
        self.event_metadata_publisher.publish(
            (plate_id, current_ts),
            EventMetadataTypeEnum.manual_event_end.value,
        )
        self.detected_license_plates.pop(plate_id)
def _process_recordings_updates(self) -> None:
    """Process recordings updates.

    Drains ``self.recordings_subscriber`` until it reports ``None``. Each update
    is unpacked as ``(camera, timestamp)`` and stored in
    ``self.recordings_available_through`` keyed by camera.
    """
    while True:
        recordings_data = self.recordings_subscriber.check_for_update()

        # identity check: only a real None means there is nothing left to drain
        if recordings_data is None:
            break

        camera, recordings_available_through_timestamp = recordings_data

        self.recordings_available_through[camera] = (
            recordings_available_through_timestamp
        )

        logger.debug(
            f"{camera} now has recordings available through {recordings_available_through_timestamp}"
        )
def _process_review_updates(self) -> None:
    """Process review updates.

    Drains ``self.review_subscriber`` until it reports ``None`` and forwards
    each update to every ``ReviewDescriptionProcessor`` found in
    ``self.post_processors``.
    """
    while True:
        review_updates = self.review_subscriber.check_for_update()

        # identity check: a payload with a permissive __eq__ must not end the drain
        if review_updates is None:
            break

        for processor in self.post_processors:
            if isinstance(processor, ReviewDescriptionProcessor):
                processor.process_data(review_updates, PostProcessDataEnum.review)
def _process_event_metadata(self):
    """Handle one pending regenerate-description request, if there is one.

    Polls ``self.event_metadata_subscriber`` once. A payload is unpacked as
    ``(event_id, source, force)``; when ``event_id`` is truthy the request is
    passed on to ``handle_regenerate_description``.
    """
    topic, payload = self.event_metadata_subscriber.check_for_update()

    if topic is not None:
        event_id, source, force = payload

        if event_id:
            regenerate_source = RegenerateDescriptionEnum(source)
            self.handle_regenerate_description(event_id, regenerate_source, force)
def _process_frame_updates(self) -> None:
    """Process one detection frame update for frame-level realtime processors.

    Polls ``self.detection_subscriber`` once. The update is ignored when it
    names no camera, carries no motion boxes, or when neither dedicated LPR
    (an ``lpr`` type camera that does not track ``license_plate``) nor any
    custom classification is configured. Otherwise the frame is fetched from
    ``self.frame_manager``, handed to the matching realtime processors and
    closed again.
    """
    (topic, data) = self.detection_subscriber.check_for_update()

    if topic is None:
        return

    camera, frame_name, _, _, motion_boxes, _ = data

    if not camera or len(motion_boxes) == 0:
        return

    camera_config = self.config.cameras[camera]
    dedicated_lpr_enabled = (
        camera_config.type == CameraTypeEnum.lpr
        and "license_plate" not in camera_config.objects.track
    )

    if not dedicated_lpr_enabled and len(self.config.classification.custom) == 0:
        # no active features that use this data
        return

    try:
        yuv_frame = self.frame_manager.get(
            frame_name, camera_config.frame_shape_yuv
        )
    except FileNotFoundError:
        # bind the name so the availability check below cannot hit an unbound local
        yuv_frame = None

    if yuv_frame is None:
        logger.debug(
            "Unable to process dedicated LPR update because frame is unavailable."
        )
        return

    try:
        for processor in self.realtime_processors:
            if dedicated_lpr_enabled and isinstance(
                processor, LicensePlateRealTimeProcessor
            ):
                processor.process_frame(camera, yuv_frame, True)

            if isinstance(processor, CustomStateClassificationProcessor):
                processor.process_frame(
                    {"camera": camera, "motion": motion_boxes}, yuv_frame
                )
    finally:
        # release the frame even if a processor raises
        self.frame_manager.close(frame_name)
def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]:
    """Return jpg thumbnail of a region of the frame.

    The frame is converted from YUV (I420) to BGR, cropped to the region that
    ``calculate_region`` returns for ``box``, resized to ``height`` pixels
    tall with the crop's aspect ratio preserved, and JPEG-encoded at quality
    100. Returns ``None`` when encoding reports failure.
    """
    bgr = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)
    region = calculate_region(
        bgr.shape, box[0], box[1], box[2], box[3], height, multiplier=1.4
    )

    # region is used as (x_min, y_min, x_max, y_max)
    crop = bgr[region[1] : region[3], region[0] : region[2]]
    crop_height, crop_width = crop.shape[0], crop.shape[1]
    target_size = (int(height * crop_width / crop_height), height)

    resized = cv2.resize(crop, dsize=target_size, interpolation=cv2.INTER_AREA)
    encoded_ok, jpg = cv2.imencode(
        ".jpg", resized, [int(cv2.IMWRITE_JPEG_QUALITY), 100]
    )

    return jpg.tobytes() if encoded_ok else None
Use sqlite-vec extension instead of chromadb for embeddings (#14163) * swap sqlite_vec for chroma in requirements * load sqlite_vec in embeddings manager * remove chroma and revamp Embeddings class for sqlite_vec * manual minilm onnx inference * remove chroma in clip model * migrate api from chroma to sqlite_vec * migrate event cleanup from chroma to sqlite_vec * migrate embedding maintainer from chroma to sqlite_vec * genai description for sqlite_vec * load sqlite_vec in main thread db * extend the SqliteQueueDatabase class and use peewee db.execute_sql * search with Event type for similarity * fix similarity search * install and add comment about transformers * fix normalization * add id filter * clean up * clean up * fully remove chroma and add transformers env var * readd uvicorn for fastapi * readd tokenizer parallelism env var * remove chroma from docs * remove chroma from UI * try removing custom pysqlite3 build * hard code limit * optimize queries * revert explore query * fix query * keep building pysqlite3 * single pass fetch and process * remove unnecessary re-embed * update deps * move SqliteVecQueueDatabase to db directory * make search thumbnail take up full size of results box * improve typing * improve model downloading and add status screen * daemon downloading thread * catch case when semantic search is disabled * fix typing * build sqlite_vec from source * resolve conflict * file permissions * try build deps * remove sources * sources * fix thread start * include git in build * reorder embeddings after detectors are started * build with sqlite amalgamation * non-platform specific * use wget instead of curl * remove unzip -d * remove sqlite_vec from requirements and load the compiled version * fix build * avoid race in db connection * add scale_factor and bias to description zscore normalization
2024-10-07 23:30:45 +03:00
def _embed_thumbnail(self, event_id: str, thumbnail: bytes) -> None:
    """Embed the thumbnail for an event.

    Does nothing unless semantic search is enabled in the config.
    """
    if self.config.semantic_search.enabled:
        self.embeddings.embed_thumbnail(event_id, thumbnail)
def _process_genai_description(
    self, event: Event, camera_config: CameraConfig, thumbnail
) -> None:
    """Pick the images for an event and request its GenAI description.

    Image selection, in priority order:

    1. the cropped snapshot, when the event has one and the camera is
       configured to use snapshots (gives up if the snapshot cannot be read);
    2. the thumbnails cached for the event in ``self.tracked_events``;
    3. the supplied ``thumbnail``, converted with ``ensure_jpeg_bytes``.

    When ``debug_save_thumbnails`` is set, the cached thumbnails are also
    written below ``CLIPS_DIR/genai-requests/<event id>/``. The description
    request itself runs in a daemon thread.
    """
    use_snapshot = event.has_snapshot and camera_config.objects.genai.use_snapshot

    if use_snapshot:
        snapshot_image = self._read_and_crop_snapshot(event, camera_config)
        if not snapshot_image:
            return

    cached_updates = self.tracked_events.get(event.id, [])
    num_thumbnails = len(cached_updates)

    # ensure we have a jpeg to pass to the model
    thumbnail = ensure_jpeg_bytes(thumbnail)

    if use_snapshot:
        embed_image = [snapshot_image]
    elif num_thumbnails > 0:
        embed_image = [update["thumbnail"] for update in cached_updates]
    else:
        embed_image = [thumbnail]

    if camera_config.objects.genai.debug_save_thumbnails and num_thumbnails > 0:
        logger.debug(f"Saving {num_thumbnails} thumbnails for event {event.id}")

        request_dir = Path(os.path.join(CLIPS_DIR, f"genai-requests/{event.id}"))
        request_dir.mkdir(parents=True, exist_ok=True)

        for idx, update in enumerate(cached_updates, 1):
            jpg_bytes: bytes = update["thumbnail"]

            if jpg_bytes is None:
                logger.warning(f"Unable to save thumbnail {idx} for {event.id}.")
                continue

            target = os.path.join(
                CLIPS_DIR,
                f"genai-requests/{event.id}/{idx}.jpg",
            )
            with open(target, "wb") as j:
                j.write(jpg_bytes)

    # Generate the description. Call happens in a thread since it is network bound.
    worker = threading.Thread(
        target=self._genai_embed_description,
        name=f"_genai_embed_description_{event.id}",
        daemon=True,
        args=(
            event,
            embed_image,
        ),
    )
    worker.start()
def _genai_embed_description(self, event: Event, thumbnails: list[bytes]) -> None:
    """Embed the description for an event.

    Asks ``self.genai_client`` for a description of ``thumbnails``. When one
    is returned it is sent out as an event description update, embedded if
    semantic search is enabled, and every ``SemanticTriggerProcessor`` in
    ``self.post_processors`` is notified with a ``"text"`` payload.
    """
    description = self.genai_client.generate_object_description(
        self.config.cameras[event.camera], thumbnails, event
    )

    if not description:
        logger.debug("Failed to generate description for %s", event.id)
        return

    # fire and forget description update
    update_payload = {
        "type": TrackedObjectUpdateTypesEnum.description,
        "id": event.id,
        "description": description,
        "camera": event.camera,
    }
    self.requestor.send_data(UPDATE_EVENT_DESCRIPTION, update_payload)

    # Embed the description
    if self.config.semantic_search.enabled:
        self.embeddings.embed_description(event.id, description)

    # Check semantic trigger for this description
    semantic_triggers = (
        candidate
        for candidate in self.post_processors
        if isinstance(candidate, SemanticTriggerProcessor)
    )
    for trigger in semantic_triggers:
        trigger.process_data(
            {"event_id": event.id, "camera": event.camera, "type": "text"},
            PostProcessDataEnum.tracked_object,
        )

    logger.debug(
        "Generated description for %s (%d images): %s",
        event.id,
        len(thumbnails),
        description,
    )
2025-03-03 20:01:02 +03:00
def _read_and_crop_snapshot(self, event: Event, camera_config) -> bytes | None:
    """Read, decode, and crop the snapshot image.

    The snapshot is read from ``CLIPS_DIR/<camera>-<event id>.jpg`` and
    cropped to the relative ``"region"`` stored in ``event.data`` (the full
    image when no region is stored), then re-encoded as JPEG.

    Returns the encoded bytes, or ``None`` when the file is missing or cannot
    be decoded, cropped or encoded. Every failure is logged.
    ``camera_config`` is accepted for interface compatibility and is unused.
    """
    snapshot_file = os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg")

    if not os.path.isfile(snapshot_file):
        logger.error(
            f"Cannot load snapshot for {event.id}, file not found: {snapshot_file}"
        )
        return None

    try:
        with open(snapshot_file, "rb") as image_file:
            snapshot_image = image_file.read()

        # imdecode is documented for unsigned byte buffers
        img = cv2.imdecode(
            np.frombuffer(snapshot_image, dtype=np.uint8),
            cv2.IMREAD_COLOR,
        )
        if img is None:
            logger.error(
                f"Cannot decode snapshot for {event.id}, invalid image: {snapshot_file}"
            )
            return None

        # Crop snapshot based on region
        # provide full image if region doesn't exist (manual events)
        height, width = img.shape[:2]
        x1_rel, y1_rel, width_rel, height_rel = event.data.get(
            "region", [0, 0, 1, 1]
        )
        x1, y1 = int(x1_rel * width), int(y1_rel * height)

        cropped_image = img[
            y1 : y1 + int(height_rel * height),
            x1 : x1 + int(width_rel * width),
        ]

        encoded_ok, buffer = cv2.imencode(".jpg", cropped_image)
        if not encoded_ok:
            logger.error(f"Cannot encode cropped snapshot for {event.id}")
            return None

        return buffer.tobytes()
    except Exception:
        # best effort: callers treat None as "no snapshot", but keep the cause visible
        logger.exception(f"Failed to read and crop snapshot for {event.id}")
        return None
def handle_regenerate_description(
    self, event_id: str, source: str, force: bool
) -> None:
    """Regenerate the GenAI description of an existing event.

    Logs an error and returns when the event does not exist, when no GenAI
    client is configured, or when GenAI is disabled for the event's camera
    and ``force`` is falsy. With ``source == "snapshot"`` and an available
    snapshot, the cropped snapshot is used; otherwise the thumbnails cached
    in ``self.tracked_events`` are used, falling back to the event's stored
    thumbnail. The description is generated synchronously.
    """
    try:
        event: Event = Event.get(Event.id == event_id)
    except DoesNotExist:
        logger.error(f"Event {event_id} not found for description regeneration")
        return

    if self.genai_client is None:
        logger.error("GenAI not enabled")
        return

    camera_config = self.config.cameras[event.camera]
    if not (camera_config.objects.genai.enabled or force):
        logger.error(f"GenAI not enabled for camera {event.camera}")
        return

    # ensure we have a jpeg to pass to the model
    thumbnail = ensure_jpeg_bytes(get_event_thumbnail_bytes(event))

    logger.debug(
        f"Trying {source} regeneration for {event}, has_snapshot: {event.has_snapshot}"
    )

    if event.has_snapshot and source == "snapshot":
        snapshot_image = self._read_and_crop_snapshot(event, camera_config)
        if not snapshot_image:
            return
        embed_image = [snapshot_image]
    else:
        cached_updates = self.tracked_events.get(event_id, [])
        if len(cached_updates) > 0:
            embed_image = [update["thumbnail"] for update in cached_updates]
        else:
            embed_image = [thumbnail]

    self._genai_embed_description(event, embed_image)