This commit is contained in:
Josh Hawkins 2024-09-23 17:07:27 -05:00
parent 0e312af43b
commit 164f1b8545
4 changed files with 73 additions and 77 deletions

View File

@ -947,31 +947,30 @@ def set_description(id):
@EventBp.route("/events/<id>/description/regenerate", methods=["PUT"])
def regenerate_description(id):
# try:
# event: Event = Event.get(Event.id == id)
# except DoesNotExist:
# return make_response(
# jsonify({"success": False, "message": "Event " + id + " not found"}), 404
# )
try:
event: Event = Event.get(Event.id == id)
except DoesNotExist:
return make_response(
jsonify({"success": False, "message": "Event " + id + " not found"}), 404
)
# if (
# current_app.frigate_config.semantic_search.enabled
# and current_app.frigate_config.genai.enabled
# ):
logger.info(id)
current_app.event_metadata_updater.publish(id)
if (
current_app.frigate_config.semantic_search.enabled
and current_app.frigate_config.genai.enabled
):
current_app.event_metadata_updater.publish(event.id)
return make_response(
jsonify(
{
"success": True,
"message": "Event "
+ id
+ " description regeneration has been requested.",
}
),
200,
)
return make_response(
jsonify(
{
"success": True,
"message": "Event "
+ id
+ " description regeneration has been requested.",
}
),
200,
)
return make_response(
jsonify(

View File

@ -376,7 +376,9 @@ class FrigateApp:
def init_inter_process_communicator(self) -> None:
self.inter_process_communicator = InterProcessCommunicator()
self.inter_config_updater = ConfigPublisher()
self.event_metadata_updater = EventMetadataPublisher(EventMetadataTypeEnum.all)
self.event_metadata_updater = EventMetadataPublisher(
EventMetadataTypeEnum.regenerate_description
)
self.inter_zmq_proxy = ZmqProxy()
def init_web_server(self) -> None:

View File

@ -37,9 +37,11 @@ class EventMetadataSubscriber(Subscriber):
def check_for_update(
self, timeout: float = None
) -> Optional[tuple[EventMetadataTypeEnum, any]]:
print(f"checking for update, timeout {timeout}")
return super().check_for_update(timeout)
def _return_object(self, topic: str, payload: any) -> any:
print(topic, payload)
if payload is None:
return (None, None)
return (EventMetadataTypeEnum[topic[len(self.topic_base) :]], payload)

View File

@ -70,14 +70,11 @@ class EmbeddingMaintainer(threading.Thread):
def _process_updates(self) -> None:
"""Process event updates"""
logger.info("processing event updates")
update = self.event_subscriber.check_for_update(timeout=1)
update = self.event_subscriber.check_for_update()
if update is None:
return
logger.info("not returning from _process_updates")
source_type, _, camera, data = update
if not camera or source_type != EventTypeEnum.tracked_object:
@ -101,71 +98,67 @@ class EmbeddingMaintainer(threading.Thread):
def _process_finalized(self) -> None:
"""Process the end of an event."""
while True:
ended = self.event_end_subscriber.check_for_update()
logger.info("processing finalized")
ended = self.event_end_subscriber.check_for_update(timeout=1)
if ended == None:
break
if ended is None:
return
event_id, camera, updated_db = ended
camera_config = self.config.cameras[camera]
logger.info("not returning from _process_finalized")
event_id, camera, updated_db = ended
camera_config = self.config.cameras[camera]
if updated_db:
try:
event: Event = Event.get(Event.id == event_id)
except DoesNotExist:
continue
if updated_db:
try:
event: Event = Event.get(Event.id == event_id)
except DoesNotExist:
return
# Skip the event if not an object
if event.data.get("type") != "object":
continue
# Skip the event if not an object
if event.data.get("type") != "object":
return
# Extract valid event metadata
metadata = get_metadata(event)
thumbnail = base64.b64decode(event.thumbnail)
# Extract valid event metadata
metadata = get_metadata(event)
thumbnail = base64.b64decode(event.thumbnail)
# Embed the thumbnail
self._embed_thumbnail(event_id, thumbnail, metadata)
# Embed the thumbnail
self._embed_thumbnail(event_id, thumbnail, metadata)
if (
camera_config.genai.enabled
and self.genai_client is not None
and event.data.get("description") is None
):
# Generate the description. Call happens in a thread since it is network bound.
threading.Thread(
target=self._embed_description,
name=f"_embed_description_{event.id}",
daemon=True,
args=(
event,
[
data["thumbnail"]
for data in self.tracked_events[event_id]
]
if len(self.tracked_events.get(event_id, [])) > 0
else [thumbnail],
metadata,
),
).start()
if (
camera_config.genai.enabled
and self.genai_client is not None
and event.data.get("description") is None
):
# Generate the description. Call happens in a thread since it is network bound.
threading.Thread(
target=self._embed_description,
name=f"_embed_description_{event.id}",
daemon=True,
args=(
event,
[data["thumbnail"] for data in self.tracked_events[event_id]]
if len(self.tracked_events.get(event_id, [])) > 0
else [thumbnail],
metadata,
),
).start()
# Delete tracked events based on the event_id
if event_id in self.tracked_events:
del self.tracked_events[event_id]
# Delete tracked events based on the event_id
if event_id in self.tracked_events:
del self.tracked_events[event_id]
def _process_event_metadata(self):
# Check for regenerate description requests
logger.info("processing event metadata")
(topic, event_id) = self.event_metadata_subscriber.check_for_update(timeout=1)
logger.info(f"in init in maintainer, {topic} {event_id}")
(topic, event_id) = self.event_metadata_subscriber.check_for_update()
if topic is None:
return
logger.info("not returning from _process_event_metadata")
if event_id:
logger.info(f"in maintainer: {event_id}")
# self.handle_regenerate_description(event_id)
self.handle_regenerate_description(event_id)
def _create_thumbnail(self, yuv_frame, box, height=500) -> Optional[bytes]:
"""Return jpg thumbnail of a region of the frame."""