embeddings maintainer and trigger post-processor

This commit is contained in:
Josh Hawkins 2025-06-29 16:40:38 -05:00
parent 34d184a3b9
commit 783c20a6fc
5 changed files with 362 additions and 122 deletions

View File

@ -0,0 +1,275 @@
"""Real time processor to trigger alerts by matching embeddings."""
import datetime
import logging
from typing import Any
import cv2
import numpy as np
from peewee import DoesNotExist
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.data_processing.types import PostProcessDataEnum
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.embeddings.util import ZScoreNormalization
from frigate.models import Event, Trigger
from frigate.util.builtin import cosine_distance
from frigate.util.path import get_event_thumbnail_bytes
from ..post.api import PostProcessorApi
from ..types import DataProcessorMetrics
logger = logging.getLogger(__name__)
# NOTE(review): debug frame writing is hard-enabled here; flip to False (or
# drive it from config) before shipping — it writes a JPEG per activation.
WRITE_DEBUG_IMAGES = True
class SemanticTriggerProcessor(PostProcessorApi):
    """Post-processor that activates semantic triggers for finished events.

    Loads the event's stored thumbnail and/or description embedding from
    sqlite-vec, compares it against every trigger configured for the event's
    camera, and logs activations whose similarity meets the trigger's
    threshold.
    """

    def __init__(
        self,
        db: SqliteVecQueueDatabase,
        config: FrigateConfig,
        requestor: InterProcessRequestor,
        metrics: DataProcessorMetrics,
        embeddings,
    ):
        super().__init__(config, metrics, None)
        self.db = db
        self.embeddings = embeddings
        self.requestor = requestor
        self.trigger_embeddings: list[np.ndarray] = []
        # Running z-score stats used to normalize thumbnail cosine distances.
        self.thumb_stats = ZScoreNormalization()

    def _fetch_embeddings(
        self, event_id: str, process_type: str
    ) -> tuple[np.ndarray | None, np.ndarray | None]:
        """Load the (thumbnail, description) embeddings stored for an event.

        Only the embedding matching process_type ("image" or "text") is
        queried; the other slot in the returned tuple is None.
        """
        thumbnail_embedding: np.ndarray | None = None
        description_embedding: np.ndarray | None = None

        if process_type == "image":
            cursor = self.db.execute_sql(
                """
                SELECT thumbnail_embedding FROM vec_thumbnails WHERE id = ?
                """,
                [event_id],
            )
            row = cursor.fetchone() if cursor else None
            if row:
                thumbnail_embedding = np.frombuffer(row[0], dtype=np.float32)

        if process_type == "text":
            cursor = self.db.execute_sql(
                """
                SELECT description_embedding FROM vec_descriptions WHERE id = ?
                """,
                [event_id],
            )
            row = cursor.fetchone() if cursor else None
            if row:
                description_embedding = np.frombuffer(row[0], dtype=np.float32)

        return thumbnail_embedding, description_embedding

    def _score_trigger(
        self,
        trigger: dict[str, Any],
        trigger_embedding: np.ndarray,
        thumbnail_embedding: np.ndarray | None,
        description_embedding: np.ndarray | None,
    ) -> float | None:
        """Return the normalized distance between trigger and event embeddings.

        Returns None when the trigger's type has no matching embedding
        available. "image" distances are z-score normalized via thumb_stats,
        "text" distances are raw cosine distances, and "both" takes the best
        match (highest similarity) across whichever embeddings exist.
        """
        if trigger["type"] == "image" and thumbnail_embedding is not None:
            return self.thumb_stats.normalize(
                [cosine_distance(thumbnail_embedding, trigger_embedding)],
                save_stats=False,
            )[0]

        if trigger["type"] == "text" and description_embedding is not None:
            return cosine_distance(description_embedding, trigger_embedding)

        if trigger["type"] == "both":
            # For "both" type triggers, check both embeddings and use the best match
            similarities: list[float] = []
            similarity_sources: list[str] = []  # which embedding produced each score

            if thumbnail_embedding is not None:
                thumb_distance = cosine_distance(
                    thumbnail_embedding, trigger_embedding
                )
                thumb_normalized = self.thumb_stats.normalize(
                    [thumb_distance], save_stats=False
                )[0]
                similarities.append(1 - thumb_normalized)
                similarity_sources.append("thumbnail")

            if description_embedding is not None:
                desc_distance = cosine_distance(
                    description_embedding, trigger_embedding
                )
                similarities.append(1 - desc_distance)
                similarity_sources.append("description")

            if not similarities:
                return None  # no valid embeddings for this trigger

            # Find the best similarity and its source
            max_similarity_idx = similarities.index(max(similarities))
            similarity = similarities[max_similarity_idx]
            selected_source = similarity_sources[max_similarity_idx]

            if len(similarities) > 1:
                logger.debug(
                    f"Both embeddings available for trigger '{trigger['name']}': "
                    f"thumbnail={similarities[0]:.4f}, description={similarities[1]:.4f}, "
                    f"selected={selected_source} with similarity={similarity:.4f}"
                )
            else:
                logger.debug(
                    f"Single embedding available for trigger '{trigger['name']}': "
                    f"{selected_source}={similarity:.4f}"
                )

            return 1 - similarity

        # Skip trigger if embedding type doesn't match available data
        return None

    def _write_debug_image(self, event_id: str, similarity: float) -> None:
        """Write the event thumbnail annotated with its similarity score to
        debug/frames/ for manual inspection. Best-effort: silently skips
        missing events and non-object events."""
        try:
            event: Event = Event.get(Event.id == event_id)
        except DoesNotExist:
            return

        # Skip the event if not an object
        if event.data.get("type") != "object":
            return

        thumbnail_bytes = get_event_thumbnail_bytes(event)
        nparr = np.frombuffer(thumbnail_bytes, np.uint8)
        thumbnail = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

        cv2.putText(
            thumbnail,
            f"{similarity:.4f}",
            (10, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            fontScale=0.5,
            color=(0, 255, 0),
            thickness=2,
        )
        current_time = int(datetime.datetime.now().timestamp())
        cv2.imwrite(
            f"debug/frames/trigger-{event_id}_{current_time}.jpg",
            thumbnail,
        )

    def process_data(
        self, data: dict[str, Any], data_type: PostProcessDataEnum
    ) -> None:
        """Evaluate every trigger configured for the event's camera.

        data: {"event_id": str, "camera": str, "type": "image" | "text"}
        data_type: post-process source kind (unused beyond the API contract).
        """
        event_id = data["event_id"]
        camera = data["camera"]
        process_type = data["type"]

        logger.info(
            f"semantic trigger event_id: {event_id}, type: {process_type}, camera: {camera}"
        )

        # TODO: check if triggers exist for this camera, bail if none

        thumbnail_embedding, description_embedding = self._fetch_embeddings(
            event_id, process_type
        )

        # Skip processing if we don't have any embeddings
        if thumbnail_embedding is None and description_embedding is None:
            logger.warning(f"No embeddings found for event_id: {event_id}")
            return

        triggers = (
            Trigger.select(
                Trigger.camera,
                Trigger.name,
                Trigger.data,
                Trigger.type,
                Trigger.embedding,
                Trigger.threshold,
            )
            .where(Trigger.camera == camera)
            .dicts()
            .iterator()
        )

        for trigger in triggers:
            logger.debug(f"Processing trigger: {trigger['camera']}_{trigger['name']}")
            trigger_embedding = np.frombuffer(trigger["embedding"], dtype=np.float32)

            normalized_distance = self._score_trigger(
                trigger, trigger_embedding, thumbnail_embedding, description_embedding
            )
            if normalized_distance is None:
                continue

            similarity = 1 - normalized_distance
            logger.debug(
                f"Trigger for {trigger['data'] if trigger['type'] == 'text' else 'image/both'} "
                f"(camera: {trigger['camera']}): normalized: {normalized_distance:.4f}, "
                f"similarity: {similarity:.4f}, threshold: {trigger['threshold']}"
            )

            # Check if similarity meets threshold
            if similarity >= trigger["threshold"]:
                logger.info(
                    f"Trigger '{trigger['name']}' activated with similarity {similarity:.4f}"
                )
                if WRITE_DEBUG_IMAGES:
                    # BUGFIX: the original `return`ed from inside this loop when
                    # the event was missing or not an object, aborting evaluation
                    # of all remaining triggers; the helper's early exits now
                    # only skip the debug write.
                    self._write_debug_image(event_id, similarity)

    def handle_request(self, topic, request_data):
        # No request topics are handled by this processor.
        return None

    def expire_object(self, object_id, camera):
        # Nothing to clean up; triggers are evaluated per finished event.
        pass

View File

@ -1,81 +0,0 @@
"""Real time processor to trigger alerts by matching embeddings."""
import datetime
import logging
from typing import Any
import cv2
import numpy as np
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.config.classification import CameraSemanticSearchConfig
from frigate.util.builtin import EventsPerSecond, InferenceSpeed
from ..types import DataProcessorMetrics
from .api import RealTimeProcessorApi
logger = logging.getLogger(__name__)
class SemanticTriggerProcessor(RealTimeProcessorApi):
    """Real-time processor that embeds incoming frames and logs their cosine
    similarity against pre-computed trigger phrase embeddings."""

    def __init__(
        self,
        config: FrigateConfig,
        trigger_config: CameraSemanticSearchConfig,
        requestor: InterProcessRequestor,
        metrics: DataProcessorMetrics,
        embeddings,
    ):
        super().__init__(config, metrics)
        self.embeddings = embeddings
        self.trigger_config = trigger_config
        self.requestor = requestor
        self.image_inference_speed = InferenceSpeed(self.metrics.image_embeddings_speed)
        self.image_eps = EventsPerSecond()
        self.text_inference_speed = InferenceSpeed(self.metrics.text_embeddings_speed)
        self.text_eps = EventsPerSecond()
        self.trigger_embeddings: list[np.ndarray] = []
        self.last_run = datetime.datetime.now().timestamp()
        self.__generate_trigger_embeddings()

    def __generate_trigger_embeddings(self) -> None:
        """Embed each configured trigger phrase once at startup (no upsert)."""
        self.image_eps.start()
        self.text_eps.start()
        for trigger in self.trigger_config.triggers:
            embedding = self.embeddings.embed_description(None, trigger, upsert=False)
            self.trigger_embeddings.append(embedding)

    def __update_metrics(self, duration: float) -> None:
        # Tracks image-embedding throughput and per-call inference speed.
        self.image_eps.update()
        self.image_inference_speed.update(duration)

    def process_frame(self, frame_data: dict[str, Any], frame: np.ndarray):
        """Embed the frame and log cosine similarity for each trigger.

        NOTE(review): the embedding is computed (and metrics updated) before
        the camera filter, so every camera pays the inference cost — confirm
        whether that ordering is intentional.
        """
        camera = frame_data.get("camera")
        now = datetime.datetime.now().timestamp()
        rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
        img_embedding = self.embeddings.embed_thumbnail(None, rgb, upsert=False)
        self.__update_metrics(datetime.datetime.now().timestamp() - now)

        if camera != "framecache":
            return

        # BUGFIX: the original nested a loop over trigger_config.triggers
        # inside this loop without ever using the inner variable, computing
        # and logging the identical similarity len(triggers) times per
        # trigger embedding.
        for trigger_embedding in self.trigger_embeddings:
            dot_product = np.dot(img_embedding, trigger_embedding)
            norm_img_embedding = np.linalg.norm(img_embedding)
            norm_trigger_embedding = np.linalg.norm(trigger_embedding)
            logger.info(
                f"{camera}: Cosine similarity is {dot_product / (norm_img_embedding * norm_trigger_embedding)}"
            )

    def handle_request(self, topic, request_data):
        # No request topics are handled by this processor.
        return None

    def expire_object(self, object_id, camera):
        # Stateless per frame; nothing to expire.
        pass

View File

@ -293,3 +293,9 @@ class EmbeddingsContext:
EmbeddingsRequestEnum.embed_description.value,
{"id": None, "description": text, "upsert": False},
)
def generate_image_embedding(self, event_id: str, thumbnail: bytes) -> None:
    """Request a one-off (non-upserted) embedding for an event thumbnail
    over the inter-process requestor.

    NOTE(review): `str(thumbnail)` on bytes yields the "b'...'" repr, not
    the raw image data — verify the embed_thumbnail handler expects this;
    base64 or the raw bytes seems more likely to be intended.

    NOTE(review): annotated -> None but returns the send_data result —
    confirm the intended return contract.
    """
    return self.requestor.send_data(
        EmbeddingsRequestEnum.embed_thumbnail.value,
        {"id": str(event_id), "thumbnail": str(thumbnail), "upsert": False},
    )

