frigate/frigate/data_processing/post/review_descriptions.py

163 lines
5.2 KiB
Python
Raw Normal View History

"""Post processor for review items to get descriptions."""
2025-08-09 03:14:32 +03:00
import copy
2025-08-09 03:00:23 +03:00
import datetime
import logging
2025-08-09 02:24:52 +03:00
import os
2025-08-07 14:52:51 +03:00
import shutil
2025-08-09 03:14:32 +03:00
import threading
2025-08-09 02:24:52 +03:00
from pathlib import Path
2025-08-03 19:06:38 +03:00
import cv2
2025-08-09 15:11:41 +03:00
from frigate.comms.inter_process import InterProcessRequestor
2025-08-03 19:06:38 +03:00
from frigate.config import FrigateConfig
2025-08-09 15:11:41 +03:00
from frigate.const import CLIPS_DIR, UPDATE_REVIEW_DESCRIPTION
from frigate.data_processing.types import PostProcessDataEnum
2025-08-09 01:57:43 +03:00
from frigate.genai import GenAIClient
2025-08-10 06:47:39 +03:00
from frigate.util.builtin import EventsPerSecond, InferenceSpeed
from ..post.api import PostProcessorApi
2025-08-09 15:11:41 +03:00
from ..types import DataProcessorMetrics
logger = logging.getLogger(__name__)
class ReviewDescriptionProcessor(PostProcessorApi):
2025-08-09 15:11:41 +03:00
def __init__(
self,
config: FrigateConfig,
requestor: InterProcessRequestor,
metrics: DataProcessorMetrics,
client: GenAIClient,
):
super().__init__(config, metrics, None)
2025-08-09 15:11:41 +03:00
self.requestor = requestor
self.metrics = metrics
2025-08-03 19:06:38 +03:00
self.tracked_review_items: dict[str, list[tuple[int, bytes]]] = {}
2025-08-09 01:57:43 +03:00
self.genai_client = client
2025-08-10 06:47:39 +03:00
self.review_desc_speed = InferenceSpeed(self.metrics.review_desc_speed)
self.review_descs_dps = EventsPerSecond()
self.review_descs_dps.start()
def process_data(self, data, data_type):
2025-08-10 06:47:39 +03:00
self.metrics.review_desc_dps.value = self.review_descs_dps.eps()
if data_type != PostProcessDataEnum.review:
return
2025-08-03 19:06:38 +03:00
id = data["after"]["id"]
if data["type"] == "new" or data["type"] == "update":
if id not in self.tracked_review_items:
self.tracked_review_items[id] = []
thumb_time = data["after"]["data"]["thumb_time"]
thumb_path = data["after"]["thumb_path"]
if thumb_time and thumb_path:
2025-08-03 22:47:46 +03:00
if (
len(self.tracked_review_items[id]) > 0
and self.tracked_review_items[id][0] == thumb_time
):
# we have already processed this thumbnail
return
2025-08-03 19:06:38 +03:00
thumb_data = cv2.imread(thumb_path)
ret, jpg = cv2.imencode(
".jpg", thumb_data, [int(cv2.IMWRITE_JPEG_QUALITY), 100]
)
if ret:
self.tracked_review_items[id].append((thumb_time, jpg.tobytes()))
2025-08-09 02:24:52 +03:00
2025-08-09 02:25:16 +03:00
if self.config.cameras[
data["after"]["camera"]
].review.genai.debug_save_thumbnails:
2025-08-09 02:24:52 +03:00
id = data["after"]["id"]
Path(os.path.join(CLIPS_DIR, f"genai-requests/{id}")).mkdir(
parents=True, exist_ok=True
)
2025-08-09 03:14:32 +03:00
shutil.copy(
thumb_path,
os.path.join(
CLIPS_DIR,
f"genai-requests/{id}/{thumb_time}.webp",
),
)
2025-08-09 02:24:52 +03:00
2025-08-03 19:06:38 +03:00
else:
2025-08-03 22:47:46 +03:00
if id not in self.tracked_review_items:
return
final_data = data["after"]
2025-08-09 01:53:54 +03:00
camera = final_data["camera"]
2025-08-09 01:57:43 +03:00
if (
2025-08-09 02:48:15 +03:00
final_data["severity"] == "alert"
2025-08-09 01:57:43 +03:00
and not self.config.cameras[camera].review.genai.alerts
):
2025-08-09 01:53:54 +03:00
self.tracked_review_items.pop(id)
return
2025-08-09 01:57:43 +03:00
elif (
2025-08-09 02:48:15 +03:00
final_data["severity"] == "detection"
2025-08-09 04:03:19 +03:00
and not self.config.cameras[camera].review.genai.detections
2025-08-09 01:57:43 +03:00
):
2025-08-09 01:53:54 +03:00
self.tracked_review_items.pop(id)
return
2025-08-09 03:14:32 +03:00
# kickoff analysis
2025-08-10 06:47:39 +03:00
self.review_descs_dps.update()
2025-08-09 03:14:32 +03:00
threading.Thread(
2025-08-09 15:02:00 +03:00
target=run_analysis,
2025-08-09 03:14:32 +03:00
args=(
2025-08-09 15:29:29 +03:00
self.requestor,
2025-08-09 03:14:32 +03:00
self.genai_client,
2025-08-10 06:47:39 +03:00
self.review_desc_speed,
2025-08-09 03:14:32 +03:00
camera,
final_data,
copy.copy([r[1] for r in self.tracked_review_items[id]]),
),
).start()
2025-08-03 19:06:38 +03:00
self.tracked_review_items.pop(id)
def handle_request(self, request_data):
pass
2025-08-09 15:02:00 +03:00
@staticmethod
def run_analysis(
2025-08-09 15:11:41 +03:00
requestor: InterProcessRequestor,
2025-08-09 15:02:00 +03:00
genai_client: GenAIClient,
2025-08-10 06:47:39 +03:00
review_inference_speed: InferenceSpeed,
2025-08-09 15:02:00 +03:00
camera: str,
final_data: dict[str, str],
thumbs: list[bytes],
) -> None:
2025-08-10 06:47:39 +03:00
start = datetime.datetime.now().timestamp()
2025-08-09 15:02:00 +03:00
metadata = genai_client.generate_review_description(
{
"camera": camera,
"objects": final_data["data"]["objects"],
"recognized_objects": final_data["data"]["sub_labels"],
"zones": final_data["data"]["zones"],
"timestamp": datetime.datetime.fromtimestamp(final_data["end_time"]),
},
thumbs,
)
2025-08-10 06:47:39 +03:00
review_inference_speed.update(datetime.datetime.now().timestamp() - start)
2025-08-09 15:02:00 +03:00
if not metadata:
return None
2025-08-09 15:11:41 +03:00
prev_data = copy.deepcopy(final_data)
2025-08-09 16:20:57 +03:00
final_data["data"]["metadata"] = metadata.model_dump()
2025-08-09 15:11:41 +03:00
requestor.send_data(
UPDATE_REVIEW_DESCRIPTION,
{
2025-08-09 15:57:35 +03:00
"type": "genai",
2025-08-09 15:11:41 +03:00
"before": {k: v for k, v in prev_data.items()},
"after": {k: v for k, v in final_data.items()},
},
)