From 0d0e266eb78dd2b2d006b8ca8b3dbe2a9644b44f Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Mon, 23 Sep 2024 12:48:37 -0500 Subject: [PATCH] add endpoint and new zmq pub/sub model --- frigate/api/event.py | 47 ++++++++++++++++++++++++ frigate/comms/event_metadata_updater.py | 48 +++++++++++++++++++++++++ frigate/embeddings/maintainer.py | 38 ++++++++++++++++++++ 3 files changed, 133 insertions(+) create mode 100644 frigate/comms/event_metadata_updater.py diff --git a/frigate/api/event.py b/frigate/api/event.py index a49b8942d..182916efa 100644 --- a/frigate/api/event.py +++ b/frigate/api/event.py @@ -22,6 +22,10 @@ from peewee import JOIN, DoesNotExist, fn, operator from PIL import Image from playhouse.shortcuts import model_to_dict +from frigate.comms.event_metadata_updater import ( + EventMetadataPublisher, + EventMetadataTypeEnum, +) from frigate.const import ( CLIPS_DIR, ) @@ -945,6 +949,49 @@ def set_description(id): ) +@EventBp.route("/events//description/regenerate", methods=["PUT"]) +def regenerate_description(id): + event_metadata_updater = EventMetadataPublisher( + EventMetadataTypeEnum.regenerate_description + ) + + # try: + # event: Event = Event.get(Event.id == id) + # except DoesNotExist: + # return make_response( + # jsonify({"success": False, "message": "Event " + id + " not found"}), 404 + # ) + + # if ( + # current_app.frigate_config.semantic_search.enabled + # and current_app.frigate_config.genai.enabled + # ): + logger.info(id) + event_metadata_updater.publish(id) + + return make_response( + jsonify( + { + "success": True, + "message": "Event " + + id + + " description regeneration has been requested.", + } + ), + 200, + ) + + return make_response( + jsonify( + { + "success": False, + "message": "Semantic search and generative AI are not enabled", + } + ), + 400, + ) + + @EventBp.route("/events/", methods=("DELETE",)) def delete_event(id): try: diff --git a/frigate/comms/event_metadata_updater.py b/frigate/comms/event_metadata_updater.py new file mode 100644 index 000000000..84f50168c --- /dev/null +++ b/frigate/comms/event_metadata_updater.py @@ -0,0 +1,48 @@ +"""Facilitates communication between processes.""" + +import logging +from enum import Enum +from typing import Optional + +from .zmq_proxy import Publisher, Subscriber + +logger = logging.getLogger(__name__) + + +class EventMetadataTypeEnum(str, Enum): + regenerate_description = "regenerate_description" + + +class EventMetadataPublisher(Publisher): + """Simplifies receiving event metadata.""" + + topic_base = "event_metadata/" + + def __init__(self, topic: EventMetadataTypeEnum) -> None: + topic = topic.value + super().__init__(topic) + + def publish(self, payload: str) -> None: + logger.info(f"publishing payload: {payload}") + super().publish(payload) + + +class EventMetadataSubscriber(Subscriber): + """Simplifies receiving event metadata.""" + + topic_base = "event_metadata/" + + def __init__(self, topic: EventMetadataTypeEnum) -> None: + topic = topic.value + logger.info(f"subscribing to: {topic.value}") + super().__init__(topic) + + def check_for_update( + self, timeout: float = None + ) -> Optional[tuple[EventMetadataTypeEnum, any]]: + return super().check_for_update(timeout) + + def _return_object(self, topic: str, payload: any) -> any: + if payload is None: + return (None, None) + return (EventMetadataTypeEnum[topic[len(self.topic_base) :]], payload) diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index eee0d2994..1a37b79ce 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -12,6 +12,10 @@ import numpy as np from peewee import DoesNotExist from PIL import Image +from frigate.comms.event_metadata_updater import ( + EventMetadataSubscriber, + EventMetadataTypeEnum, +) from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscriber from frigate.comms.inter_process import InterProcessRequestor from frigate.config import FrigateConfig @@ -40,6 +44,9 @@ class EmbeddingMaintainer(threading.Thread): self.embeddings = Embeddings() self.event_subscriber = EventUpdateSubscriber() self.event_end_subscriber = EventEndSubscriber() + self.event_metadata_subscriber = EventMetadataSubscriber( + EventMetadataTypeEnum.regenerate_description + ) self.frame_manager = SharedMemoryFrameManager() # create communication for updating event descriptions self.requestor = InterProcessRequestor() @@ -52,9 +59,11 @@ class EmbeddingMaintainer(threading.Thread): while not self.stop_event.is_set(): self._process_updates() self._process_finalized() + self._process_event_metadata() self.event_subscriber.stop() self.event_end_subscriber.stop() + self.event_metadata_subscriber.stop() self.requestor.stop() logger.info("Exiting embeddings maintenance...") @@ -140,6 +149,18 @@ class EmbeddingMaintainer(threading.Thread): if event_id in self.tracked_events: del self.tracked_events[event_id] + def _process_event_metadata(self, event_id): + # Check for regenerate description requests + (topic, event_id) = self.event_metadata_subscriber.check_for_update() + logger.info(f"in init in maintainer, {topic} {event_id}") + + if not topic: + return + + if event_id: + logger.info(f"in maintainer: {event_id}") + # self.handle_regenerate_description(event_id) + def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]: """Return jpg thumbnail of a region of the frame.""" frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2BGR_I420) @@ -200,3 +221,20 @@ class EmbeddingMaintainer(threading.Thread): len(thumbnails), description, ) + + def handle_regenerate_description(self, event_id: str) -> None: + try: + event: Event = Event.get(Event.id == event_id) + except DoesNotExist: + logger.error(f"Event {event_id} not found for description regeneration") + return + + camera_config = self.config.cameras[event.camera] + if not camera_config.genai.enabled or self.genai_client is None: + logger.error(f"GenAI not enabled for camera {event.camera}") + return + + metadata = get_metadata(event) + thumbnail = base64.b64decode(event.thumbnail) + + self._embed_description(event, [thumbnail], metadata)