Generate review item summaries with requests

This commit is contained in:
Nicolas Mowen 2025-08-11 06:58:27 -06:00
parent 2a7525ef20
commit e2532ad6e5
8 changed files with 119 additions and 7 deletions

View File

@ -6,7 +6,7 @@ from functools import reduce
from pathlib import Path
import pandas as pd
from fastapi import APIRouter
from fastapi import APIRouter, Request
from fastapi.params import Depends
from fastapi.responses import JSONResponse
from peewee import Case, DoesNotExist, IntegrityError, fn, operator
@ -26,6 +26,8 @@ from frigate.api.defs.response.review_response import (
ReviewSummaryResponse,
)
from frigate.api.defs.tags import Tags
from frigate.config import FrigateConfig
from frigate.embeddings import EmbeddingsContext
from frigate.models import Recordings, ReviewSegment, UserReviewStatus
from frigate.review.types import SeverityEnum
from frigate.util.builtin import get_tz_modifiers
@ -606,3 +608,35 @@ async def set_not_reviewed(
content=({"success": True, "message": f"Set Review {review_id} as not viewed"}),
status_code=200,
)
@router.post(
"/review/summarize/start/{start_ts}/end/{end_ts}",
description="Use GenAI to summarize review items over a period of time.",
)
def generate_review_summary(request: Request, start_ts: float, end_ts: float):
config: FrigateConfig = request.app.frigate_config
if not config.genai.provider:
return JSONResponse(
content=(
{
"success": False,
"message": "GenAI must be configured to use this feature.",
}
),
status_code=400,
)
context: EmbeddingsContext = request.app.embeddings
summary = context.generate_review_summary(start_ts, end_ts)
if summary:
return JSONResponse(
content=({"success": True, "summary": summary}), status_code=200
)
else:
return JSONResponse(
content=({"success": False, "message": "Failed to create summary."}),
status_code=500,
)

View File

@ -29,6 +29,8 @@ class EmbeddingsRequestEnum(Enum):
reindex = "reindex"
# LPR
reprocess_plate = "reprocess_plate"
# Review Descriptions
summarize_review = "summarize_review"
class EmbeddingsResponder:

View File

@ -22,4 +22,6 @@ class GenAIConfig(FrigateBaseModel):
api_key: Optional[EnvString] = Field(default=None, title="Provider API key.")
base_url: Optional[str] = Field(default=None, title="Provider base url.")
model: str = Field(default="gpt-4o", title="GenAI model.")
provider: GenAIProviderEnum | None = Field(default=None, title="GenAI provider.")
provider: GenAIProviderEnum | None = Field(
default=None, title="GenAI provider."
)

View File

@ -39,7 +39,7 @@ class PostProcessorApi(ABC):
pass
@abstractmethod
def handle_request(self, request_data: dict[str, Any]) -> dict[str, Any] | None:
def handle_request(self, topic: str, request_data: dict[str, Any]) -> dict[str, Any] | None:
"""Handle metadata requests.
Args:
request_data (dict): containing data about requested change to process.

View File

@ -10,11 +10,13 @@ from pathlib import Path
import cv2
from frigate.comms.embeddings_updater import EmbeddingsRequestEnum
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.const import CACHE_DIR, CLIPS_DIR, UPDATE_REVIEW_DESCRIPTION
from frigate.data_processing.types import PostProcessDataEnum
from frigate.genai import GenAIClient
from frigate.models import ReviewSegment
from frigate.util.builtin import EventsPerSecond, InferenceSpeed
from ..post.api import PostProcessorApi
@ -116,8 +118,32 @@ class ReviewDescriptionProcessor(PostProcessorApi):
),
).start()
def handle_request(self, request_data):
pass
def handle_request(self, topic, request_data):
if topic == EmbeddingsRequestEnum.summarize_review.value:
start_ts = request_data["start_ts"]
end_ts = request_data["end_ts"]
items = [
r["data"]["metadata"]
for r in (
ReviewSegment.select(ReviewSegment.data)
.where(
(ReviewSegment.data["metadata"].is_null(False))
& (ReviewSegment.start_time < end_ts)
& (ReviewSegment.end_time > start_ts)
)
.order_by(ReviewSegment.start_time.asc())
.dicts()
.iterator()
)
]
if len(items) == 0:
logger.debug("No review items with metadata found during time period")
return None
return self.genai_client.generate_review_summary(start_ts, end_ts, items)
else:
return None
def get_cache_frames(
self, camera: str, start_time: float, end_time: float

View File

@ -313,3 +313,9 @@ class EmbeddingsContext:
EmbeddingsRequestEnum.embed_thumbnail.value,
{"id": str(event_id), "thumbnail": str(thumbnail), "upsert": False},
)
def generate_review_summary(self, start_ts: float, end_ts: float) -> str | None:
return self.requestor.send_data(
EmbeddingsRequestEnum.summarize_review.value,
{"start_ts": start_ts, "end_ts": end_ts},
)

View File

@ -66,7 +66,7 @@ from frigate.data_processing.types import DataProcessorMetrics, PostProcessDataE
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.events.types import EventTypeEnum, RegenerateDescriptionEnum
from frigate.genai import get_genai_client
from frigate.models import Event, Recordings, Trigger
from frigate.models import Event, Recordings, ReviewSegment, Trigger
from frigate.types import TrackedObjectUpdateTypesEnum
from frigate.util.builtin import serialize
from frigate.util.image import (
@ -121,7 +121,7 @@ class EmbeddingMaintainer(threading.Thread):
),
load_vec_extension=True,
)
models = [Event, Recordings, Trigger]
models = [Event, Recordings, ReviewSegment, Trigger]
db.bind(models)
if config.semantic_search.enabled:

View File

@ -1,5 +1,6 @@
"""Generative AI module for Frigate."""
import datetime
import importlib
import logging
import os
@ -112,6 +113,47 @@ Your response **MUST** be a flat JSON object with:
else:
return None
def generate_review_summary(
self, start_ts: float, end_ts: float, segments: list[dict[str, Any]]
) -> str | None:
"""Generate a summary of review item descriptions over a period of time."""
time_range = f"{datetime.datetime.fromtimestamp(start_ts).strftime('%I:%M %p')} to {datetime.datetime.fromtimestamp(end_ts).strftime('%I:%M %p')}"
timeline_summary_prompt = f"""
Analyze security camera metadata for {time_range} and write a professional security report.
INPUT FORMAT: JSON objects with "scene", "confidence", and "potential_threat_level" (0-3).
OUTPUT FORMAT:
Security Summary - {time_range}
[One sentence overview of general activity]
[Chronological timeline with timestamps when available]
[Final threat assessment statement]
REPORT REQUIREMENTS:
- Write chronologically using timestamps
- Highlight any potential_threat_level 2 incidents with times
- Note unusual events even if not threats
- State "only normal activity observed" if no threats detected
- Use factual, professional security language
STRICT RULES:
- Output ONLY the security report
- NO introductory phrases like "Here's a breakdown"
- NO recommendations, suggestions, or system commentary
- NO follow-up questions
- Write as a human security officer would
"""
for item in segments:
timeline_summary_prompt += f"\n {item}"
with open("/config/prompt.txt", "w") as f:
f.write(timeline_summary_prompt)
return self._send(timeline_summary_prompt, [])
def generate_object_description(
self,
camera_config: CameraConfig,