From cf2640452cf9268989be2717e3384868bee1206e Mon Sep 17 00:00:00 2001
From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
Date: Tue, 1 Jul 2025 07:24:00 -0500
Subject: [PATCH] initial sync

---
Review notes (commentary after the three-dash line; not part of the commit):
* The line structure of this patch was restored from a whitespace-collapsed
  copy. Blank lines were placed so that every hunk matches its @@ counts;
  the indentation of context lines is a best-effort reconstruction.
* sync_triggers() returns unconditionally right after "# TODO: fixme", so the
  loop below it never runs and the call added in maintainer.py does nothing.
* "except IntegrityError: pass" drops failed Trigger.create() calls silently.
* _calculate_trigger_embedding() returns b"" for an unknown trigger type or a
  missing vec_thumbnails row; sync_triggers() stores that value as the
  trigger embedding.
* A threshold-only change sets needs_embedding_update and therefore also
  recomputes the embedding.

 frigate/embeddings/embeddings.py | 118 +++++++++++++++++++++++++++++--
 frigate/embeddings/maintainer.py |   3 +-
 2 files changed, 114 insertions(+), 7 deletions(-)

diff --git a/frigate/embeddings/embeddings.py b/frigate/embeddings/embeddings.py
index 096077916..0a44f1407 100644
--- a/frigate/embeddings/embeddings.py
+++ b/frigate/embeddings/embeddings.py
@@ -6,9 +6,13 @@ import os
 import threading
 import time
 
-from numpy import ndarray
+import numpy as np
+from peewee import IntegrityError
 from playhouse.shortcuts import model_to_dict
 
+from frigate.comms.embeddings_updater import (
+    EmbeddingsRequestEnum,
+)
 from frigate.comms.inter_process import InterProcessRequestor
 from frigate.config import FrigateConfig
 from frigate.config.classification import SemanticSearchModelEnum
@@ -19,7 +23,7 @@ from frigate.const import (
 )
 from frigate.data_processing.types import DataProcessorMetrics
 from frigate.db.sqlitevecq import SqliteVecQueueDatabase
-from frigate.models import Event
+from frigate.models import Event, Trigger
 from frigate.types import ModelStatusTypesEnum
 from frigate.util.builtin import EventsPerSecond, InferenceSpeed, serialize
 from frigate.util.path import get_event_thumbnail_bytes
@@ -165,7 +169,7 @@ class Embeddings:
 
     def embed_thumbnail(
         self, event_id: str, thumbnail: bytes, upsert: bool = True
-    ) -> ndarray:
+    ) -> np.ndarray:
         """Embed thumbnail and optionally insert into DB.
 
         @param: event_id in Events DB
@@ -192,7 +196,7 @@ class Embeddings:
 
     def batch_embed_thumbnail(
         self, event_thumbs: dict[str, bytes], upsert: bool = True
-    ) -> list[ndarray]:
+    ) -> list[np.ndarray]:
         """Embed thumbnails and optionally insert into DB.
 
         @param: event_thumbs Map of Event IDs in DB to thumbnail bytes in jpg format
@@ -225,7 +229,7 @@ class Embeddings:
 
     def embed_description(
         self, event_id: str, description: str, upsert: bool = True
-    ) -> ndarray:
+    ) -> np.ndarray:
         start = datetime.datetime.now().timestamp()
         embedding = self.text_embedding([description])[0]
 
@@ -245,7 +249,7 @@ class Embeddings:
 
     def batch_embed_description(
         self, event_descriptions: dict[str, str], upsert: bool = True
-    ) -> ndarray:
+    ) -> np.ndarray:
         start = datetime.datetime.now().timestamp()
         # upsert embeddings one by one to avoid token limit
         embeddings = []
@@ -401,3 +405,105 @@ class Embeddings:
             with self.reindex_lock:
                 self.reindex_running = False
                 self.reindex_thread = None
