"""Post-processor that activates semantic triggers by matching embeddings."""

import datetime
import logging
from typing import Any

import cv2
import numpy as np
from peewee import DoesNotExist

from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.data_processing.types import PostProcessDataEnum
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.embeddings.util import ZScoreNormalization
from frigate.models import Event, Trigger
from frigate.util.builtin import cosine_distance
from frigate.util.path import get_event_thumbnail_bytes

from ..post.api import PostProcessorApi
from ..types import DataProcessorMetrics

logger = logging.getLogger(__name__)

# Keep disabled in production. When True, every activated trigger writes an
# annotated thumbnail to debug/frames/ for manual inspection.
WRITE_DEBUG_IMAGES = False


class SemanticTriggerProcessor(PostProcessorApi):
    """Evaluate configured semantic triggers against a finished event.

    Loads the event's stored thumbnail and/or description embedding and
    compares it with every trigger defined for the event's camera, logging
    an activation whenever the similarity meets the trigger's threshold.
    """

    def __init__(
        self,
        db: SqliteVecQueueDatabase,
        config: FrigateConfig,
        requestor: InterProcessRequestor,
        metrics: DataProcessorMetrics,
        embeddings,
    ):
        super().__init__(config, metrics, None)
        self.db = db
        self.embeddings = embeddings
        self.requestor = requestor
        self.trigger_embeddings: list[np.ndarray] = []

        # Normalizes raw thumbnail cosine distances to z-scores so image
        # similarities are on a scale comparable to text similarities.
        self.thumb_stats = ZScoreNormalization()

    def _fetch_embedding(
        self, table: str, column: str, event_id: str
    ) -> np.ndarray | None:
        """Return the stored embedding for event_id from table, or None."""
        # table/column are internal constants, never user input.
        cursor = self.db.execute_sql(
            f"SELECT {column} FROM {table} WHERE id = ?",
            [event_id],
        )
        row = cursor.fetchone() if cursor else None
        return np.frombuffer(row[0], dtype=np.float32) if row else None

    def _image_similarity(
        self, embedding: np.ndarray, trigger_embedding: np.ndarray
    ) -> float:
        """Z-score-normalized similarity for thumbnail embeddings."""
        normalized_distance = self.thumb_stats.normalize(
            [cosine_distance(embedding, trigger_embedding)],
            save_stats=False,
        )[0]
        return 1 - normalized_distance

    def _score_trigger(
        self,
        trigger: dict[str, Any],
        thumbnail_embedding: np.ndarray | None,
        description_embedding: np.ndarray | None,
    ) -> float | None:
        """Return the similarity between the event and a single trigger.

        Returns None when the trigger's type has no matching embedding
        available for this event.
        """
        trigger_embedding = np.frombuffer(trigger["embedding"], dtype=np.float32)

        if trigger["type"] == "image" and thumbnail_embedding is not None:
            return self._image_similarity(thumbnail_embedding, trigger_embedding)

        if trigger["type"] == "text" and description_embedding is not None:
            return 1 - cosine_distance(description_embedding, trigger_embedding)

        if trigger["type"] == "both":
            # For "both" triggers, score every available embedding and use
            # the best match.
            candidates: list[tuple[float, str]] = []

            if thumbnail_embedding is not None:
                candidates.append(
                    (
                        self._image_similarity(
                            thumbnail_embedding, trigger_embedding
                        ),
                        "thumbnail",
                    )
                )

            if description_embedding is not None:
                candidates.append(
                    (
                        1
                        - cosine_distance(
                            description_embedding, trigger_embedding
                        ),
                        "description",
                    )
                )

            if not candidates:
                return None

            similarity, source = max(candidates, key=lambda c: c[0])
            logger.debug(
                f"Trigger '{trigger['name']}': "
                f"{', '.join(f'{s}={v:.4f}' for v, s in candidates)}; "
                f"selected {source} with similarity {similarity:.4f}"
            )
            return similarity

        # Trigger type does not match any embedding available for this event.
        return None

    def _write_debug_thumbnail(self, event_id: str, similarity: float) -> None:
        """Write the event thumbnail annotated with its similarity score."""
        try:
            event: Event = Event.get(Event.id == event_id)
        except DoesNotExist:
            return

        # Only object events carry a usable thumbnail.
        if event.data.get("type") != "object":
            return

        thumbnail_bytes = get_event_thumbnail_bytes(event)
        nparr = np.frombuffer(thumbnail_bytes, np.uint8)
        thumbnail = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

        cv2.putText(
            thumbnail,
            f"{similarity:.4f}",
            (10, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            fontScale=0.5,
            color=(0, 255, 0),
            thickness=2,
        )

        current_time = int(datetime.datetime.now().timestamp())
        cv2.imwrite(
            f"debug/frames/trigger-{event_id}_{current_time}.jpg",
            thumbnail,
        )

    def process_data(
        self, data: dict[str, Any], data_type: PostProcessDataEnum
    ) -> None:
        """Evaluate all of a camera's triggers for one event.

        data carries event_id, camera, and type ("image" or "text"); type
        selects which embedding table the event's vector is read from.
        """
        event_id = data["event_id"]
        camera = data["camera"]
        process_type = data["type"]
        logger.debug(
            f"semantic trigger event_id: {event_id}, type: {process_type}, camera: {camera}"
        )

        # TODO: check if triggers exist for this camera, bail if none

        thumbnail_embedding = (
            self._fetch_embedding("vec_thumbnails", "thumbnail_embedding", event_id)
            if process_type == "image"
            else None
        )
        description_embedding = (
            self._fetch_embedding(
                "vec_descriptions", "description_embedding", event_id
            )
            if process_type == "text"
            else None
        )

        # Nothing to compare against — the event has no stored embeddings.
        if thumbnail_embedding is None and description_embedding is None:
            logger.warning(f"No embeddings found for event_id: {event_id}")
            return

        triggers = (
            Trigger.select(
                Trigger.camera,
                Trigger.name,
                Trigger.data,
                Trigger.type,
                Trigger.embedding,
                Trigger.threshold,
            )
            .where(Trigger.camera == camera)
            .dicts()
            .iterator()
        )

        for trigger in triggers:
            similarity = self._score_trigger(
                trigger, thumbnail_embedding, description_embedding
            )

            if similarity is None:
                # Trigger type has no matching embedding for this event.
                continue

            logger.debug(
                f"Trigger for {trigger['data'] if trigger['type'] == 'text' else 'image/both'} "
                f"(camera: {trigger['camera']}): "
                f"similarity: {similarity:.4f}, threshold: {trigger['threshold']}"
            )

            if similarity >= trigger["threshold"]:
                logger.info(
                    f"Trigger '{trigger['name']}' activated with similarity {similarity:.4f}"
                )

                if WRITE_DEBUG_IMAGES:
                    # Best-effort debug artifact; must never abort evaluation
                    # of the remaining triggers for this event.
                    self._write_debug_thumbnail(event_id, similarity)

    def handle_request(self, topic, request_data):
        """Semantic triggers expose no request topics."""
        return None

    def expire_object(self, object_id, camera):
        """No per-object state is kept, so nothing to expire."""
        pass
requestor: InterProcessRequestor, - metrics: DataProcessorMetrics, - embeddings, - ): - super().__init__(config, metrics) - self.embeddings = embeddings - self.trigger_config = trigger_config - self.requestor = requestor - self.image_inference_speed = InferenceSpeed(self.metrics.image_embeddings_speed) - self.image_eps = EventsPerSecond() - self.text_inference_speed = InferenceSpeed(self.metrics.text_embeddings_speed) - self.text_eps = EventsPerSecond() - self.trigger_embeddings: list[np.ndarray] = [] - self.last_run = datetime.datetime.now().timestamp() - self.__generate_trigger_embeddings() - - def __generate_trigger_embeddings(self) -> None: - self.image_eps.start() - self.text_eps.start() - for trigger in self.trigger_config.triggers: - embedding = self.embeddings.embed_description(None, trigger, upsert=False) - self.trigger_embeddings.append(embedding) - - def __update_metrics(self, duration: float) -> None: - self.image_eps.update() - self.image_inference_speed.update(duration) - - def process_frame(self, frame_data: dict[str, Any], frame: np.ndarray): - # self.metrics.classification_cps[ - # self.model_config.name - # ].value = self.classifications_per_second.eps() - camera = frame_data.get("camera") - - now = datetime.datetime.now().timestamp() - - rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420) - img_embedding = self.embeddings.embed_thumbnail(None, rgb, upsert=False) - self.__update_metrics(datetime.datetime.now().timestamp() - now) - - if camera != "framecache": - return - - for trigger_embedding in self.trigger_embeddings: - for trigger in self.trigger_config.triggers: - dot_product = np.dot(img_embedding, trigger_embedding) - norm_img_embedding = np.linalg.norm(img_embedding) - norm_trigger_embedding = np.linalg.norm(trigger_embedding) - logger.info( - f"{camera}: Cosine similarity is {dot_product / (norm_img_embedding * norm_trigger_embedding)}" - ) - - def handle_request(self, topic, request_data): - return None - - def expire_object(self, 
object_id, camera): - pass diff --git a/frigate/embeddings/__init__.py b/frigate/embeddings/__init__.py index 12b42c7bb..f3acbbcf7 100644 --- a/frigate/embeddings/__init__.py +++ b/frigate/embeddings/__init__.py @@ -293,3 +293,9 @@ class EmbeddingsContext: EmbeddingsRequestEnum.embed_description.value, {"id": None, "description": text, "upsert": False}, ) + + def generate_image_embedding(self, event_id: str, thumbnail: bytes) -> None: + return self.requestor.send_data( + EmbeddingsRequestEnum.embed_thumbnail.value, + {"id": str(event_id), "thumbnail": str(thumbnail), "upsert": False}, + ) diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 6df167495..646292470 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -14,7 +14,10 @@ import numpy as np from peewee import DoesNotExist from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum -from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsResponder +from frigate.comms.embeddings_updater import ( + EmbeddingsRequestEnum, + EmbeddingsResponder, +) from frigate.comms.event_metadata_updater import ( EventMetadataPublisher, EventMetadataSubscriber, @@ -46,6 +49,7 @@ from frigate.data_processing.post.audio_transcription import ( from frigate.data_processing.post.license_plate import ( LicensePlatePostProcessor, ) +from frigate.data_processing.post.semantic_trigger import SemanticTriggerProcessor from frigate.data_processing.real_time.api import RealTimeProcessorApi from frigate.data_processing.real_time.bird import BirdRealTimeProcessor from frigate.data_processing.real_time.custom_classification import ( @@ -56,12 +60,11 @@ from frigate.data_processing.real_time.face import FaceRealTimeProcessor from frigate.data_processing.real_time.license_plate import ( LicensePlateRealTimeProcessor, ) -from frigate.data_processing.real_time.semantic_trigger import SemanticTriggerProcessor from 
frigate.data_processing.types import DataProcessorMetrics, PostProcessDataEnum from frigate.db.sqlitevecq import SqliteVecQueueDatabase from frigate.events.types import EventTypeEnum, RegenerateDescriptionEnum from frigate.genai import get_genai_client -from frigate.models import Event, Recordings +from frigate.models import Event, Recordings, Trigger from frigate.types import TrackedObjectUpdateTypesEnum from frigate.util.builtin import serialize from frigate.util.image import ( @@ -110,7 +113,7 @@ class EmbeddingMaintainer(threading.Thread): ), load_vec_extension=True, ) - models = [Event, Recordings] + models = [Event, Recordings, Trigger] db.bind(models) if config.semantic_search.enabled: @@ -120,6 +123,8 @@ class EmbeddingMaintainer(threading.Thread): if config.semantic_search.reindex: self.embeddings.reindex() + # TODO: sync triggers + # create communication for updating event descriptions self.requestor = InterProcessRequestor() @@ -189,16 +194,6 @@ class EmbeddingMaintainer(threading.Thread): ) ) - self.realtime_processors.append( - SemanticTriggerProcessor( - self.config, - self.config.cameras["orlandocam"].semantic_search, - self.requestor, - metrics, - self.embeddings, - ) - ) - # post processors self.post_processors: list[PostProcessorApi] = [] @@ -222,6 +217,17 @@ class EmbeddingMaintainer(threading.Thread): AudioTranscriptionPostProcessor(self.config, self.requestor, metrics) ) + if self.config.semantic_search.enabled: + self.post_processors.append( + SemanticTriggerProcessor( + db, + self.config, + self.requestor, + metrics, + self.embeddings, + ) + ) + self.stop_event = stop_event self.tracked_events: dict[str, list[Any]] = {} self.early_request_sent: dict[str, bool] = {} @@ -398,33 +404,6 @@ class EmbeddingMaintainer(threading.Thread): event_id, camera, updated_db = ended camera_config = self.config.cameras[camera] - # call any defined post processors - for processor in self.post_processors: - if isinstance(processor, LicensePlatePostProcessor): - 
recordings_available = self.recordings_available_through.get(camera) - if ( - recordings_available is not None - and event_id in self.detected_license_plates - and self.config.cameras[camera].type != "lpr" - ): - processor.process_data( - { - "event_id": event_id, - "camera": camera, - "recordings_available": self.recordings_available_through[ - camera - ], - "obj_data": self.detected_license_plates[event_id][ - "obj_data" - ], - }, - PostProcessDataEnum.recording, - ) - elif isinstance(processor, AudioTranscriptionPostProcessor): - continue - else: - processor.process_data(event_id, PostProcessDataEnum.event_id) - # expire in realtime processors for processor in self.realtime_processors: processor.expire_object(event_id, camera) @@ -461,6 +440,41 @@ class EmbeddingMaintainer(threading.Thread): ): self._process_genai_description(event, camera_config, thumbnail) + # call any defined post processors + for processor in self.post_processors: + if isinstance(processor, LicensePlatePostProcessor): + recordings_available = self.recordings_available_through.get(camera) + if ( + recordings_available is not None + and event_id in self.detected_license_plates + and self.config.cameras[camera].type != "lpr" + ): + processor.process_data( + { + "event_id": event_id, + "camera": camera, + "recordings_available": self.recordings_available_through[ + camera + ], + "obj_data": self.detected_license_plates[event_id][ + "obj_data" + ], + }, + PostProcessDataEnum.recording, + ) + elif isinstance(processor, AudioTranscriptionPostProcessor): + continue + elif isinstance(processor, SemanticTriggerProcessor): + processor.process_data( + {"event_id": event_id, "camera": camera, "type": "image"}, + PostProcessDataEnum.tracked_object, + ) + else: + processor.process_data( + {"event_id": event_id, "camera": camera}, + PostProcessDataEnum.tracked_object, + ) + # Delete tracked events based on the event_id if event_id in self.tracked_events: del self.tracked_events[event_id] @@ -669,6 +683,16 
def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine similarity of *a* and *b* (1 - cosine distance)."""
    return 1 - cosine_distance(a, b)


def cosine_distance(a: np.ndarray, b: np.ndarray) -> float:
    """Return the cosine distance of *a* and *b*.

    Computed the same way sqlite-vec does, so scores are comparable with
    distances coming back from vector queries. Zero-magnitude vectors are
    treated as maximally distant (1.0) to avoid division by zero.
    """
    dot_product = np.dot(a, b)
    norm_a = np.sqrt(np.dot(a, a))
    norm_b = np.sqrt(np.dot(b, b))

    if norm_a == 0 or norm_b == 0:
        return 1.0

    return 1.0 - dot_product / (norm_a * norm_b)