Merge EmbeddingsMaintainer

This commit is contained in:
George Tsiamasiotis 2024-10-01 09:03:32 +03:00
parent 486793e454
commit abe4ce43f3
3 changed files with 42 additions and 87 deletions

View File

@@ -36,7 +36,8 @@ from frigate.const import (
MODEL_CACHE_DIR,
RECORD_DIR,
)
from frigate.embeddings import EmbeddingsContext, manage_embeddings
from frigate.embeddings import EmbeddingsContext
from frigate.embeddings.maintainer import EmbeddingsMaintainer
from frigate.events.audio import AudioProcessor
from frigate.events.cleanup import EventCleanup
from frigate.events.external import ExternalEventProcessor
@@ -195,12 +196,7 @@ class FrigateApp:
# Create a client for other processes to use
self.embeddings = EmbeddingsContext()
embedding_process = util.Process(
target=manage_embeddings,
name="embeddings_manager",
args=(self.config,),
daemon=True,
)
embedding_process = EmbeddingsMaintainer(self.config)
self.embedding_process = embedding_process
embedding_process.start()
self.processes["embeddings"] = embedding_process.pid or 0

View File

@@ -1,70 +1,12 @@
"""ChromaDB embeddings database."""
import json
import logging
import multiprocessing as mp
import signal
import threading
from types import FrameType
from typing import Optional
from playhouse.sqliteq import SqliteQueueDatabase
from setproctitle import setproctitle
from frigate.config import FrigateConfig
from frigate.const import CONFIG_DIR
from frigate.models import Event
from frigate.util.services import listen
from .embeddings import Embeddings
from .maintainer import EmbeddingMaintainer
from .util import ZScoreNormalization
logger = logging.getLogger(__name__)
def manage_embeddings(config: FrigateConfig) -> None:
    """Entry point that prepares and starts the embeddings maintainer.

    Returns immediately when ``config.semantic_search.enabled`` is false.
    Otherwise it installs SIGTERM/SIGINT handlers that set a shared stop
    event, names the current thread and process, binds the ``Event`` model
    to a SQLite queue database, optionally re-indexes embeddings, and starts
    an ``EmbeddingMaintainer`` with the stop event.

    Args:
        config: Frigate configuration; this function reads
            ``semantic_search``, ``database.path`` and ``cameras``.
    """
    # Only initialize embeddings if semantic search is enabled
    if not config.semantic_search.enabled:
        return

    stop_event = mp.Event()

    def _request_stop(signalNumber: int, frame: Optional[FrameType]) -> None:
        # Signal handlers only flip the event; the maintainer receives the
        # same event object below.
        stop_event.set()

    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, _request_stop)

    threading.current_thread().name = "process:embeddings_manager"
    setproctitle("frigate.embeddings_manager")
    # NOTE(review): listen() comes from frigate.util.services; its effect is
    # not visible here.
    listen()

    # Configure Frigate DB
    enabled_cameras = sum(1 for cam in config.cameras.values() if cam.enabled)
    sqlite_pragmas = {
        "auto_vacuum": "FULL",  # Does not defragment database
        "cache_size": -512 * 1000,  # 512MB of cache
        "synchronous": "NORMAL",  # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
    }
    db = SqliteQueueDatabase(
        config.database.path,
        pragmas=sqlite_pragmas,
        # 10 per enabled camera, but never below 60.
        timeout=max(60, 10 * enabled_cameras),
    )
    db.bind([Event])

    embeddings = Embeddings()

    # Check if we need to re-index events
    if config.semantic_search.reindex:
        embeddings.reindex()

    EmbeddingMaintainer(config, stop_event).start()
class EmbeddingsContext:
def __init__(self):

View File

@@ -2,16 +2,16 @@
import base64
import io
import logging
import threading
from multiprocessing.synchronize import Event as MpEvent
from typing import Optional
import cv2
import numpy as np
from peewee import DoesNotExist
from PIL import Image
from playhouse.sqliteq import SqliteQueueDatabase
from frigate import util
from frigate.comms.event_metadata_updater import (
EventMetadataSubscriber,
EventMetadataTypeEnum,
@@ -27,20 +27,38 @@ from frigate.util.image import SharedMemoryFrameManager, calculate_region
from .embeddings import Embeddings, get_metadata
logger = logging.getLogger(__name__)
class EmbeddingsMaintainer(util.Process):
"""Maintain a Chroma vector database for semantic search."""
class EmbeddingMaintainer(threading.Thread):
"""Handle embedding queue and post event updates."""
def __init__(
self,
config: FrigateConfig,
stop_event: MpEvent,
) -> None:
threading.Thread.__init__(self)
self.name = "embeddings_maintainer"
def __init__(self, config: FrigateConfig):
super().__init__(name="frigate.embeddings_manager", daemon=True)
self.config = config
def run(self):
# Only initialize embeddings if semantic search is enabled
if not self.config.semantic_search.enabled:
return
# Configure Frigate DB
db = SqliteQueueDatabase(
self.config.database.path,
pragmas={
"auto_vacuum": "FULL", # Does not defragment database
"cache_size": -512 * 1000, # 512MB of cache
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
},
timeout=max(60, sum(10 for c in self.config.cameras.values() if c.enabled)),
)
models = [Event]
db.bind(models)
embeddings = Embeddings()
# Check if we need to re-index events
if self.config.semantic_search.reindex:
embeddings.reindex()
self.embeddings = Embeddings()
self.event_subscriber = EventUpdateSubscriber()
self.event_end_subscriber = EventEndSubscriber()
@@ -50,12 +68,9 @@ class EmbeddingMaintainer(threading.Thread):
self.frame_manager = SharedMemoryFrameManager()
# create communication for updating event descriptions
self.requestor = InterProcessRequestor()
self.stop_event = stop_event
self.tracked_events = {}
self.genai_client = get_genai_client(config.genai)
self.genai_client = get_genai_client(self.config.genai)
def run(self) -> None:
"""Maintain a Chroma vector database for semantic search."""
while not self.stop_event.is_set():
self._process_updates()
self._process_finalized()
@@ -65,7 +80,7 @@ class EmbeddingMaintainer(threading.Thread):
self.event_end_subscriber.stop()
self.event_metadata_subscriber.stop()
self.requestor.stop()
logger.info("Exiting embeddings maintenance...")
self.logger.info("Exiting embeddings maintenance...")
def _process_updates(self) -> None:
"""Process event updates"""
@@ -205,7 +220,7 @@ class EmbeddingMaintainer(threading.Thread):
)
if not description:
logger.debug("Failed to generate description for %s", event.id)
self.logger.debug("Failed to generate description for %s", event.id)
return
# fire and forget description update
@@ -221,7 +236,7 @@ class EmbeddingMaintainer(threading.Thread):
ids=[event.id],
)
logger.debug(
self.logger.debug(
"Generated description for %s (%d images): %s",
event.id,
len(thumbnails),
@@ -232,12 +247,14 @@ class EmbeddingMaintainer(threading.Thread):
try:
event: Event = Event.get(Event.id == event_id)
except DoesNotExist:
logger.error(f"Event {event_id} not found for description regeneration")
self.logger.error(
f"Event {event_id} not found for description regeneration"
)
return
camera_config = self.config.cameras[event.camera]
if not camera_config.genai.enabled or self.genai_client is None:
logger.error(f"GenAI not enabled for camera {event.camera}")
self.logger.error(f"GenAI not enabled for camera {event.camera}")
return
metadata = get_metadata(event)