frigate/frigate/data_processing/post/review_descriptions.py
Nicolas Mowen d41ee4ff88
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
Miscellaneous Fixes (#20848)
* Fix filtering for classification

* Adjust prompt to account for response tokens

* Correctly return response for reprocess

* Use API response to update data instead of trying to re-parse all of the values

* Implement rename class api

* Fix model deletion / rename dialog

* Remove camera spatial context

* Catch error
2025-11-08 13:13:40 -07:00

478 lines
16 KiB
Python

"""Post processor for review items to get descriptions."""
import copy
import datetime
import logging
import math
import os
import shutil
import threading
from pathlib import Path
from typing import Any
import cv2
from peewee import DoesNotExist
from frigate.comms.embeddings_updater import EmbeddingsRequestEnum
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.config.camera.review import GenAIReviewConfig, ImageSourceEnum
from frigate.const import CACHE_DIR, CLIPS_DIR, UPDATE_REVIEW_DESCRIPTION
from frigate.data_processing.types import PostProcessDataEnum
from frigate.genai import GenAIClient
from frigate.models import Recordings, ReviewSegment
from frigate.util.builtin import EventsPerSecond, InferenceSpeed
from frigate.util.image import get_image_from_recording
from ..post.api import PostProcessorApi
from ..types import DataProcessorMetrics
logger = logging.getLogger(__name__)
RECORDING_BUFFER_EXTENSION_PERCENT = 0.10
class ReviewDescriptionProcessor(PostProcessorApi):
def __init__(
self,
config: FrigateConfig,
requestor: InterProcessRequestor,
metrics: DataProcessorMetrics,
client: GenAIClient,
):
super().__init__(config, metrics, None)
self.requestor = requestor
self.metrics = metrics
self.genai_client = client
self.review_desc_speed = InferenceSpeed(self.metrics.review_desc_speed)
self.review_descs_dps = EventsPerSecond()
self.review_descs_dps.start()
def calculate_frame_count(
self,
camera: str,
image_source: ImageSourceEnum = ImageSourceEnum.preview,
height: int = 480,
) -> int:
"""Calculate optimal number of frames based on context size, image source, and resolution.
Token usage varies by resolution: larger images (ultrawide aspect ratios) use more tokens.
Estimates ~1 token per 1250 pixels. Targets 98% context utilization with safety margin.
Capped at 20 frames.
"""
context_size = self.genai_client.get_context_size()
camera_config = self.config.cameras[camera]
detect_width = camera_config.detect.width
detect_height = camera_config.detect.height
aspect_ratio = detect_width / detect_height
if image_source == ImageSourceEnum.recordings:
if aspect_ratio >= 1:
# Landscape or square: constrain height
width = int(height * aspect_ratio)
else:
# Portrait: constrain width
width = height
height = int(width / aspect_ratio)
else:
if aspect_ratio >= 1:
# Landscape or square: constrain height
target_height = 180
width = int(target_height * aspect_ratio)
height = target_height
else:
# Portrait: constrain width
target_width = 180
width = target_width
height = int(target_width / aspect_ratio)
pixels_per_image = width * height
tokens_per_image = pixels_per_image / 1250
prompt_tokens = 3500
response_tokens = 300
available_tokens = context_size - prompt_tokens - response_tokens
max_frames = int(available_tokens / tokens_per_image)
return min(max(max_frames, 3), 20)
def process_data(self, data, data_type):
self.metrics.review_desc_dps.value = self.review_descs_dps.eps()
if data_type != PostProcessDataEnum.review:
return
camera = data["after"]["camera"]
camera_config = self.config.cameras[camera]
if not camera_config.review.genai.enabled:
return
id = data["after"]["id"]
if data["type"] == "new" or data["type"] == "update":
return
else:
final_data = data["after"]
if (
final_data["severity"] == "alert"
and not camera_config.review.genai.alerts
):
return
elif (
final_data["severity"] == "detection"
and not camera_config.review.genai.detections
):
return
image_source = camera_config.review.genai.image_source
if image_source == ImageSourceEnum.recordings:
duration = final_data["end_time"] - final_data["start_time"]
buffer_extension = duration * RECORDING_BUFFER_EXTENSION_PERCENT
thumbs = self.get_recording_frames(
camera,
final_data["start_time"] - buffer_extension,
final_data["end_time"] + buffer_extension,
height=480, # Use 480p for good balance between quality and token usage
)
if not thumbs:
# Fallback to preview frames if no recordings available
logger.warning(
f"No recording frames found for {camera}, falling back to preview frames"
)
thumbs = self.get_preview_frames_as_bytes(
camera,
final_data["start_time"],
final_data["end_time"],
final_data["thumb_path"],
id,
camera_config.review.genai.debug_save_thumbnails,
)
elif camera_config.review.genai.debug_save_thumbnails:
# Save debug thumbnails for recordings
Path(os.path.join(CLIPS_DIR, "genai-requests", id)).mkdir(
parents=True, exist_ok=True
)
for idx, frame_bytes in enumerate(thumbs):
with open(
os.path.join(CLIPS_DIR, f"genai-requests/{id}/{idx}.jpg"),
"wb",
) as f:
f.write(frame_bytes)
else:
# Use preview frames
thumbs = self.get_preview_frames_as_bytes(
camera,
final_data["start_time"],
final_data["end_time"],
final_data["thumb_path"],
id,
camera_config.review.genai.debug_save_thumbnails,
)
# kickoff analysis
self.review_descs_dps.update()
threading.Thread(
target=run_analysis,
args=(
self.requestor,
self.genai_client,
self.review_desc_speed,
camera,
final_data,
thumbs,
camera_config.review.genai,
list(self.config.model.merged_labelmap.values()),
self.config.model.all_attributes,
),
).start()
def handle_request(self, topic, request_data):
if topic == EmbeddingsRequestEnum.summarize_review.value:
start_ts = request_data["start_ts"]
end_ts = request_data["end_ts"]
logger.debug(
f"Found GenAI Review Summary request for {start_ts} to {end_ts}"
)
items: list[dict[str, Any]] = [
r["data"]["metadata"]
for r in (
ReviewSegment.select(ReviewSegment.data)
.where(
(ReviewSegment.data["metadata"].is_null(False))
& (ReviewSegment.start_time < end_ts)
& (ReviewSegment.end_time > start_ts)
)
.order_by(ReviewSegment.start_time.asc())
.dicts()
.iterator()
)
]
if len(items) == 0:
logger.debug("No review items with metadata found during time period")
return "No activity was found during this time."
important_items = list(
filter(
lambda item: item.get("potential_threat_level", 0) > 0
or item.get("other_concerns"),
items,
)
)
if not important_items:
return "No concerns were found during this time period."
if self.config.review.genai.debug_save_thumbnails:
Path(
os.path.join(CLIPS_DIR, "genai-requests", f"{start_ts}-{end_ts}")
).mkdir(parents=True, exist_ok=True)
return self.genai_client.generate_review_summary(
start_ts,
end_ts,
important_items,
self.config.review.genai.debug_save_thumbnails,
)
else:
return None
def get_cache_frames(
self,
camera: str,
start_time: float,
end_time: float,
) -> list[str]:
preview_dir = os.path.join(CACHE_DIR, "preview_frames")
file_start = f"preview_{camera}"
start_file = f"{file_start}-{start_time}.webp"
end_file = f"{file_start}-{end_time}.webp"
all_frames = []
for file in sorted(os.listdir(preview_dir)):
if not file.startswith(file_start):
continue
if file < start_file:
if len(all_frames):
all_frames[0] = os.path.join(preview_dir, file)
else:
all_frames.append(os.path.join(preview_dir, file))
continue
if file > end_file:
all_frames.append(os.path.join(preview_dir, file))
break
all_frames.append(os.path.join(preview_dir, file))
frame_count = len(all_frames)
desired_frame_count = self.calculate_frame_count(camera)
if frame_count <= desired_frame_count:
return all_frames
selected_frames = []
step_size = (frame_count - 1) / (desired_frame_count - 1)
for i in range(desired_frame_count):
index = round(i * step_size)
selected_frames.append(all_frames[index])
return selected_frames
def get_recording_frames(
self,
camera: str,
start_time: float,
end_time: float,
height: int = 480,
) -> list[bytes]:
"""Get frames from recordings at specified timestamps."""
duration = end_time - start_time
desired_frame_count = self.calculate_frame_count(
camera, ImageSourceEnum.recordings, height
)
# Calculate evenly spaced timestamps throughout the duration
if desired_frame_count == 1:
timestamps = [start_time + duration / 2]
else:
step = duration / (desired_frame_count - 1)
timestamps = [start_time + (i * step) for i in range(desired_frame_count)]
def extract_frame_from_recording(ts: float) -> bytes | None:
"""Extract a single frame from recording at given timestamp."""
try:
recording = (
Recordings.select(
Recordings.path,
Recordings.start_time,
)
.where((ts >= Recordings.start_time) & (ts <= Recordings.end_time))
.where(Recordings.camera == camera)
.order_by(Recordings.start_time.desc())
.limit(1)
.get()
)
time_in_segment = ts - recording.start_time
return get_image_from_recording(
self.config.ffmpeg,
recording.path,
time_in_segment,
"mjpeg",
height=height,
)
except DoesNotExist:
return None
frames = []
for timestamp in timestamps:
try:
# Try to extract frame at exact timestamp
image_data = extract_frame_from_recording(timestamp)
if not image_data:
# Try with rounded timestamp as fallback
rounded_timestamp = math.ceil(timestamp)
image_data = extract_frame_from_recording(rounded_timestamp)
if image_data:
frames.append(image_data)
else:
logger.warning(
f"No recording found for {camera} at timestamp {timestamp}"
)
except Exception as e:
logger.error(
f"Error extracting frame from recording for {camera} at {timestamp}: {e}"
)
continue
return frames
def get_preview_frames_as_bytes(
self,
camera: str,
start_time: float,
end_time: float,
thumb_path_fallback: str,
review_id: str,
save_debug: bool,
) -> list[bytes]:
"""Get preview frames and convert them to JPEG bytes.
Args:
camera: Camera name
start_time: Start timestamp
end_time: End timestamp
thumb_path_fallback: Fallback thumbnail path if no preview frames found
review_id: Review item ID for debug saving
save_debug: Whether to save debug thumbnails
Returns:
List of JPEG image bytes
"""
frame_paths = self.get_cache_frames(camera, start_time, end_time)
if not frame_paths:
frame_paths = [thumb_path_fallback]
thumbs = []
for idx, thumb_path in enumerate(frame_paths):
thumb_data = cv2.imread(thumb_path)
ret, jpg = cv2.imencode(
".jpg", thumb_data, [int(cv2.IMWRITE_JPEG_QUALITY), 100]
)
if ret:
thumbs.append(jpg.tobytes())
if save_debug:
Path(os.path.join(CLIPS_DIR, "genai-requests", review_id)).mkdir(
parents=True, exist_ok=True
)
shutil.copy(
thumb_path,
os.path.join(CLIPS_DIR, f"genai-requests/{review_id}/{idx}.webp"),
)
return thumbs
@staticmethod
def run_analysis(
requestor: InterProcessRequestor,
genai_client: GenAIClient,
review_inference_speed: InferenceSpeed,
camera: str,
final_data: dict[str, str],
thumbs: list[bytes],
genai_config: GenAIReviewConfig,
labelmap_objects: list[str],
attribute_labels: list[str],
) -> None:
start = datetime.datetime.now().timestamp()
analytics_data = {
"id": final_data["id"],
"camera": camera,
"zones": final_data["data"]["zones"],
"start": datetime.datetime.fromtimestamp(final_data["start_time"]).strftime(
"%A, %I:%M %p"
),
"duration": round(final_data["end_time"] - final_data["start_time"]),
}
unified_objects = []
objects_list = final_data["data"]["objects"]
sub_labels_list = final_data["data"]["sub_labels"]
for i, verified_label in enumerate(final_data["data"]["verified_objects"]):
object_type = verified_label.replace("-verified", "").replace("_", " ")
name = sub_labels_list[i].replace("_", " ").title()
unified_objects.append(f"{name} ({object_type})")
for label in objects_list:
if "-verified" in label:
continue
elif label in labelmap_objects:
object_type = label.replace("_", " ").title()
if label in attribute_labels:
unified_objects.append(f"{object_type} (delivery/service)")
else:
unified_objects.append(object_type)
analytics_data["unified_objects"] = unified_objects
metadata = genai_client.generate_review_description(
analytics_data,
thumbs,
genai_config.additional_concerns,
genai_config.preferred_language,
genai_config.debug_save_thumbnails,
genai_config.activity_context_prompt,
)
review_inference_speed.update(datetime.datetime.now().timestamp() - start)
if not metadata:
return None
prev_data = copy.deepcopy(final_data)
final_data["data"]["metadata"] = metadata.model_dump()
requestor.send_data(
UPDATE_REVIEW_DESCRIPTION,
{
"type": "genai",
"before": {k: v for k, v in prev_data.items()},
"after": {k: v for k, v in final_data.items()},
},
)