initial sync

This commit is contained in:
Josh Hawkins 2025-07-01 07:24:00 -05:00
parent dcdae14fac
commit cf2640452c
2 changed files with 114 additions and 7 deletions

View File

@ -6,9 +6,13 @@ import os
import threading
import time
from numpy import ndarray
import numpy as np
from peewee import IntegrityError
from playhouse.shortcuts import model_to_dict
from frigate.comms.embeddings_updater import (
EmbeddingsRequestEnum,
)
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.config.classification import SemanticSearchModelEnum
@ -19,7 +23,7 @@ from frigate.const import (
)
from frigate.data_processing.types import DataProcessorMetrics
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.models import Event
from frigate.models import Event, Trigger
from frigate.types import ModelStatusTypesEnum
from frigate.util.builtin import EventsPerSecond, InferenceSpeed, serialize
from frigate.util.path import get_event_thumbnail_bytes
@ -165,7 +169,7 @@ class Embeddings:
def embed_thumbnail(
self, event_id: str, thumbnail: bytes, upsert: bool = True
) -> ndarray:
) -> np.ndarray:
"""Embed thumbnail and optionally insert into DB.
@param: event_id in Events DB
@ -192,7 +196,7 @@ class Embeddings:
def batch_embed_thumbnail(
self, event_thumbs: dict[str, bytes], upsert: bool = True
) -> list[ndarray]:
) -> list[np.ndarray]:
"""Embed thumbnails and optionally insert into DB.
@param: event_thumbs Map of Event IDs in DB to thumbnail bytes in jpg format
@ -225,7 +229,7 @@ class Embeddings:
def embed_description(
self, event_id: str, description: str, upsert: bool = True
) -> ndarray:
) -> np.ndarray:
start = datetime.datetime.now().timestamp()
embedding = self.text_embedding([description])[0]
@ -245,7 +249,7 @@ class Embeddings:
def batch_embed_description(
self, event_descriptions: dict[str, str], upsert: bool = True
) -> ndarray:
) -> np.ndarray:
start = datetime.datetime.now().timestamp()
# upsert embeddings one by one to avoid token limit
embeddings = []
@ -401,3 +405,105 @@ class Embeddings:
with self.reindex_lock:
self.reindex_running = False
self.reindex_thread = None
def sync_triggers(self) -> None:
    """Reconcile Trigger rows in the database with the configured triggers.

    NOTE: the sync is currently disabled. The unconditional ``return`` below
    (left in place with its original "TODO: fixme") makes the rest of the
    body unreachable, so calling this method is a no-op until that is
    resolved.

    Intended behavior once re-enabled, for each camera in
    ``self.config.cameras``:

    * create a Trigger row, with a freshly calculated embedding, for every
      configured trigger that has no row yet;
    * update type/data/threshold and recalculate the embedding of rows
      whose configuration changed, and fill in embeddings that are missing;
    * delete rows whose trigger is no longer present in the config.
    """
    # TODO: fixme
    return

    for camera in self.config.cameras.values():
        configured_triggers = camera.semantic_search.triggers

        # Rows already stored for this camera, keyed by trigger name.
        existing_triggers = {
            row.name: row
            for row in Trigger.select().where(Trigger.camera == camera.name)
        }
        configured_trigger_names = {trigger.name for trigger in configured_triggers}

        # Create or update triggers from config
        # TODO: copy event thumbnail to triggers image directory
        for trigger in configured_triggers:
            existing_trigger = existing_triggers.get(trigger.name)

            if existing_trigger is None:
                # Calculate the embedding outside the try block so that only
                # the insert itself is guarded against IntegrityError.
                embedding = self._calculate_trigger_embedding(trigger)
                try:
                    Trigger.create(
                        camera=camera.name,
                        name=trigger.name,
                        type=trigger.type,
                        data=trigger.data,
                        threshold=trigger.threshold,
                        model=self.config.semantic_search.model,
                        embedding=embedding,
                        triggering_event_id="",
                        last_triggered=None,
                    )
                except IntegrityError as e:
                    # Still best-effort (a duplicate creation attempt is not
                    # fatal), but leave a trace instead of hiding it.
                    logger.debug(
                        f"Skipped creating trigger {trigger.name} "
                        f"for camera {camera.name}: {e}"
                    )
                continue

            config_changed = (
                existing_trigger.type != trigger.type
                or existing_trigger.data != trigger.data
                or existing_trigger.threshold != trigger.threshold
            )
            if config_changed:
                existing_trigger.type = trigger.type
                existing_trigger.data = trigger.data
                existing_trigger.threshold = trigger.threshold

            # Recalculate when the definition changed or no embedding has
            # been stored yet; only then does the row need to be saved.
            if config_changed or not existing_trigger.embedding:
                existing_trigger.embedding = self._calculate_trigger_embedding(
                    trigger
                )
                existing_trigger.save()

        # Remove triggers that are no longer in config
        triggers_to_remove = existing_triggers.keys() - configured_trigger_names
        if triggers_to_remove:
            Trigger.delete().where(
                Trigger.camera == camera.name,
                Trigger.name.in_(triggers_to_remove),
            ).execute()
def _calculate_trigger_embedding(self, trigger) -> bytes:
    """Calculate the embedding for a trigger based on its type and data.

    Args:
        trigger: Object exposing ``type`` and ``data``. For type
            ``"description"``, ``data`` is the text sent to the embeddings
            requestor. For type ``"thumbnail"``, ``data`` is used as the
            ``id`` looked up in the ``vec_thumbnails`` table.

    Returns:
        The embedding as raw bytes: float32 bytes for descriptions, the
        stored ``thumbnail_embedding`` value for thumbnails. Returns ``b""``
        when no embedding could be produced (unusable requestor reply,
        unknown thumbnail id, or unknown trigger type).
    """
    if trigger.type == "description":
        embedding = self.requestor.send_data(
            EmbeddingsRequestEnum.embed_description.value,
            {"id": None, "description": trigger.data, "upsert": False},
        )

        # NOTE(review): the exact type of the reply is not visible from
        # here (ndarray vs. a plain list of floats) -- confirm. Accept any
        # numeric array-like and treat a missing or non-numeric reply as a
        # failure, consistent with the other branches returning b"".
        vector = None
        if embedding is not None:
            try:
                vector = np.asarray(embedding, dtype=np.float32)
            except (TypeError, ValueError):
                vector = None

        if vector is None or vector.size == 0:
            logger.warning(
                f"No description embedding returned for trigger data: {trigger.data}"
            )
            return b""

        return vector.tobytes()

    if trigger.type == "thumbnail":
        # For image triggers, trigger.data should be an image ID
        # Get embedding from vec_thumbnails table
        cursor = self.db.execute_sql(
            "SELECT thumbnail_embedding FROM vec_thumbnails WHERE id = ?",
            [trigger.data],
        )
        row = cursor.fetchone() if cursor else None
        if row:
            return row[0]  # Already in bytes format

        logger.warning(f"No thumbnail embedding found for image ID: {trigger.data}")
        return b""

    logger.warning(f"Unknown trigger type: {trigger.type}")
    return b""

View File

@ -123,7 +123,8 @@ class EmbeddingMaintainer(threading.Thread):
if config.semantic_search.reindex:
self.embeddings.reindex()
# TODO: sync triggers
# Sync semantic search triggers in db with config
self.embeddings.sync_triggers()
# create communication for updating event descriptions
self.requestor = InterProcessRequestor()