+
+    def sync_triggers(self) -> None:
+        # TODO: fixme
+        return
+        for camera in self.config.cameras.values():
+            # Get all existing triggers for this camera
+            existing_triggers = {
+                trigger.name: trigger
+                for trigger in Trigger.select().where(Trigger.camera == camera.name)
+            }
+
+            # Get all configured trigger names
+            configured_trigger_names = {
+                trigger.name for trigger in camera.semantic_search.triggers
+            }
+
+            # Create or update triggers from config
+            # TODO: copy event thumbnail to triggers image directory
+            for trigger in camera.semantic_search.triggers:
+                if trigger.name in existing_triggers:
+                    # Update existing trigger if data has changed
+                    existing_trigger = existing_triggers[trigger.name]
+                    needs_embedding_update = False
+
+                    if (
+                        existing_trigger.type != trigger.type
+                        or existing_trigger.data != trigger.data
+                        or existing_trigger.threshold != trigger.threshold
+                    ):
+                        existing_trigger.type = trigger.type
+                        existing_trigger.data = trigger.data
+                        existing_trigger.threshold = trigger.threshold
+                        needs_embedding_update = True
+
+                    # Check if embedding is missing or needs update
+                    if not existing_trigger.embedding or needs_embedding_update:
+                        existing_trigger.embedding = self._calculate_trigger_embedding(
+                            trigger
+                        )
+                        needs_embedding_update = True
+
+                    if needs_embedding_update:
+                        existing_trigger.save()
+                else:
+                    # Create new trigger
+                    try:
+                        # Calculate embedding for new trigger
+                        embedding = self._calculate_trigger_embedding(trigger)
+
+                        Trigger.create(
+                            camera=camera.name,
+                            name=trigger.name,
+                            type=trigger.type,
+                            data=trigger.data,
+                            threshold=trigger.threshold,
+                            model=self.config.semantic_search.model,
+                            embedding=embedding,
+                            triggering_event_id="",
+                            last_triggered=None,
+                        )
+                    except IntegrityError:
+                        pass  # Handle duplicate creation attempts
+
+            # Remove triggers that are no longer in config
+            triggers_to_remove = (
+                set(existing_triggers.keys()) - configured_trigger_names
+            )
+            if triggers_to_remove:
+                Trigger.delete().where(
+                    Trigger.camera == camera.name, Trigger.name.in_(triggers_to_remove)
+                ).execute()
+
+    def _calculate_trigger_embedding(self, trigger) -> bytes:
+        """Calculate embedding for a trigger based on its type and data."""
+        if trigger.type == "description":
+            embedding = self.requestor.send_data(
+                EmbeddingsRequestEnum.embed_description.value,
+                {"id": None, "description": trigger.data, "upsert": False},
+            )
+            return embedding.astype(np.float32).tobytes()
+        elif trigger.type == "thumbnail":
+            # return self.requestor.send_data(
+            #     EmbeddingsRequestEnum.embed_thumbnail.value,
+            #     {"id": str(event_id), "thumbnail": str(thumbnail), "upsert": False},
+            # )
+            # For image triggers, trigger.data should be an image ID
+            # Get embedding from vec_thumbnails table
+            cursor = self.db.execute_sql(
+                "SELECT thumbnail_embedding FROM vec_thumbnails WHERE id = ?",
+                [trigger.data],
+            )
+            row = cursor.fetchone() if cursor else None
+            if row:
+                return row[0]  # Already in bytes format
+            else:
+                logger.warning(
+                    f"No thumbnail embedding found for image ID: {trigger.data}"
+                )
+                return b""
+        else:
+            logger.warning(f"Unknown trigger type: {trigger.type}")
+            return b""
diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py
index 646292470..90a60e314 100644
--- a/frigate/embeddings/maintainer.py
+++ b/frigate/embeddings/maintainer.py
@@ -123,7 +123,8 @@ class EmbeddingMaintainer(threading.Thread):
         if config.semantic_search.reindex:
             self.embeddings.reindex()
 
-        # TODO: sync triggers
+        # Sync semantic search triggers in db with config
+        self.embeddings.sync_triggers()
 
         # create communication for updating event descriptions
         self.requestor = InterProcessRequestor()