From abe4ce43f3200d5aba467af80b5293807900b6d3 Mon Sep 17 00:00:00 2001 From: George Tsiamasiotis Date: Tue, 1 Oct 2024 09:03:32 +0300 Subject: [PATCH] Merge EmbeddingsMaintainer --- frigate/app.py | 10 ++---- frigate/embeddings/__init__.py | 58 ------------------------------ frigate/embeddings/maintainer.py | 61 ++++++++++++++++++++------------ 3 files changed, 42 insertions(+), 87 deletions(-) diff --git a/frigate/app.py b/frigate/app.py index 587f592ae..c211fdf64 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -36,7 +36,8 @@ from frigate.const import ( MODEL_CACHE_DIR, RECORD_DIR, ) -from frigate.embeddings import EmbeddingsContext, manage_embeddings +from frigate.embeddings import EmbeddingsContext +from frigate.embeddings.maintainer import EmbeddingsMaintainer from frigate.events.audio import AudioProcessor from frigate.events.cleanup import EventCleanup from frigate.events.external import ExternalEventProcessor @@ -195,12 +196,7 @@ class FrigateApp: # Create a client for other processes to use self.embeddings = EmbeddingsContext() - embedding_process = util.Process( - target=manage_embeddings, - name="embeddings_manager", - args=(self.config,), - daemon=True, - ) + embedding_process = EmbeddingsMaintainer(self.config) self.embedding_process = embedding_process embedding_process.start() self.processes["embeddings"] = embedding_process.pid or 0 diff --git a/frigate/embeddings/__init__.py b/frigate/embeddings/__init__.py index b3ad22874..f3a41a11d 100644 --- a/frigate/embeddings/__init__.py +++ b/frigate/embeddings/__init__.py @@ -1,70 +1,12 @@ """ChromaDB embeddings database.""" import json -import logging -import multiprocessing as mp -import signal -import threading -from types import FrameType -from typing import Optional -from playhouse.sqliteq import SqliteQueueDatabase -from setproctitle import setproctitle - -from frigate.config import FrigateConfig from frigate.const import CONFIG_DIR -from frigate.models import Event -from frigate.util.services import listen from .embeddings import Embeddings -from .maintainer import EmbeddingMaintainer from .util import ZScoreNormalization -logger = logging.getLogger(__name__) - - -def manage_embeddings(config: FrigateConfig) -> None: - # Only initialize embeddings if semantic search is enabled - if not config.semantic_search.enabled: - return - - stop_event = mp.Event() - - def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: - stop_event.set() - - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - threading.current_thread().name = "process:embeddings_manager" - setproctitle("frigate.embeddings_manager") - listen() - - # Configure Frigate DB - db = SqliteQueueDatabase( - config.database.path, - pragmas={ - "auto_vacuum": "FULL", # Does not defragment database - "cache_size": -512 * 1000, # 512MB of cache - "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous - }, - timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])), - ) - models = [Event] - db.bind(models) - - embeddings = Embeddings() - - # Check if we need to re-index events - if config.semantic_search.reindex: - embeddings.reindex() - - maintainer = EmbeddingMaintainer( - config, - stop_event, - ) - maintainer.start() - class EmbeddingsContext: def __init__(self): diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index cbe4554ce..f955c9e0f 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -2,16 +2,16 @@ import base64 import io -import logging import threading -from multiprocessing.synchronize import Event as MpEvent from typing import Optional import cv2 import numpy as np from peewee import DoesNotExist from PIL import Image +from playhouse.sqliteq import SqliteQueueDatabase +from frigate import util from frigate.comms.event_metadata_updater import ( EventMetadataSubscriber, EventMetadataTypeEnum, @@ -27,20 +27,38 @@ from frigate.util.image import SharedMemoryFrameManager, calculate_region from .embeddings import Embeddings, get_metadata -logger = logging.getLogger(__name__) +class EmbeddingsMaintainer(util.Process): + """Maintain a Chroma vector database for semantic search.""" -class EmbeddingMaintainer(threading.Thread): - """Handle embedding queue and post event updates.""" - - def __init__( - self, - config: FrigateConfig, - stop_event: MpEvent, - ) -> None: - threading.Thread.__init__(self) - self.name = "embeddings_maintainer" + def __init__(self, config: FrigateConfig): + super().__init__(name="frigate.embeddings_manager", daemon=True) self.config = config + + def run(self): + # Only initialize embeddings if semantic search is enabled + if not self.config.semantic_search.enabled: + return + + # Configure Frigate DB + db = SqliteQueueDatabase( + self.config.database.path, + pragmas={ + "auto_vacuum": "FULL", # Does not defragment database + "cache_size": -512 * 1000, # 512MB of cache + "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous + }, + timeout=max(60, sum(10 for c in self.config.cameras.values() if c.enabled)), + ) + models = [Event] + db.bind(models) + + embeddings = Embeddings() + + # Check if we need to re-index events + if self.config.semantic_search.reindex: + embeddings.reindex() + self.embeddings = Embeddings() self.event_subscriber = EventUpdateSubscriber() self.event_end_subscriber = EventEndSubscriber() @@ -50,12 +68,9 @@ class EmbeddingMaintainer(threading.Thread): self.frame_manager = SharedMemoryFrameManager() # create communication for updating event descriptions self.requestor = InterProcessRequestor() - self.stop_event = stop_event self.tracked_events = {} - self.genai_client = get_genai_client(config.genai) + self.genai_client = get_genai_client(self.config.genai) - def run(self) -> None: - """Maintain a Chroma vector database for semantic search.""" while not self.stop_event.is_set(): self._process_updates() self._process_finalized() @@ -65,7 +80,7 @@ class EmbeddingMaintainer(threading.Thread): self.event_end_subscriber.stop() self.event_metadata_subscriber.stop() self.requestor.stop() - logger.info("Exiting embeddings maintenance...") + self.logger.info("Exiting embeddings maintenance...") def _process_updates(self) -> None: """Process event updates""" @@ -205,7 +220,7 @@ class EmbeddingMaintainer(threading.Thread): ) if not description: - logger.debug("Failed to generate description for %s", event.id) + self.logger.debug("Failed to generate description for %s", event.id) return # fire and forget description update @@ -221,7 +236,7 @@ class EmbeddingMaintainer(threading.Thread): ids=[event.id], ) - logger.debug( + self.logger.debug( "Generated description for %s (%d images): %s", event.id, len(thumbnails), @@ -232,12 +247,14 @@ class EmbeddingMaintainer(threading.Thread): try: event: Event = Event.get(Event.id == event_id) except DoesNotExist: - logger.error(f"Event {event_id} not found for description regeneration") + self.logger.error( + f"Event {event_id} not found for description regeneration" + ) return camera_config = self.config.cameras[event.camera] if not camera_config.genai.enabled or self.genai_client is None: - logger.error(f"GenAI not enabled for camera {event.camera}") + self.logger.error(f"GenAI not enabled for camera {event.camera}") return metadata = get_metadata(event)