Mirror of https://github.com/blakeblackshear/frigate.git (synced 2025-12-06 13:34:13 +03:00)
* Set runtime
* Use count correctly
* Don't assume camera sizes
* Use separate zmq proxy for object detection
* Correct order
* Use forkserver
* Only store PID instead of entire process reference
* Cleanup
* Catch correct errors
* Fix typing
* Remove before_run from process util

  The before_run hook never actually ran, and the wrapper risked a deadlock.
  Implementing run_wrapper via __getattribute__ on BaseProcess.run is a common
  pitfall in Python's multiprocessing, given how multiprocessing.Process works
  internally.

  The problem: __getattribute__ and process serialization. When a
  multiprocessing.Process object is created and start() is called, the
  multiprocessing module serializes the process object (or at least enough of
  it to re-create the process in the new interpreter), pickles it, and sends
  it to the newly spawned process. The issues with the __getattribute__
  wrapper for run:

  1. run is retrieved during serialization: when multiprocessing pickles the
     Process object to send to the new process, it will likely access the run
     attribute. This triggers the __getattribute__ wrapper, which then tries
     to bind run_wrapper to self.
  2. run_wrapper is bound to the parent process's self: the run_wrapper
     closure, created in the parent process, captures the self (the Process
     instance) from the parent's memory space.
  3. Deserialization creates a new object: in the child process, a new
     Process object is created by deserializing the pickled data, but the
     pickled run_wrapper still holds a reference to the self from the parent
     process; a subtle but critical distinction.
  4. The child's run is not the wrapped run: when the child process starts,
     it internally calls its own run method. Because of the serialization
     round trip, the run method ultimately executed in the child is the
     original multiprocessing.Process.run (or a direct override), so the
     __getattribute__ wrapping of run is never applied in the child's
     context.

* Cleanup
* Logging bugfix (#18465)
  * use mp Manager to handle logging queues: a Python bug
    (https://github.com/python/cpython/issues/91555) was preventing logs from
    the embeddings maintainer process from printing. The bug is fixed in
    Python 3.14, but a viable workaround is to use the multiprocessing
    Manager, which better manages mp queues and causes the logging to work
    correctly.
  * consolidate
  * fix typing
* Fix typing
* Use global log queue
* Move to using process for logging
* Convert camera tracking to process
* Add more processes
* Finalize process
* Cleanup
* Cleanup typing
* Formatting
* Remove daemon

---------

Co-authored-by: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
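A minimal sketch of the fix for the pitfall described above: override run() directly instead of wrapping it through __getattribute__. Names like HookedProcess and before_run mirror the description in the commit message, not the actual Frigate code.

import multiprocessing as mp

class HookedProcess(mp.Process):
    """Under spawn/forkserver the Process object is pickled into the child,
    so a run() wrapper bound via __getattribute__ in the parent never
    survives the round trip. Overriding run() itself works, because the
    child resolves it normally on the deserialized object."""

    def before_run(self) -> None:
        print("setup hook runs in the child")

    def run(self) -> None:
        self.before_run()  # hook executes in the child process
        super().run()      # then the configured target, if any

if __name__ == "__main__":
    # "spawn" used here for portability; "forkserver" behaves the same way
    # with respect to pickling the Process object.
    mp.set_start_method("spawn")
    p = HookedProcess(target=print, args=("hello from the child",))
    p.start()
    p.join()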
750 lines
27 KiB
Python
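The logging bugfix (#18465) referenced in the commit message above swaps plain multiprocessing queues for Manager-backed ones. A minimal sketch of that workaround follows (hypothetical worker, not the Frigate implementation):

import logging
import logging.handlers
import multiprocessing as mp

def worker(log_queue) -> None:
    # child processes push log records onto the shared queue
    logging.getLogger().addHandler(logging.handlers.QueueHandler(log_queue))
    logging.getLogger().setLevel(logging.INFO)
    logging.getLogger(__name__).info("hello from the child")

if __name__ == "__main__":
    # a Manager-backed queue avoids the mp.Queue feeder-thread bug
    # (https://github.com/python/cpython/issues/91555, fixed in 3.14)
    manager = mp.Manager()
    log_queue = manager.Queue()
    listener = logging.handlers.QueueListener(log_queue, logging.StreamHandler())
    listener.start()
    p = mp.Process(target=worker, args=(log_queue,))
    p.start()
    p.join()
    listener.stop()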
"""Maintain embeddings in SQLite-vec."""
|
|
|
|
import base64
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import threading
|
|
from multiprocessing.synchronize import Event as MpEvent
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from peewee import DoesNotExist
|
|
|
|
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
|
|
from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsResponder
|
|
from frigate.comms.event_metadata_updater import (
|
|
EventMetadataPublisher,
|
|
EventMetadataSubscriber,
|
|
EventMetadataTypeEnum,
|
|
)
|
|
from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscriber
|
|
from frigate.comms.inter_process import InterProcessRequestor
|
|
from frigate.comms.recordings_updater import (
|
|
RecordingsDataSubscriber,
|
|
RecordingsDataTypeEnum,
|
|
)
|
|
from frigate.config import FrigateConfig
|
|
from frigate.config.camera.camera import CameraTypeEnum
|
|
from frigate.config.camera.updater import (
|
|
CameraConfigUpdateEnum,
|
|
CameraConfigUpdateSubscriber,
|
|
)
|
|
from frigate.const import (
|
|
CLIPS_DIR,
|
|
UPDATE_EVENT_DESCRIPTION,
|
|
)
|
|
from frigate.data_processing.common.license_plate.model import (
|
|
LicensePlateModelRunner,
|
|
)
|
|
from frigate.data_processing.post.api import PostProcessorApi
|
|
from frigate.data_processing.post.audio_transcription import (
|
|
AudioTranscriptionPostProcessor,
|
|
)
|
|
from frigate.data_processing.post.license_plate import (
|
|
LicensePlatePostProcessor,
|
|
)
|
|
from frigate.data_processing.real_time.api import RealTimeProcessorApi
|
|
from frigate.data_processing.real_time.bird import BirdRealTimeProcessor
|
|
from frigate.data_processing.real_time.custom_classification import (
|
|
CustomObjectClassificationProcessor,
|
|
CustomStateClassificationProcessor,
|
|
)
|
|
from frigate.data_processing.real_time.face import FaceRealTimeProcessor
|
|
from frigate.data_processing.real_time.license_plate import (
|
|
LicensePlateRealTimeProcessor,
|
|
)
|
|
from frigate.data_processing.types import DataProcessorMetrics, PostProcessDataEnum
|
|
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
|
|
from frigate.events.types import EventTypeEnum, RegenerateDescriptionEnum
|
|
from frigate.genai import get_genai_client
|
|
from frigate.models import Event, Recordings
|
|
from frigate.types import TrackedObjectUpdateTypesEnum
|
|
from frigate.util.builtin import serialize
|
|
from frigate.util.image import (
|
|
SharedMemoryFrameManager,
|
|
calculate_region,
|
|
ensure_jpeg_bytes,
|
|
)
|
|
from frigate.util.path import get_event_thumbnail_bytes
|
|
|
|
from .embeddings import Embeddings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
MAX_THUMBNAILS = 10
|
|
|
|
|
|
class EmbeddingMaintainer(threading.Thread):
    """Handle embedding queue and post event updates."""

    def __init__(
        self,
        config: FrigateConfig,
        metrics: DataProcessorMetrics | None,
        stop_event: MpEvent,
    ) -> None:
        super().__init__(name="embeddings_maintainer")
        self.config = config
        self.metrics = metrics
        self.embeddings = None
        self.config_updater = CameraConfigUpdateSubscriber(
            self.config,
            self.config.cameras,
            [CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.remove],
        )

        # Configure Frigate DB
        db = SqliteVecQueueDatabase(
            config.database.path,
            pragmas={
                "auto_vacuum": "FULL",  # Does not defragment database
                "cache_size": -512 * 1000,  # 512MB of cache
                "synchronous": "NORMAL",  # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
            },
            # scale the database timeout with the number of enabled cameras
            timeout=max(
                60, 10 * len([c for c in config.cameras.values() if c.enabled])
            ),
            load_vec_extension=True,
        )
        models = [Event, Recordings]
        db.bind(models)

        if config.semantic_search.enabled:
            self.embeddings = Embeddings(config, db, metrics)

            # Check if we need to re-index events
            if config.semantic_search.reindex:
                self.embeddings.reindex()

        # create communication for updating event descriptions
        self.requestor = InterProcessRequestor()

        self.event_subscriber = EventUpdateSubscriber()
        self.event_end_subscriber = EventEndSubscriber()
        self.event_metadata_publisher = EventMetadataPublisher()
        self.event_metadata_subscriber = EventMetadataSubscriber(
            EventMetadataTypeEnum.regenerate_description
        )
        self.recordings_subscriber = RecordingsDataSubscriber(
            RecordingsDataTypeEnum.recordings_available_through
        )
        self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video)
        self.embeddings_responder = EmbeddingsResponder()
        self.frame_manager = SharedMemoryFrameManager()

        self.detected_license_plates: dict[str, dict[str, Any]] = {}

        # model runners to share between realtime and post processors
        if self.config.lpr.enabled:
            lpr_model_runner = LicensePlateModelRunner(
                self.requestor,
                device=self.config.lpr.device,
                model_size=self.config.lpr.model_size,
            )

        # realtime processors
        self.realtime_processors: list[RealTimeProcessorApi] = []

        if self.config.face_recognition.enabled:
            self.realtime_processors.append(
                FaceRealTimeProcessor(
                    self.config, self.requestor, self.event_metadata_publisher, metrics
                )
            )

        if self.config.classification.bird.enabled:
            self.realtime_processors.append(
                BirdRealTimeProcessor(
                    self.config, self.event_metadata_publisher, metrics
                )
            )

        if self.config.lpr.enabled:
            self.realtime_processors.append(
                LicensePlateRealTimeProcessor(
                    self.config,
                    self.requestor,
                    self.event_metadata_publisher,
                    metrics,
                    lpr_model_runner,
                    self.detected_license_plates,
                )
            )

        for model_config in self.config.classification.custom.values():
            self.realtime_processors.append(
                CustomStateClassificationProcessor(
                    self.config, model_config, self.requestor, self.metrics
                )
                if model_config.state_config is not None
                else CustomObjectClassificationProcessor(
                    self.config,
                    model_config,
                    self.event_metadata_publisher,
                    self.metrics,
                )
            )

        # post processors
        self.post_processors: list[PostProcessorApi] = []

        if self.config.lpr.enabled:
            self.post_processors.append(
                LicensePlatePostProcessor(
                    self.config,
                    self.requestor,
                    self.event_metadata_publisher,
                    metrics,
                    lpr_model_runner,
                    self.detected_license_plates,
                )
            )

        if any(
            c.enabled_in_config and c.audio_transcription.enabled
            for c in self.config.cameras.values()
        ):
            self.post_processors.append(
                AudioTranscriptionPostProcessor(self.config, self.requestor, metrics)
            )

        self.stop_event = stop_event
        self.tracked_events: dict[str, list[Any]] = {}
        self.early_request_sent: dict[str, bool] = {}
        self.genai_client = get_genai_client(config)

        # recordings data
        self.recordings_available_through: dict[str, float] = {}

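    # The run loop below round-robins over every subscriber; each
    # _process_* helper checks its channel for pending messages and
    # handles them before the loop moves on.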
    def run(self) -> None:
        """Maintain a SQLite-vec database for semantic search."""
        while not self.stop_event.is_set():
            self.config_updater.check_for_updates()
            self._process_requests()
            self._process_updates()
            self._process_recordings_updates()
            self._process_frame_updates()
            self._expire_dedicated_lpr()
            self._process_finalized()
            self._process_event_metadata()

        self.config_updater.stop()
        self.event_subscriber.stop()
        self.event_end_subscriber.stop()
        self.recordings_subscriber.stop()
        self.detection_subscriber.stop()
        self.event_metadata_publisher.stop()
        self.event_metadata_subscriber.stop()
        self.embeddings_responder.stop()
        self.requestor.stop()
        logger.info("Exiting embeddings maintenance...")

    def _process_requests(self) -> None:
        """Process embeddings requests"""

        def _handle_request(topic: str, data: dict[str, Any]) -> str | None:
            try:
                # First handle the embedding-specific topics when semantic search is enabled
                if self.config.semantic_search.enabled:
                    if topic == EmbeddingsRequestEnum.embed_description.value:
                        return serialize(
                            self.embeddings.embed_description(
                                data["id"], data["description"]
                            ),
                            pack=False,
                        )
                    elif topic == EmbeddingsRequestEnum.embed_thumbnail.value:
                        thumbnail = base64.b64decode(data["thumbnail"])
                        return serialize(
                            self.embeddings.embed_thumbnail(data["id"], thumbnail),
                            pack=False,
                        )
                    elif topic == EmbeddingsRequestEnum.generate_search.value:
                        return serialize(
                            self.embeddings.embed_description("", data, upsert=False),
                            pack=False,
                        )
                    elif topic == EmbeddingsRequestEnum.reindex.value:
                        response = self.embeddings.start_reindex()
                        return "started" if response else "in_progress"

                # otherwise give each processor a chance to answer the request
                processors = [self.realtime_processors, self.post_processors]
                for processor_list in processors:
                    for processor in processor_list:
                        resp = processor.handle_request(topic, data)
                        if resp is not None:
                            return resp

                return None
            except Exception as e:
                logger.error(f"Unable to handle embeddings request {e}", exc_info=True)
                return None

        self.embeddings_responder.check_for_request(_handle_request)

    def _process_updates(self) -> None:
        """Process event updates"""
        update = self.event_subscriber.check_for_update()

        if update is None:
            return

        source_type, _, camera, frame_name, data = update

        if not camera or source_type != EventTypeEnum.tracked_object:
            return

        if self.config.semantic_search.enabled:
            self.embeddings.update_stats()

        camera_config = self.config.cameras[camera]

        # no need to process updated objects if face recognition, lpr, genai are disabled
        if not camera_config.genai.enabled and len(self.realtime_processors) == 0:
            return

        # Create our own thumbnail based on the bounding box and the frame time
        try:
            yuv_frame = self.frame_manager.get(
                frame_name, camera_config.frame_shape_yuv
            )
        except FileNotFoundError:
            yuv_frame = None

        if yuv_frame is None:
            logger.debug(
                "Unable to process object update because frame is unavailable."
            )
            return

        for processor in self.realtime_processors:
            processor.process_frame(data, yuv_frame)

        # no need to save our own thumbnails if genai is not enabled
        # or if the object has become stationary
        if self.genai_client is not None and not data["stationary"]:
            if data["id"] not in self.tracked_events:
                self.tracked_events[data["id"]] = []

            data["thumbnail"] = self._create_thumbnail(yuv_frame, data["box"])

            # Limit the number of thumbnails saved
            if len(self.tracked_events[data["id"]]) >= MAX_THUMBNAILS:
                # Always keep the first thumbnail for the event
                self.tracked_events[data["id"]].pop(1)

            self.tracked_events[data["id"]].append(data)

        # check if we're configured to send an early request after a minimum number of updates received
        if (
            self.genai_client is not None
            and camera_config.genai.send_triggers.after_significant_updates
        ):
            if (
                len(self.tracked_events.get(data["id"], []))
                >= camera_config.genai.send_triggers.after_significant_updates
                and data["id"] not in self.early_request_sent
            ):
                if data["has_clip"] and data["has_snapshot"]:
                    event: Event = Event.get(Event.id == data["id"])

                    if (
                        not camera_config.genai.objects
                        or event.label in camera_config.genai.objects
                    ) and (
                        not camera_config.genai.required_zones
                        or set(data["entered_zones"])
                        & set(camera_config.genai.required_zones)
                    ):
                        logger.debug(f"{camera} sending early request to GenAI")

                        self.early_request_sent[data["id"]] = True
                        threading.Thread(
                            target=self._genai_embed_description,
                            name=f"_genai_embed_description_{event.id}",
                            daemon=True,
                            args=(
                                event,
                                [
                                    d["thumbnail"]
                                    for d in self.tracked_events[data["id"]]
                                ],
                            ),
                        ).start()

        self.frame_manager.close(frame_name)

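    # Event finalization below: run post processors (e.g. LPR against
    # recordings), expire per-object state in the realtime processors,
    # embed the final thumbnail, and optionally kick off a GenAI
    # description of the finished event.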
    def _process_finalized(self) -> None:
        """Process the end of an event."""
        while True:
            ended = self.event_end_subscriber.check_for_update()

            if ended is None:
                break

            event_id, camera, updated_db = ended
            camera_config = self.config.cameras[camera]

            # call any defined post processors
            for processor in self.post_processors:
                if isinstance(processor, LicensePlatePostProcessor):
                    recordings_available = self.recordings_available_through.get(camera)
                    if (
                        recordings_available is not None
                        and event_id in self.detected_license_plates
                        and self.config.cameras[camera].type != "lpr"
                    ):
                        processor.process_data(
                            {
                                "event_id": event_id,
                                "camera": camera,
                                "recordings_available": self.recordings_available_through[
                                    camera
                                ],
                                "obj_data": self.detected_license_plates[event_id][
                                    "obj_data"
                                ],
                            },
                            PostProcessDataEnum.recording,
                        )
                elif isinstance(processor, AudioTranscriptionPostProcessor):
                    continue
                else:
                    processor.process_data(event_id, PostProcessDataEnum.event_id)

            # expire in realtime processors
            for processor in self.realtime_processors:
                processor.expire_object(event_id, camera)

            if updated_db:
                try:
                    event: Event = Event.get(Event.id == event_id)
                except DoesNotExist:
                    continue

                # Skip the event if not an object
                if event.data.get("type") != "object":
                    continue

                # Extract valid thumbnail
                thumbnail = get_event_thumbnail_bytes(event)

                # Embed the thumbnail
                self._embed_thumbnail(event_id, thumbnail)

                # Run GenAI
                if (
                    camera_config.genai.enabled
                    and camera_config.genai.send_triggers.tracked_object_end
                    and self.genai_client is not None
                    and (
                        not camera_config.genai.objects
                        or event.label in camera_config.genai.objects
                    )
                    and (
                        not camera_config.genai.required_zones
                        or set(event.zones) & set(camera_config.genai.required_zones)
                    )
                ):
                    self._process_genai_description(event, camera_config, thumbnail)

            # Delete tracked events based on the event_id
            if event_id in self.tracked_events:
                del self.tracked_events[event_id]

    def _expire_dedicated_lpr(self) -> None:
        """Remove plates not seen for longer than the expiration timeout for dedicated LPR cameras."""
        now = datetime.datetime.now().timestamp()

        to_remove = []

        for id, data in self.detected_license_plates.items():
            last_seen = data.get("last_seen", 0)

            if not last_seen:
                continue

            if now - last_seen > self.config.cameras[data["camera"]].lpr.expire_time:
                to_remove.append(id)

        for id in to_remove:
            self.event_metadata_publisher.publish(
                EventMetadataTypeEnum.manual_event_end,
                (id, now),
            )
            self.detected_license_plates.pop(id)

    def _process_recordings_updates(self) -> None:
        """Process recordings updates."""
        while True:
            recordings_data = self.recordings_subscriber.check_for_update()

            if recordings_data is None:
                break

            camera, recordings_available_through_timestamp = recordings_data

            self.recordings_available_through[camera] = (
                recordings_available_through_timestamp
            )

            logger.debug(
                f"{camera} now has recordings available through {recordings_available_through_timestamp}"
            )

    def _process_event_metadata(self) -> None:
        """Check for regenerate description requests."""
        (topic, payload) = self.event_metadata_subscriber.check_for_update()

        if topic is None:
            return

        event_id, source, force = payload

        if event_id:
            self.handle_regenerate_description(
                event_id, RegenerateDescriptionEnum(source), force
            )

    def _process_frame_updates(self) -> None:
        """Process frame updates for dedicated LPR cameras and custom classification."""
        (topic, data) = self.detection_subscriber.check_for_update()

        if topic is None:
            return

        camera, frame_name, _, _, motion_boxes, _ = data

        if not camera or len(motion_boxes) == 0:
            return

        camera_config = self.config.cameras[camera]

        # a dedicated LPR camera runs plate recognition on full frames
        # rather than on tracked license_plate objects
        dedicated_lpr_enabled = (
            camera_config.type == CameraTypeEnum.lpr
            and "license_plate" not in camera_config.objects.track
        )

        if not dedicated_lpr_enabled and len(self.config.classification.custom) == 0:
            # no active features that use this data
            return

        try:
            yuv_frame = self.frame_manager.get(
                frame_name, camera_config.frame_shape_yuv
            )
        except FileNotFoundError:
            yuv_frame = None

        if yuv_frame is None:
            logger.debug(
                "Unable to process dedicated LPR update because frame is unavailable."
            )
            return

        for processor in self.realtime_processors:
            if dedicated_lpr_enabled and isinstance(
                processor, LicensePlateRealTimeProcessor
            ):
                processor.process_frame(camera, yuv_frame, True)

            if isinstance(processor, CustomStateClassificationProcessor):
                processor.process_frame(
                    {"camera": camera, "motion": motion_boxes}, yuv_frame
                )

        self.frame_manager.close(frame_name)

    def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]:
        """Return jpg thumbnail of a region of the frame."""
        frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)
        region = calculate_region(
            frame.shape, box[0], box[1], box[2], box[3], height, multiplier=1.4
        )
        frame = frame[region[1] : region[3], region[0] : region[2]]
        width = int(height * frame.shape[1] / frame.shape[0])
        frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_AREA)
        ret, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 100])

        if ret:
            return jpg.tobytes()

        return None

    def _embed_thumbnail(self, event_id: str, thumbnail: bytes) -> None:
        """Embed the thumbnail for an event."""
        if not self.config.semantic_search.enabled:
            return

        self.embeddings.embed_thumbnail(event_id, thumbnail)

    def _process_genai_description(self, event, camera_config, thumbnail) -> None:
        if event.has_snapshot and camera_config.genai.use_snapshot:
            snapshot_image = self._read_and_crop_snapshot(event, camera_config)
            if not snapshot_image:
                return

        num_thumbnails = len(self.tracked_events.get(event.id, []))

        # ensure we have a jpeg to pass to the model
        thumbnail = ensure_jpeg_bytes(thumbnail)

        embed_image = (
            [snapshot_image]
            if event.has_snapshot and camera_config.genai.use_snapshot
            else (
                [data["thumbnail"] for data in self.tracked_events[event.id]]
                if num_thumbnails > 0
                else [thumbnail]
            )
        )

        if camera_config.genai.debug_save_thumbnails and num_thumbnails > 0:
            logger.debug(f"Saving {num_thumbnails} thumbnails for event {event.id}")

            Path(os.path.join(CLIPS_DIR, f"genai-requests/{event.id}")).mkdir(
                parents=True, exist_ok=True
            )

            for idx, data in enumerate(self.tracked_events[event.id], 1):
                jpg_bytes: bytes = data["thumbnail"]

                if jpg_bytes is None:
                    logger.warning(f"Unable to save thumbnail {idx} for {event.id}.")
                else:
                    with open(
                        os.path.join(
                            CLIPS_DIR,
                            f"genai-requests/{event.id}/{idx}.jpg",
                        ),
                        "wb",
                    ) as j:
                        j.write(jpg_bytes)

        # Generate the description. Call happens in a thread since it is network bound.
        threading.Thread(
            target=self._genai_embed_description,
            name=f"_genai_embed_description_{event.id}",
            daemon=True,
            args=(
                event,
                embed_image,
            ),
        ).start()

    def _genai_embed_description(self, event: Event, thumbnails: list[bytes]) -> None:
        """Embed the description for an event."""
        camera_config = self.config.cameras[event.camera]

        description = self.genai_client.generate_description(
            camera_config, thumbnails, event
        )

        if not description:
            logger.debug("Failed to generate description for %s", event.id)
            return

        # fire and forget description update
        self.requestor.send_data(
            UPDATE_EVENT_DESCRIPTION,
            {
                "type": TrackedObjectUpdateTypesEnum.description,
                "id": event.id,
                "description": description,
                "camera": event.camera,
            },
        )

        # Embed the description
        if self.config.semantic_search.enabled:
            self.embeddings.embed_description(event.id, description)

        logger.debug(
            "Generated description for %s (%d images): %s",
            event.id,
            len(thumbnails),
            description,
        )

    def _read_and_crop_snapshot(self, event: Event, camera_config) -> bytes | None:
        """Read, decode, and crop the snapshot image."""

        snapshot_file = os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg")

        if not os.path.isfile(snapshot_file):
            logger.error(
                f"Cannot load snapshot for {event.id}, file not found: {snapshot_file}"
            )
            return None

        try:
            with open(snapshot_file, "rb") as image_file:
                snapshot_image = image_file.read()

            img = cv2.imdecode(
                # JPEG bytes must be decoded as unsigned 8-bit values
                np.frombuffer(snapshot_image, dtype=np.uint8),
                cv2.IMREAD_COLOR,
            )

            # Crop snapshot based on region
            # provide full image if region doesn't exist (manual events)
            height, width = img.shape[:2]
            x1_rel, y1_rel, width_rel, height_rel = event.data.get(
                "region", [0, 0, 1, 1]
            )
            x1, y1 = int(x1_rel * width), int(y1_rel * height)

            cropped_image = img[
                y1 : y1 + int(height_rel * height),
                x1 : x1 + int(width_rel * width),
            ]

            _, buffer = cv2.imencode(".jpg", cropped_image)

            return buffer.tobytes()
        except Exception:
            return None

    def handle_regenerate_description(
        self, event_id: str, source: str, force: bool
    ) -> None:
        try:
            event: Event = Event.get(Event.id == event_id)
        except DoesNotExist:
            logger.error(f"Event {event_id} not found for description regeneration")
            return

        if self.genai_client is None:
            logger.error("GenAI not enabled")
            return

        camera_config = self.config.cameras[event.camera]

        if not camera_config.genai.enabled and not force:
            logger.error(f"GenAI not enabled for camera {event.camera}")
            return

        thumbnail = get_event_thumbnail_bytes(event)

        # ensure we have a jpeg to pass to the model
        thumbnail = ensure_jpeg_bytes(thumbnail)

        logger.debug(
            f"Trying {source} regeneration for {event}, has_snapshot: {event.has_snapshot}"
        )

        if event.has_snapshot and source == "snapshot":
            snapshot_image = self._read_and_crop_snapshot(event, camera_config)
            if not snapshot_image:
                return

        embed_image = (
            [snapshot_image]
            if event.has_snapshot and source == "snapshot"
            else (
                [data["thumbnail"] for data in self.tracked_events[event_id]]
                if len(self.tracked_events.get(event_id, [])) > 0
                else [thumbnail]
            )
        )

        self._genai_embed_description(event, embed_image)