add endpoint and new zmq pub/sub model

This commit is contained in:
Josh Hawkins 2024-09-23 12:48:37 -05:00
parent efd9ef2234
commit 0d0e266eb7
3 changed files with 133 additions and 0 deletions

View File

@ -22,6 +22,10 @@ from peewee import JOIN, DoesNotExist, fn, operator
from PIL import Image from PIL import Image
from playhouse.shortcuts import model_to_dict from playhouse.shortcuts import model_to_dict
from frigate.comms.event_metadata_updater import (
EventMetadataPublisher,
EventMetadataTypeEnum,
)
from frigate.const import ( from frigate.const import (
CLIPS_DIR, CLIPS_DIR,
) )
@ -945,6 +949,49 @@ def set_description(id):
) )
@EventBp.route("/events/<id>/description/regenerate", methods=["PUT"])
def regenerate_description(id):
event_metadata_updater = EventMetadataPublisher(
EventMetadataTypeEnum.regenerate_description
)
# try:
# event: Event = Event.get(Event.id == id)
# except DoesNotExist:
# return make_response(
# jsonify({"success": False, "message": "Event " + id + " not found"}), 404
# )
# if (
# current_app.frigate_config.semantic_search.enabled
# and current_app.frigate_config.genai.enabled
# ):
logger.info(id)
event_metadata_updater.publish(id)
return make_response(
jsonify(
{
"success": True,
"message": "Event "
+ id
+ " description regeneration has been requested.",
}
),
200,
)
return make_response(
jsonify(
{
"success": False,
"message": "Semantic search and generative AI are not enabled",
}
),
400,
)
@EventBp.route("/events/<id>", methods=("DELETE",)) @EventBp.route("/events/<id>", methods=("DELETE",))
def delete_event(id): def delete_event(id):
try: try:

View File

@ -0,0 +1,48 @@
"""Facilitates communication between processes."""
import logging
from enum import Enum
from typing import Optional
from .zmq_proxy import Publisher, Subscriber
logger = logging.getLogger(__name__)
class EventMetadataTypeEnum(str, Enum):
regenerate_description = "regenerate_description"
class EventMetadataPublisher(Publisher):
"""Simplifies receiving event metadata."""
topic_base = "event_metadata/"
def __init__(self, topic: EventMetadataTypeEnum) -> None:
topic = topic.value
super().__init__(topic)
def publish(self, payload: str) -> None:
logger.info(f"publishing payload: {payload}")
super().publish(payload)
class EventMetadataSubscriber(Subscriber):
"""Simplifies receiving event metadata."""
topic_base = "event_metadata/"
def __init__(self, topic: EventMetadataTypeEnum) -> None:
topic = topic.value
logger.info(f"subscribing to: {topic.value}")
super().__init__(topic)
def check_for_update(
self, timeout: float = None
) -> Optional[tuple[EventMetadataTypeEnum, any]]:
return super().check_for_update(timeout)
def _return_object(self, topic: str, payload: any) -> any:
if payload is None:
return (None, None)
return (EventMetadataTypeEnum[topic[len(self.topic_base) :]], payload)

View File

@ -12,6 +12,10 @@ import numpy as np
from peewee import DoesNotExist from peewee import DoesNotExist
from PIL import Image from PIL import Image
from frigate.comms.event_metadata_updater import (
EventMetadataSubscriber,
EventMetadataTypeEnum,
)
from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscriber from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscriber
from frigate.comms.inter_process import InterProcessRequestor from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
@ -40,6 +44,9 @@ class EmbeddingMaintainer(threading.Thread):
self.embeddings = Embeddings() self.embeddings = Embeddings()
self.event_subscriber = EventUpdateSubscriber() self.event_subscriber = EventUpdateSubscriber()
self.event_end_subscriber = EventEndSubscriber() self.event_end_subscriber = EventEndSubscriber()
self.event_metadata_subscriber = EventMetadataSubscriber(
EventMetadataTypeEnum.regenerate_description
)
self.frame_manager = SharedMemoryFrameManager() self.frame_manager = SharedMemoryFrameManager()
# create communication for updating event descriptions # create communication for updating event descriptions
self.requestor = InterProcessRequestor() self.requestor = InterProcessRequestor()
@ -52,9 +59,11 @@ class EmbeddingMaintainer(threading.Thread):
while not self.stop_event.is_set(): while not self.stop_event.is_set():
self._process_updates() self._process_updates()
self._process_finalized() self._process_finalized()
self._process_event_metadata()
self.event_subscriber.stop() self.event_subscriber.stop()
self.event_end_subscriber.stop() self.event_end_subscriber.stop()
self.event_metadata_subscriber.stop()
self.requestor.stop() self.requestor.stop()
logger.info("Exiting embeddings maintenance...") logger.info("Exiting embeddings maintenance...")
@ -140,6 +149,18 @@ class EmbeddingMaintainer(threading.Thread):
if event_id in self.tracked_events: if event_id in self.tracked_events:
del self.tracked_events[event_id] del self.tracked_events[event_id]
def _process_event_metadata(self, event_id):
# Check for regenerate description requests
(topic, event_id) = self.event_metadata_subscriber.check_for_update()
logger.info(f"in init in maintainer, {topic} {event_id}")
if not topic:
return
if event_id:
logger.info(f"in maintainer: {event_id}")
# self.handle_regenerate_description(event_id)
def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]: def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]:
"""Return jpg thumbnail of a region of the frame.""" """Return jpg thumbnail of a region of the frame."""
frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420) frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420)
@ -200,3 +221,20 @@ class EmbeddingMaintainer(threading.Thread):
len(thumbnails), len(thumbnails),
description, description,
) )
def handle_regenerate_description(self, event_id: str) -> None:
try:
event: Event = Event.get(Event.id == event_id)
except DoesNotExist:
logger.error(f"Event {event_id} not found for description regeneration")
return
camera_config = self.config.cameras[event.camera]
if not camera_config.genai.enabled or self.genai_client is None:
logger.error(f"GenAI not enabled for camera {event.camera}")
return
metadata = get_metadata(event)
thumbnail = base64.b64decode(event.thumbnail)
self._embed_description(event, [thumbnail], metadata)