From 3b529ede36a16037f7f9e4e3f6cc6d479fa2e149 Mon Sep 17 00:00:00 2001 From: Jason Hunter Date: Mon, 18 Dec 2023 02:12:53 -0500 Subject: [PATCH] Initial vector similarity implementation --- .devcontainer/devcontainer.json | 6 +- docker/main/Dockerfile | 5 +- docker/main/requirements-wheels.txt | 6 + .../s6-rc.d/chroma-log/consumer-for | 1 + .../chroma-log/dependencies.d/log-prepare | 0 .../s6-rc.d/chroma-log/pipeline-name | 1 + .../etc/s6-overlay/s6-rc.d/chroma-log/run | 4 + .../etc/s6-overlay/s6-rc.d/chroma-log/type | 1 + .../s6-rc.d/chroma/dependencies.d/base | 0 .../etc/s6-overlay/s6-rc.d/chroma/finish | 28 +++ .../s6-overlay/s6-rc.d/chroma/producer-for | 1 + .../rootfs/etc/s6-overlay/s6-rc.d/chroma/run | 16 ++ .../s6-overlay/s6-rc.d/chroma/timeout-kill | 1 + .../rootfs/etc/s6-overlay/s6-rc.d/chroma/type | 1 + .../s6-rc.d/frigate/dependencies.d/chroma | 0 .../etc/s6-overlay/s6-rc.d/log-prepare/run | 2 +- docker/main/rootfs/usr/local/chroma | 14 ++ frigate/__main__.py | 5 + frigate/app.py | 14 ++ frigate/config.py | 38 ++++ frigate/const.py | 1 + frigate/embeddings/__init__.py | 0 frigate/embeddings/functions/clip.py | 62 +++++++ frigate/embeddings/functions/minilm_l6_v2.py | 12 ++ frigate/embeddings/processor.py | 170 ++++++++++++++++++ frigate/events/maintainer.py | 3 + frigate/http.py | 68 ++++++- web-old/src/components/TextField.jsx | 2 +- web-old/src/routes/Events.jsx | 19 +- 29 files changed, 475 insertions(+), 6 deletions(-) create mode 100644 docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/consumer-for create mode 100644 docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/dependencies.d/log-prepare create mode 100644 docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/pipeline-name create mode 100755 docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/run create mode 100644 docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/type create mode 100644 docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/dependencies.d/base create mode 100755 docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/finish create mode 100644 docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/producer-for create mode 100755 docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/run create mode 100644 docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/timeout-kill create mode 100644 docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/type create mode 100644 docker/main/rootfs/etc/s6-overlay/s6-rc.d/frigate/dependencies.d/chroma create mode 100755 docker/main/rootfs/usr/local/chroma create mode 100644 frigate/embeddings/__init__.py create mode 100644 frigate/embeddings/functions/clip.py create mode 100644 frigate/embeddings/functions/minilm_l6_v2.py create mode 100644 frigate/embeddings/processor.py diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 0c460cfad..deb90c3d7 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -10,7 +10,7 @@ "features": { "ghcr.io/devcontainers/features/common-utils:1": {} }, - "forwardPorts": [5000, 5001, 5173, 1935, 8554, 8555], + "forwardPorts": [5000, 5001, 5173, 1935, 8000, 8554, 8555], "portsAttributes": { "5000": { "label": "NGINX", @@ -28,6 +28,10 @@ "label": "RTMP", "onAutoForward": "silent" }, + "8000": { + "label": "Chroma", + "onAutoForward": "silent" + }, "8554": { "label": "gortc RTSP", "onAutoForward": "silent" diff --git a/docker/main/Dockerfile b/docker/main/Dockerfile index 32cb72df3..0d0607c95 100644 --- a/docker/main/Dockerfile +++ b/docker/main/Dockerfile @@ -8,7 +8,7 @@ ARG SLIM_BASE=debian:11-slim FROM ${BASE_IMAGE} AS base -FROM --platform=${BUILDPLATFORM} debian:11 AS base_host +FROM --platform=${BUILDPLATFORM} ${BASE_IMAGE} AS base_host FROM ${SLIM_BASE} AS slim-base @@ -176,6 +176,9 @@ ARG APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE=DontWarn ENV NVIDIA_VISIBLE_DEVICES=all ENV NVIDIA_DRIVER_CAPABILITIES="compute,video,utility" +# Turn off Chroma Telemetry: https://docs.trychroma.com/telemetry#opting-out +ENV ANONYMIZED_TELEMETRY=False + ENV PATH="/usr/lib/btbn-ffmpeg/bin:/usr/local/go2rtc/bin:/usr/local/nginx/sbin:${PATH}" # Install dependencies diff --git a/docker/main/requirements-wheels.txt b/docker/main/requirements-wheels.txt index f4167744e..8319dc397 100644 --- a/docker/main/requirements-wheels.txt +++ b/docker/main/requirements-wheels.txt @@ -27,3 +27,9 @@ unidecode == 1.3.* # Openvino Library - Custom built with MYRIAD support openvino @ https://github.com/NateMeyer/openvino-wheels/releases/download/multi-arch_2022.3.1/openvino-2022.3.1-1-cp39-cp39-manylinux_2_31_x86_64.whl; platform_machine == 'x86_64' openvino @ https://github.com/NateMeyer/openvino-wheels/releases/download/multi-arch_2022.3.1/openvino-2022.3.1-1-cp39-cp39-linux_aarch64.whl; platform_machine == 'aarch64' +# Embeddings +onnxruntime == 1.16.* +onnx_clip == 4.0.* +pysqlite3-binary == 0.5.2 +chromadb == 0.4.20 +google-generativeai == 0.3.* \ No newline at end of file diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/consumer-for b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/consumer-for new file mode 100644 index 000000000..d0509d5a2 --- /dev/null +++ b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/consumer-for @@ -0,0 +1 @@ +chroma diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/dependencies.d/log-prepare b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/dependencies.d/log-prepare new file mode 100644 index 000000000..e69de29bb diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/pipeline-name b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/pipeline-name new file mode 100644 index 000000000..10f0be663 --- /dev/null +++ b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/pipeline-name @@ -0,0 +1 @@ +chroma-pipeline diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/run b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/run new file mode 100755 index 000000000..2e47fd3eb --- /dev/null +++ b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/run @@ -0,0 +1,4 @@ +#!/command/with-contenv bash +# shellcheck shell=bash + +exec logutil-service /dev/shm/logs/chroma diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/type b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/type new file mode 100644 index 000000000..5883cff0c --- /dev/null +++ b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma-log/type @@ -0,0 +1 @@ +longrun diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/dependencies.d/base b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/dependencies.d/base new file mode 100644 index 000000000..e69de29bb diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/finish b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/finish new file mode 100755 index 000000000..b6206b4cc --- /dev/null +++ b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/finish @@ -0,0 +1,28 @@ +#!/command/with-contenv bash +# shellcheck shell=bash +# Take down the S6 supervision tree when the service exits + +set -o errexit -o nounset -o pipefail + +# Logs should be sent to stdout so that s6 can collect them + +declare exit_code_container +exit_code_container=$(cat /run/s6-linux-init-container-results/exitcode) +readonly exit_code_container +readonly exit_code_service="${1}" +readonly exit_code_signal="${2}" +readonly service="ChromaDB" + +echo "[INFO] Service ${service} exited with code ${exit_code_service} (by signal ${exit_code_signal})" + +if [[ "${exit_code_service}" -eq 256 ]]; then + if [[ "${exit_code_container}" -eq 0 ]]; then + echo $((128 + exit_code_signal)) >/run/s6-linux-init-container-results/exitcode + fi +elif [[ "${exit_code_service}" -ne 0 ]]; then + if [[ "${exit_code_container}" -eq 0 ]]; then + echo "${exit_code_service}" >/run/s6-linux-init-container-results/exitcode + fi +fi + +exec /run/s6/basedir/bin/halt diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/producer-for b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/producer-for new file mode 100644 index 000000000..c17b71e87 --- /dev/null +++ b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/producer-for @@ -0,0 +1 @@ +chroma-log diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/run b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/run new file mode 100755 index 000000000..45b44779b --- /dev/null +++ b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/run @@ -0,0 +1,16 @@ +#!/command/with-contenv bash +# shellcheck shell=bash +# Start the Frigate service + +set -o errexit -o nounset -o pipefail + +# Logs should be sent to stdout so that s6 can collect them + +# Tell S6-Overlay not to restart this service +s6-svc -O . + +echo "[INFO] Starting ChromaDB..." + +# Replace the bash process with the Frigate process, redirecting stderr to stdout +exec 2>&1 +exec /usr/local/chroma run --path /config/chroma --host 127.0.0.1 diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/timeout-kill b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/timeout-kill new file mode 100644 index 000000000..6f4f41844 --- /dev/null +++ b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/timeout-kill @@ -0,0 +1 @@ +120000 diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/type b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/type new file mode 100644 index 000000000..5883cff0c --- /dev/null +++ b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/chroma/type @@ -0,0 +1 @@ +longrun diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/frigate/dependencies.d/chroma b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/frigate/dependencies.d/chroma new file mode 100644 index 000000000..e69de29bb diff --git a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/log-prepare/run b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/log-prepare/run index 0d8d73ce2..6d5398c6f 100755 --- a/docker/main/rootfs/etc/s6-overlay/s6-rc.d/log-prepare/run +++ b/docker/main/rootfs/etc/s6-overlay/s6-rc.d/log-prepare/run @@ -4,7 +4,7 @@ set -o errexit -o nounset -o pipefail -dirs=(/dev/shm/logs/frigate /dev/shm/logs/go2rtc /dev/shm/logs/nginx) +dirs=(/dev/shm/logs/frigate /dev/shm/logs/go2rtc /dev/shm/logs/nginx /dev/shm/logs/chroma) mkdir -p "${dirs[@]}" chown nobody:nogroup "${dirs[@]}" diff --git a/docker/main/rootfs/usr/local/chroma b/docker/main/rootfs/usr/local/chroma new file mode 100755 index 000000000..5147db387 --- /dev/null +++ b/docker/main/rootfs/usr/local/chroma @@ -0,0 +1,14 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*-s +__import__("pysqlite3") + +import re +import sys + +sys.modules["sqlite3"] = sys.modules.pop("pysqlite3") + +from chromadb.cli.cli import app + +if __name__ == "__main__": + sys.argv[0] = re.sub(r"(-script\.pyw|\.exe)?$", "", sys.argv[0]) + sys.exit(app()) diff --git a/frigate/__main__.py b/frigate/__main__.py index 844206908..276589f17 100644 --- a/frigate/__main__.py +++ b/frigate/__main__.py @@ -1,3 +1,8 @@ +__import__("pysqlite3") +import sys + +sys.modules["sqlite3"] = sys.modules.pop("pysqlite3") + import faulthandler import threading diff --git a/frigate/app.py b/frigate/app.py index 8ad337b1f..cad2a551d 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -31,6 +31,7 @@ from frigate.const import ( MODEL_CACHE_DIR, RECORD_DIR, ) +from frigate.embeddings.processor import EmbeddingProcessor from frigate.events.audio import listen_to_audio from frigate.events.cleanup import EventCleanup from frigate.events.external import ExternalEventProcessor @@ -273,6 +274,9 @@ class FrigateApp: # Queue for timeline events self.timeline_queue: Queue = mp.Queue() + # Queue for embeddings process + self.embeddings_queue: Queue = mp.Queue() + # Queue for inter process communication self.inter_process_queue: Queue = mp.Queue() @@ -584,6 +588,12 @@ class FrigateApp: ) self.timeline_processor.start() + def start_embeddings_processor(self) -> None: + self.embeddings_processor = EmbeddingProcessor( + self.config, self.embeddings_queue, self.stop_event + ) + self.embeddings_processor.start() + def start_event_processor(self) -> None: self.event_processor = EventProcessor( self.config, @@ -591,6 +601,7 @@ class FrigateApp: self.event_queue, self.event_processed_queue, self.timeline_queue, + self.embeddings_queue, self.stop_event, ) self.event_processor.start() @@ -700,6 +711,7 @@ class FrigateApp: self.init_external_event_processor() self.init_web_server() self.start_timeline_processor() + self.start_embeddings_processor() self.start_event_processor() self.start_event_cleanup() self.start_record_cleanup() @@ -742,6 +754,7 @@ class FrigateApp: self.record_cleanup.join() self.stats_emitter.join() self.frigate_watchdog.join() + self.embeddings_processor.join() self.db.stop() while len(self.detection_shms) > 0: @@ -758,6 +771,7 @@ class FrigateApp: self.audio_recordings_info_queue, self.log_queue, self.inter_process_queue, + self.embeddings_queue, ]: if queue is not None: while not queue.empty(): diff --git a/frigate/config.py b/frigate/config.py index 3f68f302a..4c5f510dc 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -680,6 +680,23 @@ class SnapshotsConfig(FrigateBaseModel): ) +class SemanticSearchConfig(FrigateBaseModel): + enabled: bool = Field(default=False, title="Enable semantic search.") + + +class GeminiConfig(FrigateBaseModel): + enabled: bool = Field(default=False, title="Enable Google Gemini captioning.") + override_existing: bool = Field( + default=False, title="Override existing sub labels." + ) + api_key: str = Field(default="", title="Google AI Studio API Key.") + prompt: str = Field( + default="Describe the {label} in this image with as much detail as possible. Do not describe the background.", + title="Default caption prompt.", + ) + object_prompts: Dict[str, str] = Field(default={}, title="Object specific prompts.") + + class ColorConfig(FrigateBaseModel): red: int = Field(default=255, ge=0, le=255, title="Red") green: int = Field(default=255, ge=0, le=255, title="Green") @@ -783,6 +800,9 @@ class CameraConfig(FrigateBaseModel): onvif: OnvifConfig = Field( default_factory=OnvifConfig, title="Camera Onvif Configuration." ) + gemini: GeminiConfig = Field( + default_factory=GeminiConfig, title="Google Gemini Configuration." + ) ui: CameraUiConfig = Field( default_factory=CameraUiConfig, title="Camera UI Modifications." ) @@ -1051,6 +1071,12 @@ class FrigateConfig(FrigateBaseModel): snapshots: SnapshotsConfig = Field( default_factory=SnapshotsConfig, title="Global snapshots configuration." ) + semantic_search: SemanticSearchConfig = Field( + default_factory=SemanticSearchConfig, title="Semantic Search configuration." + ) + gemini: GeminiConfig = Field( + default_factory=GeminiConfig, title="Global Google Gemini Configuration." + ) live: CameraLiveConfig = Field( default_factory=CameraLiveConfig, title="Live playback settings." ) @@ -1090,6 +1116,10 @@ class FrigateConfig(FrigateBaseModel): config.mqtt.user = config.mqtt.user.format(**FRIGATE_ENV_VARS) config.mqtt.password = config.mqtt.password.format(**FRIGATE_ENV_VARS) + # Gemini API Key substitutions + if config.gemini.api_key: + config.gemini.api_key = config.gemini.api_key.format(**FRIGATE_ENV_VARS) + # set default min_score for object attributes for attribute in ALL_ATTRIBUTE_LABELS: if not config.objects.filters.get(attribute): @@ -1110,6 +1140,7 @@ class FrigateConfig(FrigateBaseModel): "detect": ..., "ffmpeg": ..., "timestamp_style": ..., + "gemini": ..., }, exclude_unset=True, ) @@ -1176,6 +1207,13 @@ class FrigateConfig(FrigateBaseModel): camera_config.onvif.password = camera_config.onvif.password.format( **FRIGATE_ENV_VARS ) + + # Gemini substitution + if camera_config.gemini.api_key: + camera_config.gemini.api_key = camera_config.gemini.api_key.format( + **FRIGATE_ENV_VARS + ) + # set config pre-value camera_config.record.enabled_in_config = camera_config.record.enabled camera_config.audio.enabled_in_config = camera_config.audio.enabled diff --git a/frigate/const.py b/frigate/const.py index 28bc95f2e..dfeb987e6 100644 --- a/frigate/const.py +++ b/frigate/const.py @@ -1,6 +1,7 @@ CONFIG_DIR = "/config" DEFAULT_DB_PATH = f"{CONFIG_DIR}/frigate.db" MODEL_CACHE_DIR = f"{CONFIG_DIR}/model_cache" +DEFAULT_CHROMA_DB_PATH = f"{CONFIG_DIR}/chroma" BASE_DIR = "/media/frigate" CLIPS_DIR = f"{BASE_DIR}/clips" RECORD_DIR = f"{BASE_DIR}/recordings" diff --git a/frigate/embeddings/__init__.py b/frigate/embeddings/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/frigate/embeddings/functions/clip.py b/frigate/embeddings/functions/clip.py new file mode 100644 index 000000000..5344e67c1 --- /dev/null +++ b/frigate/embeddings/functions/clip.py @@ -0,0 +1,62 @@ +"""CLIP Embeddings for Frigate.""" +import os +from typing import Tuple, Union + +import onnxruntime as ort +from chromadb import EmbeddingFunction, Embeddings +from chromadb.api.types import ( + Documents, + Images, + is_document, + is_image, +) +from onnx_clip import OnnxClip + +from frigate.const import MODEL_CACHE_DIR + + +class Clip(OnnxClip): + """Override load models to download to cache directory.""" + + @staticmethod + def _load_models( + model: str, + silent: bool, + ) -> Tuple[ort.InferenceSession, ort.InferenceSession]: + """ + These models are a part of the container. Treat as as such. + """ + if model == "ViT-B/32": + IMAGE_MODEL_FILE = "clip_image_model_vitb32.onnx" + TEXT_MODEL_FILE = "clip_text_model_vitb32.onnx" + elif model == "RN50": + IMAGE_MODEL_FILE = "clip_image_model_rn50.onnx" + TEXT_MODEL_FILE = "clip_text_model_rn50.onnx" + else: + raise ValueError(f"Unexpected model {model}. No `.onnx` file found.") + + models = [] + for model_file in [IMAGE_MODEL_FILE, TEXT_MODEL_FILE]: + path = os.path.join(MODEL_CACHE_DIR, "clip", model_file) + models.append(OnnxClip._load_model(path, silent)) + + return models[0], models[1] + + +class ClipEmbedding(EmbeddingFunction): + """Embedding function for CLIP model used in Chroma.""" + + def __init__(self, model: str = "ViT-B/32"): + """Initialize CLIP Embedding function.""" + self.model = Clip(model) + + def __call__(self, input: Union[Documents, Images]) -> Embeddings: + embeddings: Embeddings = [] + for item in input: + if is_image(item): + result = self.model.get_image_embeddings([item]) + embeddings.append(result[0, :].tolist()) + elif is_document(item): + result = self.model.get_text_embeddings([item]) + embeddings.append(result[0, :].tolist()) + return embeddings diff --git a/frigate/embeddings/functions/minilm_l6_v2.py b/frigate/embeddings/functions/minilm_l6_v2.py new file mode 100644 index 000000000..8e3037c28 --- /dev/null +++ b/frigate/embeddings/functions/minilm_l6_v2.py @@ -0,0 +1,12 @@ +"""Embedding function for ONNX MiniLM-L6 model used in Chroma.""" + + +from chromadb.utils.embedding_functions import ONNXMiniLM_L6_V2 + +from frigate.const import MODEL_CACHE_DIR + + +class MiniLMEmbedding(ONNXMiniLM_L6_V2): + """Override DOWNLOAD_PATH to download to cache directory.""" + + DOWNLOAD_PATH = f"{MODEL_CACHE_DIR}/all-MiniLM-L6-v2" diff --git a/frigate/embeddings/processor.py b/frigate/embeddings/processor.py new file mode 100644 index 000000000..b58bfdb4e --- /dev/null +++ b/frigate/embeddings/processor.py @@ -0,0 +1,170 @@ +"""Create a Chroma vector database for semantic search.""" + +import base64 +import io +import logging +import queue +import threading +from multiprocessing import Queue +from multiprocessing.synchronize import Event as MpEvent + +import google.generativeai as genai +import numpy as np +from chromadb import Collection +from chromadb import HttpClient as ChromaClient +from chromadb.config import Settings +from peewee import DoesNotExist +from PIL import Image +from playhouse.shortcuts import model_to_dict + +from frigate.config import FrigateConfig +from frigate.models import Event + +from .functions.clip import ClipEmbedding +from .functions.minilm_l6_v2 import MiniLMEmbedding + +logger = logging.getLogger(__name__) + + +class EmbeddingProcessor(threading.Thread): + """Handle gemini queue and post event updates.""" + + def __init__( + self, + config: FrigateConfig, + queue: Queue, + stop_event: MpEvent, + ) -> None: + threading.Thread.__init__(self) + self.name = "chroma" + self.config = config + self.queue = queue + self.stop_event = stop_event + self.chroma: ChromaClient = None + self.thumbnail: Collection = None + self.description: Collection = None + self.gemini: genai.GenerativeModel = None + + def run(self) -> None: + """Maintain a Chroma vector database for semantic search.""" + + # Exit if disabled + if not self.config.semantic_search.enabled: + return + + # Create the database + self.chroma = ChromaClient(settings=Settings(anonymized_telemetry=False)) + + # Create/Load the collection(s) + self.thumbnail = self.chroma.get_or_create_collection( + name="event_thumbnail", embedding_function=ClipEmbedding() + ) + self.description = self.chroma.get_or_create_collection( + name="event_description", embedding_function=MiniLMEmbedding() + ) + + ## Initialize Gemini + if self.config.gemini.enabled: + genai.configure(api_key=self.config.gemini.api_key) + self.gemini = genai.GenerativeModel("gemini-pro-vision") + + # Process events + while not self.stop_event.is_set(): + try: + ( + event_id, + camera, + ) = self.queue.get(timeout=1) + except queue.Empty: + continue + + camera_config = self.config.cameras[camera] + + try: + event: Event = Event.get(Event.id == event_id) + except DoesNotExist: + continue + + # Extract valid event metadata + metadata = { + k: v + for k, v in model_to_dict(event).items() + if k not in ["id", "thumbnail"] + and v is not None + and isinstance(v, (str, int, float, bool)) + } + thumbnail = base64.b64decode(event.thumbnail) + + # Encode the thumbnail + self._embed_thumbnail(event.id, thumbnail, metadata) + + # Skip if we aren't generating descriptions with Gemini + if not camera_config.gemini.enabled or ( + not camera_config.gemini.override_existing + and event.data.get("description") is not None + ): + continue + + # Generate the description. Call happens in a thread since it is network bound. + threading.Thread( + target=self._embed_description, + name=f"_embed_description_{event.id}", + daemon=True, + args=( + event, + thumbnail, + metadata, + ), + ).start() + + def _embed_thumbnail(self, event_id: str, thumbnail: bytes, metadata: dict) -> None: + """Embed the thumbnail for an event.""" + + # Encode the thumbnail + img = np.array(Image.open(io.BytesIO(thumbnail)).convert("RGB")) + self.thumbnail.add( + images=[img], + metadatas=[metadata], + ids=[event_id], + ) + + def _embed_description( + self, event: Event, thumbnail: bytes, metadata: dict + ) -> None: + """Embed the description for an event.""" + + content = { + "mime_type": "image/jpeg", + "data": thumbnail, + } + + # Fetch the prompt from the config and format the string replacing variables from the event + prompt = self.config.gemini.object_prompts.get( + event.label, self.config.gemini.prompt + ).format(**metadata) + + response = self.gemini.generate_content( + [content, prompt], + generation_config=genai.types.GenerationConfig( + candidate_count=1, + ), + ) + + try: + description = response.text.strip() + except ValueError: + # No description was generated + return + + # Update the event to add the description + event.data["description"] = description + event.save() + + # Encode the description + self.description.add( + documents=[description], + metadatas=[metadata], + ids=[event.id], + ) + + logger.info("Generated description for %s: %s", event.id, description) diff --git a/frigate/events/maintainer.py b/frigate/events/maintainer.py index 19bb44ef4..b9d0c55cd 100644 --- a/frigate/events/maintainer.py +++ b/frigate/events/maintainer.py @@ -62,6 +62,7 @@ class EventProcessor(threading.Thread): event_queue: Queue, event_processed_queue: Queue, timeline_queue: Queue, + embeddings_queue: Queue, stop_event: MpEvent, ): threading.Thread.__init__(self) @@ -71,6 +72,7 @@ class EventProcessor(threading.Thread): self.event_queue = event_queue self.event_processed_queue = event_processed_queue self.timeline_queue = timeline_queue + self.embeddings_queue = embeddings_queue self.events_in_process: Dict[str, Event] = {} self.stop_event = stop_event @@ -240,6 +242,7 @@ class EventProcessor(threading.Thread): if event_type == "end": del self.events_in_process[event_data["id"]] self.event_processed_queue.put((event_data["id"], camera)) + self.embeddings_queue.put((event_data["id"], camera)) def handle_external_detection(self, event_type: str, event_data: Event) -> None: if event_type == "new": diff --git a/frigate/http.py b/frigate/http.py index 6b7ff8c3a..5cded11b6 100644 --- a/frigate/http.py +++ b/frigate/http.py @@ -17,6 +17,9 @@ import cv2 import numpy as np import pytz import requests +from chromadb import Collection, QueryResult +from chromadb import HttpClient as ChromaClient +from chromadb.config import Settings from flask import ( Blueprint, Flask, @@ -42,6 +45,8 @@ from frigate.const import ( MAX_SEGMENT_DURATION, RECORD_DIR, ) +from frigate.embeddings.functions.clip import ClipEmbedding +from frigate.embeddings.functions.minilm_l6_v2 import MiniLMEmbedding from frigate.events.external import ExternalEventProcessor from frigate.models import Event, Previews, Recordings, Regions, Timeline from frigate.object_processing import TrackedObject @@ -103,6 +108,13 @@ def create_app( app.plus_api = plus_api app.camera_error_image = None app.hwaccel_errors = [] + app.chroma = ChromaClient(settings=Settings(anonymized_telemetry=False)) + app.thumbnail_collection = app.chroma.get_or_create_collection( + name="event_thumbnail", embedding_function=ClipEmbedding() + ) + app.description_collection = app.chroma.get_or_create_collection( + name="event_description", embedding_function=MiniLMEmbedding() + ) app.register_blueprint(bp) @@ -998,6 +1010,7 @@ def events(): is_submitted = request.args.get("is_submitted", type=int) min_length = request.args.get("min_length", type=float) max_length = request.args.get("max_length", type=float) + search = request.args.get("search", type=str) or None clauses = [] @@ -1019,16 +1032,24 @@ def events(): Event.data, ] + # Start collecting filters for the embeddings metadata. + # We won't be able to do all the filters like we do against the DB + # because the table might have been modified after, but we should + # do what we can. + embeddings_filters = [] + if camera != "all": clauses.append((Event.camera == camera)) if cameras != "all": camera_list = cameras.split(",") clauses.append((Event.camera << camera_list)) + embeddings_filters.append({"camera": {"$in": camera_list}}) if labels != "all": label_list = labels.split(",") clauses.append((Event.label << label_list)) + embeddings_filters.append({"label": {"$in": label_list}}) if sub_labels != "all": # use matching so joined sub labels are included @@ -1071,9 +1092,11 @@ def events(): if after: clauses.append((Event.start_time > after)) + embeddings_filters.append({"start_time": {"$gt": after}}) if before: clauses.append((Event.start_time < before)) + embeddings_filters.append({"start_time": {"$lt": before}}) if time_range != DEFAULT_TIME_RANGE: # get timezone arg to ensure browser times are used @@ -1141,6 +1164,40 @@ def events(): if len(clauses) == 0: clauses.append((True)) + # Handle semantic search + event_order = None + if search is not None: + where = None + if len(embeddings_filters) > 1: + where = {"$and": embeddings_filters} + elif len(embeddings_filters) == 1: + where = embeddings_filters[0] + + # Grab the ids of the events that match based on CLIP embeddings + thumbnails: Collection = current_app.thumbnail_collection + thumb_result: QueryResult = thumbnails.query( + query_texts=[search], + n_results=int(limit), + where=where, + ) + thumb_ids = dict(zip(thumb_result["ids"][0], thumb_result["distances"][0])) + + # Grab the ids of the events that match based on MiniLM embeddings + descriptions: Collection = current_app.description_collection + desc_result: QueryResult = descriptions.query( + query_texts=[search], + n_results=int(limit), + where=where, + ) + desc_ids = dict(zip(desc_result["ids"][0], desc_result["distances"][0])) + + event_order = { + k: min(i for i in (thumb_ids.get(k), desc_ids.get(k)) if i is not None) + for k in thumb_ids.keys() | desc_ids + } + + clauses.append((Event.id << list(event_order.keys()))) + events = ( Event.select(*selected_columns) .where(reduce(operator.and_, clauses)) @@ -1149,8 +1206,16 @@ def events(): .dicts() .iterator() ) + events = list(events) - return jsonify(list(events)) + if event_order is not None: + events = [ + {**events, "search_similarity": event_order[events["id"]]} + for events in events + ] + events = sorted(events, key=lambda x: x["search_similarity"]) + + return jsonify(events) @bp.route("/events//