View File

@ -14,7 +14,10 @@ import numpy as np
from peewee import DoesNotExist
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsResponder
from frigate.comms.embeddings_updater import (
EmbeddingsRequestEnum,
EmbeddingsResponder,
)
from frigate.comms.event_metadata_updater import (
EventMetadataPublisher,
EventMetadataSubscriber,
@ -46,6 +49,7 @@ from frigate.data_processing.post.audio_transcription import (
from frigate.data_processing.post.license_plate import (
LicensePlatePostProcessor,
)
from frigate.data_processing.post.semantic_trigger import SemanticTriggerProcessor
from frigate.data_processing.real_time.api import RealTimeProcessorApi
from frigate.data_processing.real_time.bird import BirdRealTimeProcessor
from frigate.data_processing.real_time.custom_classification import (
@ -56,12 +60,11 @@ from frigate.data_processing.real_time.face import FaceRealTimeProcessor
from frigate.data_processing.real_time.license_plate import (
LicensePlateRealTimeProcessor,
)
from frigate.data_processing.real_time.semantic_trigger import SemanticTriggerProcessor
from frigate.data_processing.types import DataProcessorMetrics, PostProcessDataEnum
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.events.types import EventTypeEnum, RegenerateDescriptionEnum
from frigate.genai import get_genai_client
from frigate.models import Event, Recordings
from frigate.models import Event, Recordings, Trigger
from frigate.types import TrackedObjectUpdateTypesEnum
from frigate.util.builtin import serialize
from frigate.util.image import (
@ -110,7 +113,7 @@ class EmbeddingMaintainer(threading.Thread):
),
load_vec_extension=True,
)
models = [Event, Recordings]
models = [Event, Recordings, Trigger]
db.bind(models)
if config.semantic_search.enabled:
@ -120,6 +123,8 @@ class EmbeddingMaintainer(threading.Thread):
if config.semantic_search.reindex:
self.embeddings.reindex()
# TODO: sync triggers
# create communication for updating event descriptions
self.requestor = InterProcessRequestor()
@ -189,16 +194,6 @@ class EmbeddingMaintainer(threading.Thread):
)
)
self.realtime_processors.append(
SemanticTriggerProcessor(
self.config,
self.config.cameras["orlandocam"].semantic_search,
self.requestor,
metrics,
self.embeddings,
)
)
# post processors
self.post_processors: list[PostProcessorApi] = []
@ -222,6 +217,17 @@ class EmbeddingMaintainer(threading.Thread):
AudioTranscriptionPostProcessor(self.config, self.requestor, metrics)
)
if self.config.semantic_search.enabled:
self.post_processors.append(
SemanticTriggerProcessor(
db,
self.config,
self.requestor,
metrics,
self.embeddings,
)
)
self.stop_event = stop_event
self.tracked_events: dict[str, list[Any]] = {}
self.early_request_sent: dict[str, bool] = {}
@ -398,33 +404,6 @@ class EmbeddingMaintainer(threading.Thread):
event_id, camera, updated_db = ended
camera_config = self.config.cameras[camera]
# call any defined post processors
for processor in self.post_processors:
if isinstance(processor, LicensePlatePostProcessor):
recordings_available = self.recordings_available_through.get(camera)
if (
recordings_available is not None
and event_id in self.detected_license_plates
and self.config.cameras[camera].type != "lpr"
):
processor.process_data(
{
"event_id": event_id,
"camera": camera,
"recordings_available": self.recordings_available_through[
camera
],
"obj_data": self.detected_license_plates[event_id][
"obj_data"
],
},
PostProcessDataEnum.recording,
)
elif isinstance(processor, AudioTranscriptionPostProcessor):
continue
else:
processor.process_data(event_id, PostProcessDataEnum.event_id)
# expire in realtime processors
for processor in self.realtime_processors:
processor.expire_object(event_id, camera)
@ -461,6 +440,41 @@ class EmbeddingMaintainer(threading.Thread):
):
self._process_genai_description(event, camera_config, thumbnail)
# call any defined post processors
for processor in self.post_processors:
if isinstance(processor, LicensePlatePostProcessor):
recordings_available = self.recordings_available_through.get(camera)
if (
recordings_available is not None
and event_id in self.detected_license_plates
and self.config.cameras[camera].type != "lpr"
):
processor.process_data(
{
"event_id": event_id,
"camera": camera,
"recordings_available": self.recordings_available_through[
camera
],
"obj_data": self.detected_license_plates[event_id][
"obj_data"
],
},
PostProcessDataEnum.recording,
)
elif isinstance(processor, AudioTranscriptionPostProcessor):
continue
elif isinstance(processor, SemanticTriggerProcessor):
processor.process_data(
{"event_id": event_id, "camera": camera, "type": "image"},
PostProcessDataEnum.tracked_object,
)
else:
processor.process_data(
{"event_id": event_id, "camera": camera},
PostProcessDataEnum.tracked_object,
)
# Delete tracked events based on the event_id
if event_id in self.tracked_events:
del self.tracked_events[event_id]
@ -669,6 +683,16 @@ class EmbeddingMaintainer(threading.Thread):
if self.config.semantic_search.enabled:
self.embeddings.embed_description(event.id, description)
# Check semantic trigger for this description
for processor in self.post_processors:
if isinstance(processor, SemanticTriggerProcessor):
processor.process_data(
{"event_id": event.id, "camera": event.camera, "type": "text"},
PostProcessDataEnum.tracked_object,
)
else:
continue
logger.debug(
"Generated description for %s (%d images): %s",
event.id,

View File

@ -428,3 +428,19 @@ def sanitize_float(value):
if isinstance(value, (int, float)) and not math.isfinite(value):
return 0.0
return value
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine similarity of *a* and *b* (1 minus cosine distance)."""
    distance = cosine_distance(a, b)
    return 1 - distance
def cosine_distance(a: np.ndarray, b: np.ndarray) -> float:
    """Returns cosine distance to match sqlite-vec's calculation."""
    # Squared magnitudes via self dot products.
    a_norm_sq = np.dot(a, a)
    b_norm_sq = np.dot(b, b)
    # A zero vector has no direction; report the maximal distance.
    if a_norm_sq == 0 or b_norm_sq == 0:
        return 1.0
    cosine = np.dot(a, b) / (np.sqrt(a_norm_sq) * np.sqrt(b_norm_sq))
    return 1.0 - cosine