Refactor object genai to be a post-processor (#20331)

* Refactor object genai to be a post-processor

* Include function correctly
Nicolas Mowen 2025-10-02 12:48:11 -06:00 committed by GitHub
parent 37999abbe6
commit 2d45ea271e
6 changed files with 440 additions and 304 deletions

View File

@@ -0,0 +1,349 @@
"""Post processor for object descriptions using GenAI."""

import datetime
import logging
import os
import threading
from pathlib import Path
from typing import TYPE_CHECKING, Any

import cv2
import numpy as np
from peewee import DoesNotExist

from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import CameraConfig, FrigateConfig
from frigate.const import CLIPS_DIR, UPDATE_EVENT_DESCRIPTION
from frigate.data_processing.post.semantic_trigger import SemanticTriggerProcessor
from frigate.data_processing.types import PostProcessDataEnum
from frigate.genai import GenAIClient
from frigate.models import Event
from frigate.types import TrackedObjectUpdateTypesEnum
from frigate.util.builtin import EventsPerSecond, InferenceSpeed
from frigate.util.image import create_thumbnail, ensure_jpeg_bytes
from frigate.util.path import get_event_thumbnail_bytes

if TYPE_CHECKING:
    from frigate.embeddings import Embeddings

from ..post.api import PostProcessorApi
from ..types import DataProcessorMetrics

logger = logging.getLogger(__name__)

MAX_THUMBNAILS = 10


class ObjectDescriptionProcessor(PostProcessorApi):
    def __init__(
        self,
        config: FrigateConfig,
        embeddings: "Embeddings",
        requestor: InterProcessRequestor,
        metrics: DataProcessorMetrics,
        client: GenAIClient,
        semantic_trigger_processor: SemanticTriggerProcessor | None,
    ):
        super().__init__(config, metrics, None)
        self.config = config
        self.embeddings = embeddings
        self.requestor = requestor
        self.metrics = metrics
        self.genai_client = client
        self.semantic_trigger_processor = semantic_trigger_processor
        self.tracked_events: dict[str, list[Any]] = {}
        self.early_request_sent: dict[str, bool] = {}
        self.object_desc_speed = InferenceSpeed(self.metrics.object_desc_speed)
        self.object_desc_dps = EventsPerSecond()
        self.object_desc_dps.start()

    def __handle_frame_update(
        self, camera: str, data: dict, yuv_frame: np.ndarray
    ) -> None:
        """Handle an update to a frame for an object."""
        camera_config = self.config.cameras[camera]

        # no need to save our own thumbnails if genai is not enabled
        # or if the object has become stationary
        if not data["stationary"]:
            if data["id"] not in self.tracked_events:
                self.tracked_events[data["id"]] = []

            data["thumbnail"] = create_thumbnail(yuv_frame, data["box"])

            # Limit the number of thumbnails saved
            if len(self.tracked_events[data["id"]]) >= MAX_THUMBNAILS:
                # Always keep the first thumbnail for the event
                self.tracked_events[data["id"]].pop(1)

            self.tracked_events[data["id"]].append(data)

        # check if we're configured to send an early request after a minimum number of updates received
        if camera_config.objects.genai.send_triggers.after_significant_updates:
            if (
                len(self.tracked_events.get(data["id"], []))
                >= camera_config.objects.genai.send_triggers.after_significant_updates
                and data["id"] not in self.early_request_sent
            ):
                if data["has_clip"] and data["has_snapshot"]:
                    event: Event = Event.get(Event.id == data["id"])

                    if (
                        not camera_config.objects.genai.objects
                        or event.label in camera_config.objects.genai.objects
                    ) and (
                        not camera_config.objects.genai.required_zones
                        or set(data["entered_zones"])
                        & set(camera_config.objects.genai.required_zones)
                    ):
                        logger.debug(f"{camera} sending early request to GenAI")

                        self.early_request_sent[data["id"]] = True
                        threading.Thread(
                            target=self._genai_embed_description,
                            name=f"_genai_embed_description_{event.id}",
                            daemon=True,
                            args=(
                                event,
                                [
                                    data["thumbnail"]
                                    for data in self.tracked_events[data["id"]]
                                ],
                            ),
                        ).start()

    def __handle_frame_finalize(
        self, camera: str, event: Event, thumbnail: bytes
    ) -> None:
        """Handle the finalization of a frame."""
        camera_config = self.config.cameras[camera]

        if (
            camera_config.objects.genai.enabled
            and camera_config.objects.genai.send_triggers.tracked_object_end
            and (
                not camera_config.objects.genai.objects
                or event.label in camera_config.objects.genai.objects
            )
            and (
                not camera_config.objects.genai.required_zones
                or set(event.zones) & set(camera_config.objects.genai.required_zones)
            )
        ):
            self._process_genai_description(event, camera_config, thumbnail)

    def __regenerate_description(
        self, event_id: str, source: str, force: bool
    ) -> None:
        """Regenerate the description for an event."""
        try:
            event: Event = Event.get(Event.id == event_id)
        except DoesNotExist:
            logger.error(f"Event {event_id} not found for description regeneration")
            return

        if self.genai_client is None:
            logger.error("GenAI not enabled")
            return

        camera_config = self.config.cameras[event.camera]

        if not camera_config.objects.genai.enabled and not force:
            logger.error(f"GenAI not enabled for camera {event.camera}")
            return

        thumbnail = get_event_thumbnail_bytes(event)

        # ensure we have a jpeg to pass to the model
        thumbnail = ensure_jpeg_bytes(thumbnail)

        logger.debug(
            f"Trying {source} regeneration for {event}, has_snapshot: {event.has_snapshot}"
        )

        if event.has_snapshot and source == "snapshot":
            snapshot_image = self._read_and_crop_snapshot(event)
            if not snapshot_image:
                return

        embed_image = (
            [snapshot_image]
            if event.has_snapshot and source == "snapshot"
            else (
                [data["thumbnail"] for data in self.tracked_events[event_id]]
                if len(self.tracked_events.get(event_id, [])) > 0
                else [thumbnail]
            )
        )

        self._genai_embed_description(event, embed_image)

    def process_data(self, frame_data: dict, data_type: PostProcessDataEnum) -> None:
        """Process a frame update."""
        self.metrics.object_desc_dps.value = self.object_desc_dps.eps()

        if data_type != PostProcessDataEnum.tracked_object:
            return

        state: str | None = frame_data.get("state", None)

        if state is not None:
            logger.debug(f"Processing {state} for {frame_data['camera']}")

        if state == "update":
            self.__handle_frame_update(
                frame_data["camera"], frame_data["data"], frame_data["yuv_frame"]
            )
        elif state == "finalize":
            self.__handle_frame_finalize(
                frame_data["camera"], frame_data["event"], frame_data["thumbnail"]
            )

    def handle_request(self, topic: str, data: dict[str, Any]) -> str | None:
        """Handle a request."""
        if topic == "regenerate_description":
            self.__regenerate_description(
                data["event_id"], data["source"], data["force"]
            )

        return None

    def _read_and_crop_snapshot(self, event: Event) -> bytes | None:
        """Read, decode, and crop the snapshot image."""
        snapshot_file = os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg")

        if not os.path.isfile(snapshot_file):
            logger.error(
                f"Cannot load snapshot for {event.id}, file not found: {snapshot_file}"
            )
            return None

        try:
            with open(snapshot_file, "rb") as image_file:
                snapshot_image = image_file.read()

                img = cv2.imdecode(
                    np.frombuffer(snapshot_image, dtype=np.int8),
                    cv2.IMREAD_COLOR,
                )

                # Crop snapshot based on region
                # provide full image if region doesn't exist (manual events)
                height, width = img.shape[:2]
                x1_rel, y1_rel, width_rel, height_rel = event.data.get(
                    "region", [0, 0, 1, 1]
                )
                x1, y1 = int(x1_rel * width), int(y1_rel * height)

                cropped_image = img[
                    y1 : y1 + int(height_rel * height),
                    x1 : x1 + int(width_rel * width),
                ]

                _, buffer = cv2.imencode(".jpg", cropped_image)
                return buffer.tobytes()
        except Exception:
            return None

    def _process_genai_description(
        self, event: Event, camera_config: CameraConfig, thumbnail
    ) -> None:
        if event.has_snapshot and camera_config.objects.genai.use_snapshot:
            snapshot_image = self._read_and_crop_snapshot(event)
            if not snapshot_image:
                return

        num_thumbnails = len(self.tracked_events.get(event.id, []))

        # ensure we have a jpeg to pass to the model
        thumbnail = ensure_jpeg_bytes(thumbnail)

        embed_image = (
            [snapshot_image]
            if event.has_snapshot and camera_config.objects.genai.use_snapshot
            else (
                [data["thumbnail"] for data in self.tracked_events[event.id]]
                if num_thumbnails > 0
                else [thumbnail]
            )
        )

        if camera_config.objects.genai.debug_save_thumbnails and num_thumbnails > 0:
            logger.debug(f"Saving {num_thumbnails} thumbnails for event {event.id}")

            Path(os.path.join(CLIPS_DIR, f"genai-requests/{event.id}")).mkdir(
                parents=True, exist_ok=True
            )

            for idx, data in enumerate(self.tracked_events[event.id], 1):
                jpg_bytes: bytes | None = data["thumbnail"]

                if jpg_bytes is None:
                    logger.warning(f"Unable to save thumbnail {idx} for {event.id}.")
                else:
                    with open(
                        os.path.join(
                            CLIPS_DIR,
                            f"genai-requests/{event.id}/{idx}.jpg",
                        ),
                        "wb",
                    ) as j:
                        j.write(jpg_bytes)

        # Generate the description. Call happens in a thread since it is network bound.
        threading.Thread(
            target=self._genai_embed_description,
            name=f"_genai_embed_description_{event.id}",
            daemon=True,
            args=(
                event,
                embed_image,
            ),
        ).start()

        # Delete tracked events based on the event_id
        if event.id in self.tracked_events:
            del self.tracked_events[event.id]

    def _genai_embed_description(self, event: Event, thumbnails: list[bytes]) -> None:
        """Embed the description for an event."""
        start = datetime.datetime.now().timestamp()
        camera_config = self.config.cameras[event.camera]
        description = self.genai_client.generate_object_description(
            camera_config, thumbnails, event
        )

        if not description:
            logger.debug("Failed to generate description for %s", event.id)
            return

        # fire and forget description update
        self.requestor.send_data(
            UPDATE_EVENT_DESCRIPTION,
            {
                "type": TrackedObjectUpdateTypesEnum.description,
                "id": event.id,
                "description": description,
                "camera": event.camera,
            },
        )

        # Embed the description
        if self.config.semantic_search.enabled:
            self.embeddings.embed_description(event.id, description)

        # Check semantic trigger for this description
        if self.semantic_trigger_processor is not None:
            self.semantic_trigger_processor.process_data(
                {"event_id": event.id, "camera": event.camera, "type": "text"},
                PostProcessDataEnum.tracked_object,
            )

        # Update inference timing metrics
        self.object_desc_speed.update(datetime.datetime.now().timestamp() - start)
        self.object_desc_dps.update()

        logger.debug(
            "Generated description for %s (%d images): %s",
            event.id,
            len(thumbnails),
            description,
        )
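
The new processor above is driven entirely by dict payloads whose "state" key selects a code path in process_data(). Below is a minimal sketch of the two payload shapes; it is an illustration with hypothetical camera/event values, not part of the commit.

# Illustrative only: payload shapes consumed by process_data() above.
# All field values here are hypothetical.
update_payload = {
    "camera": "front_door",                # hypothetical camera name
    "data": {
        "id": "1696270091.123-abc123",     # hypothetical event id
        "box": [10, 20, 110, 220],
        "stationary": False,
        "has_clip": True,
        "has_snapshot": True,
        "entered_zones": [],
    },
    "state": "update",
    "yuv_frame": None,                     # real caller passes the shared YUV frame
}

finalize_payload = {
    "event": None,                         # real caller passes the Event model row
    "camera": "front_door",
    "state": "finalize",
    "thumbnail": b"\xff\xd8...jpeg bytes...",
}

# processor.process_data(update_payload, PostProcessDataEnum.tracked_object)
# processor.process_data(finalize_payload, PostProcessDataEnum.tracked_object)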

View File

@@ -22,6 +22,8 @@ class DataProcessorMetrics:
     yolov9_lpr_pps: Synchronized
     review_desc_speed: Synchronized
     review_desc_dps: Synchronized
+    object_desc_speed: Synchronized
+    object_desc_dps: Synchronized
     classification_speeds: dict[str, Synchronized]
     classification_cps: dict[str, Synchronized]
@@ -38,6 +40,8 @@
         self.yolov9_lpr_pps = manager.Value("d", 0.0)
         self.review_desc_speed = manager.Value("d", 0.0)
         self.review_desc_dps = manager.Value("d", 0.0)
+        self.object_desc_speed = manager.Value("d", 0.0)
+        self.object_desc_dps = manager.Value("d", 0.0)
         self.classification_speeds = manager.dict()
         self.classification_cps = manager.dict()
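
The new fields follow the existing metrics pattern: manager.Value("d", 0.0) is a process-safe double that one process writes and another reads. A minimal, self-contained sketch of that pattern (values hypothetical, not part of the commit):

from multiprocessing import Manager

if __name__ == "__main__":
    manager = Manager()

    # Process-safe doubles, as in DataProcessorMetrics above
    object_desc_speed = manager.Value("d", 0.0)  # seconds per GenAI call
    object_desc_dps = manager.Value("d", 0.0)    # descriptions per second

    object_desc_speed.value = 0.84   # writer side: updated after each call
    print(object_desc_speed.value)   # reader side: stats code reads .value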

View File

@@ -3,14 +3,10 @@
 import base64
 import datetime
 import logging
-import os
 import threading
 from multiprocessing.synchronize import Event as MpEvent
-from pathlib import Path
-from typing import Any, Optional
+from typing import Any

-import cv2
-import numpy as np
 from peewee import DoesNotExist

 from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
@@ -30,16 +26,12 @@ from frigate.comms.recordings_updater import (
     RecordingsDataTypeEnum,
 )
 from frigate.comms.review_updater import ReviewDataSubscriber
-from frigate.config import CameraConfig, FrigateConfig
+from frigate.config import FrigateConfig
 from frigate.config.camera.camera import CameraTypeEnum
 from frigate.config.camera.updater import (
     CameraConfigUpdateEnum,
     CameraConfigUpdateSubscriber,
 )
-from frigate.const import (
-    CLIPS_DIR,
-    UPDATE_EVENT_DESCRIPTION,
-)
 from frigate.data_processing.common.license_plate.model import (
     LicensePlateModelRunner,
 )
@@ -50,6 +42,7 @@ from frigate.data_processing.post.audio_transcription import (
 from frigate.data_processing.post.license_plate import (
     LicensePlatePostProcessor,
 )
+from frigate.data_processing.post.object_descriptions import ObjectDescriptionProcessor
 from frigate.data_processing.post.review_descriptions import ReviewDescriptionProcessor
 from frigate.data_processing.post.semantic_trigger import SemanticTriggerProcessor
 from frigate.data_processing.real_time.api import RealTimeProcessorApi
@@ -67,13 +60,8 @@ from frigate.db.sqlitevecq import SqliteVecQueueDatabase
 from frigate.events.types import EventTypeEnum, RegenerateDescriptionEnum
 from frigate.genai import get_genai_client
 from frigate.models import Event, Recordings, ReviewSegment, Trigger
-from frigate.types import TrackedObjectUpdateTypesEnum
 from frigate.util.builtin import serialize
-from frigate.util.image import (
-    SharedMemoryFrameManager,
-    calculate_region,
-    ensure_jpeg_bytes,
-)
+from frigate.util.image import SharedMemoryFrameManager
 from frigate.util.path import get_event_thumbnail_bytes

 from .embeddings import Embeddings
@@ -235,20 +223,30 @@ class EmbeddingMaintainer(threading.Thread):
                 AudioTranscriptionPostProcessor(self.config, self.requestor, metrics)
             )

+        semantic_trigger_processor: SemanticTriggerProcessor | None = None
         if self.config.semantic_search.enabled:
+            semantic_trigger_processor = SemanticTriggerProcessor(
+                db,
+                self.config,
+                self.requestor,
+                metrics,
+                self.embeddings,
+            )
+            self.post_processors.append(semantic_trigger_processor)
+
+        if any(c.objects.genai.enabled_in_config for c in self.config.cameras.values()):
             self.post_processors.append(
-                SemanticTriggerProcessor(
-                    db,
+                ObjectDescriptionProcessor(
                     self.config,
-                    self.requestor,
-                    metrics,
                     self.embeddings,
+                    self.requestor,
+                    self.metrics,
+                    self.genai_client,
+                    semantic_trigger_processor,
                 )
             )

         self.stop_event = stop_event
-        self.tracked_events: dict[str, list[Any]] = {}
-        self.early_request_sent: dict[str, bool] = {}

         # recordings data
         self.recordings_available_through: dict[str, float] = {}
@@ -337,11 +335,8 @@
         camera_config = self.config.cameras[camera]

-        # no need to process updated objects if face recognition, lpr, genai are disabled
-        if (
-            not camera_config.objects.genai.enabled
-            and len(self.realtime_processors) == 0
-        ):
+        # no need to process updated objects if no processors are active
+        if len(self.realtime_processors) == 0 and len(self.post_processors) == 0:
             return

         # Create our own thumbnail based on the bounding box and the frame time
@@ -361,57 +356,17 @@
         for processor in self.realtime_processors:
             processor.process_frame(data, yuv_frame)

-        # no need to save our own thumbnails if genai is not enabled
-        # or if the object has become stationary
-        if self.genai_client is not None and not data["stationary"]:
-            if data["id"] not in self.tracked_events:
-                self.tracked_events[data["id"]] = []
-
-            data["thumbnail"] = self._create_thumbnail(yuv_frame, data["box"])
-
-            # Limit the number of thumbnails saved
-            if len(self.tracked_events[data["id"]]) >= MAX_THUMBNAILS:
-                # Always keep the first thumbnail for the event
-                self.tracked_events[data["id"]].pop(1)
-
-            self.tracked_events[data["id"]].append(data)
-
-        # check if we're configured to send an early request after a minimum number of updates received
-        if (
-            self.genai_client is not None
-            and camera_config.objects.genai.send_triggers.after_significant_updates
-        ):
-            if (
-                len(self.tracked_events.get(data["id"], []))
-                >= camera_config.objects.genai.send_triggers.after_significant_updates
-                and data["id"] not in self.early_request_sent
-            ):
-                if data["has_clip"] and data["has_snapshot"]:
-                    event: Event = Event.get(Event.id == data["id"])
-
-                    if (
-                        not camera_config.objects.genai.objects
-                        or event.label in camera_config.objects.genai.objects
-                    ) and (
-                        not camera_config.objects.genai.required_zones
-                        or set(data["entered_zones"])
-                        & set(camera_config.objects.genai.required_zones)
-                    ):
-                        logger.debug(f"{camera} sending early request to GenAI")
-
-                        self.early_request_sent[data["id"]] = True
-                        threading.Thread(
-                            target=self._genai_embed_description,
-                            name=f"_genai_embed_description_{event.id}",
-                            daemon=True,
-                            args=(
-                                event,
-                                [
-                                    data["thumbnail"]
-                                    for data in self.tracked_events[data["id"]]
-                                ],
-                            ),
-                        ).start()
+        for processor in self.post_processors:
+            if isinstance(processor, ObjectDescriptionProcessor):
+                processor.process_data(
+                    {
+                        "camera": camera,
+                        "data": data,
+                        "state": "update",
+                        "yuv_frame": yuv_frame,
+                    },
+                    PostProcessDataEnum.tracked_object,
+                )

         self.frame_manager.close(frame_name)
@@ -424,12 +379,13 @@
                 break

             event_id, camera, updated_db = ended
-            camera_config = self.config.cameras[camera]

             # expire in realtime processors
             for processor in self.realtime_processors:
                 processor.expire_object(event_id, camera)

+            thumbnail: bytes | None = None
             if updated_db:
                 try:
                     event: Event = Event.get(Event.id == event_id)
@@ -446,23 +402,6 @@
                     # Embed the thumbnail
                     self._embed_thumbnail(event_id, thumbnail)

-                    # Run GenAI
-                    if (
-                        camera_config.objects.genai.enabled
-                        and camera_config.objects.genai.send_triggers.tracked_object_end
-                        and self.genai_client is not None
-                        and (
-                            not camera_config.objects.genai.objects
-                            or event.label in camera_config.objects.genai.objects
-                        )
-                        and (
-                            not camera_config.objects.genai.required_zones
-                            or set(event.zones)
-                            & set(camera_config.objects.genai.required_zones)
-                        )
-                    ):
-                        self._process_genai_description(event, camera_config, thumbnail)

             # call any defined post processors
             for processor in self.post_processors:
                 if isinstance(processor, LicensePlatePostProcessor):
@@ -492,16 +431,25 @@
                         {"event_id": event_id, "camera": camera, "type": "image"},
                         PostProcessDataEnum.tracked_object,
                     )
+                elif isinstance(processor, ObjectDescriptionProcessor):
+                    if not updated_db:
+                        continue
+
+                    processor.process_data(
+                        {
+                            "event": event,
+                            "camera": camera,
+                            "state": "finalize",
+                            "thumbnail": thumbnail,
+                        },
+                        PostProcessDataEnum.tracked_object,
+                    )
                 else:
                     processor.process_data(
                         {"event_id": event_id, "camera": camera},
                         PostProcessDataEnum.tracked_object,
                     )

-            # Delete tracked events based on the event_id
-            if event_id in self.tracked_events:
-                del self.tracked_events[event_id]
-
     def _expire_dedicated_lpr(self) -> None:
         """Remove plates not seen for longer than expiration timeout for dedicated lpr cameras."""
         now = datetime.datetime.now().timestamp()
@@ -570,9 +518,16 @@
             event_id, source, force = payload

             if event_id:
-                self.handle_regenerate_description(
-                    event_id, RegenerateDescriptionEnum(source), force
-                )
+                for processor in self.post_processors:
+                    if isinstance(processor, ObjectDescriptionProcessor):
+                        processor.handle_request(
+                            "regenerate_description",
+                            {
+                                "event_id": event_id,
+                                "source": RegenerateDescriptionEnum(source),
+                                "force": force,
+                            },
+                        )

     def _process_frame_updates(self) -> None:
         """Process event updates"""
@@ -622,208 +577,9 @@
         self.frame_manager.close(frame_name)

-    def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]:
-        """Return jpg thumbnail of a region of the frame."""
-        frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)
-        region = calculate_region(
-            frame.shape, box[0], box[1], box[2], box[3], height, multiplier=1.4
-        )
-        frame = frame[region[1] : region[3], region[0] : region[2]]
-
-        width = int(height * frame.shape[1] / frame.shape[0])
-        frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_AREA)
-
-        ret, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 100])
-        if ret:
-            return jpg.tobytes()
-
-        return None
-
     def _embed_thumbnail(self, event_id: str, thumbnail: bytes) -> None:
         """Embed the thumbnail for an event."""
         if not self.config.semantic_search.enabled:
             return

         self.embeddings.embed_thumbnail(event_id, thumbnail)

-    def _process_genai_description(
-        self, event: Event, camera_config: CameraConfig, thumbnail
-    ) -> None:
-        if event.has_snapshot and camera_config.objects.genai.use_snapshot:
-            snapshot_image = self._read_and_crop_snapshot(event, camera_config)
-            if not snapshot_image:
-                return
-
-        num_thumbnails = len(self.tracked_events.get(event.id, []))
-
-        # ensure we have a jpeg to pass to the model
-        thumbnail = ensure_jpeg_bytes(thumbnail)
-
-        embed_image = (
-            [snapshot_image]
-            if event.has_snapshot and camera_config.objects.genai.use_snapshot
-            else (
-                [data["thumbnail"] for data in self.tracked_events[event.id]]
-                if num_thumbnails > 0
-                else [thumbnail]
-            )
-        )
-
-        if camera_config.objects.genai.debug_save_thumbnails and num_thumbnails > 0:
-            logger.debug(f"Saving {num_thumbnails} thumbnails for event {event.id}")
-
-            Path(os.path.join(CLIPS_DIR, f"genai-requests/{event.id}")).mkdir(
-                parents=True, exist_ok=True
-            )
-
-            for idx, data in enumerate(self.tracked_events[event.id], 1):
-                jpg_bytes: bytes = data["thumbnail"]
-
-                if jpg_bytes is None:
-                    logger.warning(f"Unable to save thumbnail {idx} for {event.id}.")
-                else:
-                    with open(
-                        os.path.join(
-                            CLIPS_DIR,
-                            f"genai-requests/{event.id}/{idx}.jpg",
-                        ),
-                        "wb",
-                    ) as j:
-                        j.write(jpg_bytes)
-
-        # Generate the description. Call happens in a thread since it is network bound.
-        threading.Thread(
-            target=self._genai_embed_description,
-            name=f"_genai_embed_description_{event.id}",
-            daemon=True,
-            args=(
-                event,
-                embed_image,
-            ),
-        ).start()
-
-    def _genai_embed_description(self, event: Event, thumbnails: list[bytes]) -> None:
-        """Embed the description for an event."""
-        camera_config = self.config.cameras[event.camera]
-        description = self.genai_client.generate_object_description(
-            camera_config, thumbnails, event
-        )
-
-        if not description:
-            logger.debug("Failed to generate description for %s", event.id)
-            return
-
-        # fire and forget description update
-        self.requestor.send_data(
-            UPDATE_EVENT_DESCRIPTION,
-            {
-                "type": TrackedObjectUpdateTypesEnum.description,
-                "id": event.id,
-                "description": description,
-                "camera": event.camera,
-            },
-        )
-
-        # Embed the description
-        if self.config.semantic_search.enabled:
-            self.embeddings.embed_description(event.id, description)
-
-        # Check semantic trigger for this description
-        for processor in self.post_processors:
-            if isinstance(processor, SemanticTriggerProcessor):
-                processor.process_data(
-                    {"event_id": event.id, "camera": event.camera, "type": "text"},
-                    PostProcessDataEnum.tracked_object,
-                )
-            else:
-                continue
-
-        logger.debug(
-            "Generated description for %s (%d images): %s",
-            event.id,
-            len(thumbnails),
-            description,
-        )
-
-    def _read_and_crop_snapshot(self, event: Event, camera_config) -> bytes | None:
-        """Read, decode, and crop the snapshot image."""
-        snapshot_file = os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg")
-
-        if not os.path.isfile(snapshot_file):
-            logger.error(
-                f"Cannot load snapshot for {event.id}, file not found: {snapshot_file}"
-            )
-            return None
-
-        try:
-            with open(snapshot_file, "rb") as image_file:
-                snapshot_image = image_file.read()
-
-                img = cv2.imdecode(
-                    np.frombuffer(snapshot_image, dtype=np.int8),
-                    cv2.IMREAD_COLOR,
-                )
-
-                # Crop snapshot based on region
-                # provide full image if region doesn't exist (manual events)
-                height, width = img.shape[:2]
-                x1_rel, y1_rel, width_rel, height_rel = event.data.get(
-                    "region", [0, 0, 1, 1]
-                )
-                x1, y1 = int(x1_rel * width), int(y1_rel * height)
-
-                cropped_image = img[
-                    y1 : y1 + int(height_rel * height),
-                    x1 : x1 + int(width_rel * width),
-                ]
-
-                _, buffer = cv2.imencode(".jpg", cropped_image)
-                return buffer.tobytes()
-        except Exception:
-            return None
-
-    def handle_regenerate_description(
-        self, event_id: str, source: str, force: bool
-    ) -> None:
-        try:
-            event: Event = Event.get(Event.id == event_id)
-        except DoesNotExist:
-            logger.error(f"Event {event_id} not found for description regeneration")
-            return
-
-        if self.genai_client is None:
-            logger.error("GenAI not enabled")
-            return
-
-        camera_config = self.config.cameras[event.camera]
-
-        if not camera_config.objects.genai.enabled and not force:
-            logger.error(f"GenAI not enabled for camera {event.camera}")
-            return
-
-        thumbnail = get_event_thumbnail_bytes(event)
-
-        # ensure we have a jpeg to pass to the model
-        thumbnail = ensure_jpeg_bytes(thumbnail)
-
-        logger.debug(
-            f"Trying {source} regeneration for {event}, has_snapshot: {event.has_snapshot}"
-        )
-
-        if event.has_snapshot and source == "snapshot":
-            snapshot_image = self._read_and_crop_snapshot(event, camera_config)
-            if not snapshot_image:
-                return
-
-        embed_image = (
-            [snapshot_image]
-            if event.has_snapshot and source == "snapshot"
-            else (
-                [data["thumbnail"] for data in self.tracked_events[event_id]]
-                if len(self.tracked_events.get(event_id, [])) > 0
-                else [thumbnail]
-            )
-        )
-
-        self._genai_embed_description(event, embed_image)
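
After this change the maintainer no longer calls GenAI helpers itself; it selects the owning processor from post_processors by isinstance and forwards payloads or requests. A standalone sketch of that dispatch pattern, using stub classes and hypothetical values rather than the real Frigate types:

# Illustrative only: the isinstance dispatch used in the diff above.
class PostProcessorApi:
    """Stub of the shared post-processor interface."""

    def process_data(self, data: dict, data_type: str) -> None: ...

    def handle_request(self, topic: str, data: dict) -> None: ...


class ObjectDescriptionProcessor(PostProcessorApi):
    def handle_request(self, topic: str, data: dict) -> None:
        print(f"{topic}: event={data['event_id']} source={data['source']}")


post_processors: list[PostProcessorApi] = [ObjectDescriptionProcessor()]

# Regeneration requests are routed to the owning processor instead of a
# maintainer method, mirroring the loop added above:
for processor in post_processors:
    if isinstance(processor, ObjectDescriptionProcessor):
        processor.handle_request(
            "regenerate_description",
            {"event_id": "1696270091.123-abc123", "source": "snapshot", "force": False},
        )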

View File

@@ -32,7 +32,7 @@ def register_genai_provider(key: GenAIProviderEnum):
 class GenAIClient:
     """Generative AI client for Frigate."""

-    def __init__(self, genai_config: GenAIConfig, timeout: int = 60) -> None:
+    def __init__(self, genai_config: GenAIConfig, timeout: int = 120) -> None:
         self.genai_config: GenAIConfig = genai_config
         self.timeout = timeout
         self.provider = self._init_provider()

View File

@@ -361,6 +361,14 @@ def stats_snapshot(
             embeddings_metrics.review_desc_dps.value, 2
         )

+    if embeddings_metrics.object_desc_speed.value > 0.0:
+        stats["embeddings"]["object_description_speed"] = round(
+            embeddings_metrics.object_desc_speed.value * 1000, 2
+        )
+        stats["embeddings"]["object_descriptions"] = round(
+            embeddings_metrics.object_desc_dps.value, 2
+        )
+
     for key in embeddings_metrics.classification_speeds.keys():
         stats["embeddings"][f"{key}_classification_speed"] = round(
             embeddings_metrics.classification_speeds[key].value * 1000, 2
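
Once at least one object description has been generated, the block above adds two keys under stats["embeddings"], converting the stored speed from seconds to milliseconds. A hypothetical resulting fragment (values invented for illustration):

# Illustrative only: shape of the stats payload after the change above.
stats = {
    "embeddings": {
        "object_description_speed": 843.21,  # ms per GenAI call (value * 1000)
        "object_descriptions": 0.05,         # descriptions per second
    }
}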

View File

@@ -995,7 +995,26 @@ def get_histogram(image, x_min, y_min, x_max, y_max):
     return cv2.normalize(hist, hist).flatten()


-def ensure_jpeg_bytes(image_data):
+def create_thumbnail(
+    yuv_frame: np.ndarray, box: tuple[int, int, int, int], height=500
+) -> Optional[bytes]:
+    """Return jpg thumbnail of a region of the frame."""
+    frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)
+    region = calculate_region(
+        frame.shape, box[0], box[1], box[2], box[3], height, multiplier=1.4
+    )
+    frame = frame[region[1] : region[3], region[0] : region[2]]
+
+    width = int(height * frame.shape[1] / frame.shape[0])
+    frame = cv2.resize(frame, dsize=(width, height), interpolation=cv2.INTER_AREA)
+
+    ret, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 100])
+    if ret:
+        return jpg.tobytes()
+
+    return None
+
+
+def ensure_jpeg_bytes(image_data: bytes) -> bytes:
     """Ensure image data is jpeg bytes for genai"""
     try:
         img_array = np.frombuffer(image_data, dtype=np.uint8)
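
create_thumbnail() was moved into frigate.util.image so the new post-processor can import it instead of reaching into the maintainer. A standalone approximation of what it does, using a synthetic I420 frame and a hypothetical box; the real helper also expands the box via calculate_region(..., multiplier=1.4), which is omitted here:

# Illustrative only: crop a box from a YUV I420 frame, scale to a fixed
# height, and encode as JPEG, as the relocated helper does.
import cv2
import numpy as np

height, width = 480, 640
yuv_frame = np.zeros((height * 3 // 2, width), dtype=np.uint8)  # blank I420 frame

frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)
x_min, y_min, x_max, y_max = 100, 120, 300, 360  # hypothetical detection box
crop = frame[y_min:y_max, x_min:x_max]

# Scale to a fixed height, preserving aspect ratio, then encode as JPEG
thumb_h = 500
thumb_w = int(thumb_h * crop.shape[1] / crop.shape[0])
crop = cv2.resize(crop, dsize=(thumb_w, thumb_h), interpolation=cv2.INTER_AREA)

ok, jpg = cv2.imencode(".jpg", crop, [int(cv2.IMWRITE_JPEG_QUALITY), 100])
jpg_bytes = jpg.tobytes() if ok else None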