This commit is contained in:
gwmullin 2026-06-21 11:55:16 -04:00 committed by GitHub
commit 6c6a1a5368
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 191 additions and 117 deletions

View File

@ -389,41 +389,85 @@ def events_explore(
limit: int = 10, limit: int = 10,
allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter), allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter),
): ):
# get distinct labels for all events if not allowed_cameras:
distinct_labels = ( return JSONResponse(content=[])
Event.select(Event.label)
.where(Event.camera << allowed_cameras) explore_columns = (
.distinct() Event.id,
.order_by(Event.label) Event.camera,
Event.label,
Event.sub_label,
Event.zones,
Event.start_time,
Event.end_time,
Event.has_clip,
Event.has_snapshot,
Event.plus_id,
Event.retain_indefinitely,
Event.top_score,
Event.false_positive,
Event.box,
Event.data,
) )
label_counts = {} # Single query: per-label COUNT and top-N ranking by start_time computed
# via window functions in a CTE, then filtered to rn <= limit
def event_generator(): event_count = (
for label_obj in distinct_labels.iterator(): fn.COUNT(Event.id).over(partition_by=[Event.label]).alias("event_count")
label = label_obj.label )
rn = (
# get most recent events for this label fn.ROW_NUMBER()
label_events = ( .over(partition_by=[Event.label], order_by=[Event.start_time.desc()])
Event.select() .alias("rn")
.where((Event.label == label) & (Event.camera << allowed_cameras))
.order_by(Event.start_time.desc())
.limit(limit)
.iterator()
) )
# count total events for this label base_query = Event.select(
label_counts[label] = ( *explore_columns,
Event.select() event_count,
.where((Event.label == label) & (Event.camera << allowed_cameras)) rn,
.count() ).where(Event.camera << allowed_cameras)
ranked = base_query.cte("ranked")
query = (
Event.select(
ranked.c.id,
ranked.c.camera,
ranked.c.label,
ranked.c.sub_label,
ranked.c.zones,
ranked.c.start_time,
ranked.c.end_time,
ranked.c.has_clip,
ranked.c.has_snapshot,
ranked.c.plus_id,
ranked.c.retain_indefinitely,
ranked.c.top_score,
ranked.c.false_positive,
ranked.c.box,
ranked.c.data,
ranked.c.event_count,
)
.from_(ranked)
.with_cte(ranked)
.where(ranked.c.rn <= limit)
.order_by(ranked.c.event_count.desc(), ranked.c.start_time.desc())
.objects()
) )
yield from label_events allowed_data_keys = {
"type",
"score",
"top_score",
"description",
"sub_label_score",
"average_estimated_speed",
"velocity_angle",
"path_data",
"recognized_license_plate",
"recognized_license_plate_score",
}
def process_events(): processed_events = [
for event in event_generator(): {
processed_event = {
"id": event.id, "id": event.id,
"camera": event.camera, "camera": event.camera,
"label": event.label, "label": event.label,
@ -439,32 +483,12 @@ def events_explore(
"false_positive": event.false_positive, "false_positive": event.false_positive,
"box": event.box, "box": event.box,
"data": { "data": {
k: v k: v for k, v in (event.data or {}).items() if k in allowed_data_keys
for k, v in event.data.items()
if k
in [
"type",
"score",
"top_score",
"description",
"sub_label_score",
"average_estimated_speed",
"velocity_angle",
"path_data",
"recognized_license_plate",
"recognized_license_plate_score",
]
}, },
"event_count": label_counts[event.label], "event_count": event.event_count,
} }
yield processed_event for event in query
]
# convert iterator to list and sort
processed_events = sorted(
process_events(),
key=lambda x: (x["event_count"], x["start_time"]),
reverse=True,
)
return JSONResponse(content=processed_events) return JSONResponse(content=processed_events)
@ -487,22 +511,18 @@ async def event_ids(ids: str, request: Request):
status_code=400, status_code=400,
) )
for event_id in ids:
try: try:
event = Event.get(Event.id == event_id) events = list(Event.select().where(Event.id << ids).dicts().iterator())
await require_camera_access(event.camera, request=request)
except DoesNotExist:
# we should not fail the entire request if an event is not found
continue
try:
events = Event.select().where(Event.id << ids).dicts().iterator()
return JSONResponse(list(events))
except Exception: except Exception:
return JSONResponse( return JSONResponse(
content=({"success": False, "message": "Events not found"}), status_code=400 content=({"success": False, "message": "Events not found"}), status_code=400
) )
for event in events:
await require_camera_access(event["camera"], request=request)
return JSONResponse(events)
@router.get( @router.get(
"/events/search", "/events/search",

View File

@ -10,7 +10,7 @@ import pandas as pd
from fastapi import APIRouter, Request from fastapi import APIRouter, Request
from fastapi.params import Depends from fastapi.params import Depends
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from peewee import Case, DoesNotExist, IntegrityError, fn, operator from peewee import Case, DoesNotExist, fn, operator
from playhouse.shortcuts import model_to_dict from playhouse.shortcuts import model_to_dict
from frigate.api.auth import ( from frigate.api.auth import (
@ -172,11 +172,19 @@ async def review_ids(request: Request, ids: str):
status_code=400, status_code=400,
) )
for review_id in ids:
try: try:
review = ReviewSegment.get(ReviewSegment.id == review_id) reviews = list(
await require_camera_access(review.camera, request=request) ReviewSegment.select().where(ReviewSegment.id << ids).dicts().iterator()
except DoesNotExist: )
except Exception:
return JSONResponse(
content=({"success": False, "message": "Review segments not found"}),
status_code=400,
)
found_ids = {r["id"] for r in reviews}
for review_id in ids:
if review_id not in found_ids:
return JSONResponse( return JSONResponse(
content=( content=(
{"success": False, "message": f"Review {review_id} not found"} {"success": False, "message": f"Review {review_id} not found"}
@ -184,16 +192,10 @@ async def review_ids(request: Request, ids: str):
status_code=404, status_code=404,
) )
try: for review in reviews:
reviews = ( await require_camera_access(review["camera"], request=request)
ReviewSegment.select().where(ReviewSegment.id << ids).dicts().iterator()
) return JSONResponse(reviews)
return JSONResponse(list(reviews))
except Exception:
return JSONResponse(
content=({"success": False, "message": "Review segments not found"}),
status_code=400,
)
@router.get( @router.get(
@ -490,27 +492,52 @@ async def set_multiple_reviewed(
user_id = current_user["username"] user_id = current_user["username"]
for review_id in body.ids: reviews = list(
try: ReviewSegment.select(ReviewSegment.id, ReviewSegment.camera).where(
review = ReviewSegment.get(ReviewSegment.id == review_id) ReviewSegment.id << body.ids
)
)
for review in reviews:
await require_camera_access(review.camera, request=request) await require_camera_access(review.camera, request=request)
review_status = UserReviewStatus.get(
UserReviewStatus.user_id == user_id, found_ids = [r.id for r in reviews]
UserReviewStatus.review_segment == review_id,
if found_ids:
existing_statuses = list(
UserReviewStatus.select().where(
(UserReviewStatus.user_id == user_id)
& (UserReviewStatus.review_segment << found_ids)
) )
# Update based on the reviewed parameter
if review_status.has_been_reviewed != body.reviewed:
review_status.has_been_reviewed = body.reviewed
review_status.save()
except DoesNotExist:
try:
UserReviewStatus.create(
user_id=user_id,
review_segment=ReviewSegment.get(id=review_id),
has_been_reviewed=body.reviewed,
) )
except (DoesNotExist, IntegrityError):
pass status_by_review = {s.review_segment_id: s for s in existing_statuses}
to_update = []
to_create = []
for review_id in found_ids:
if review_id in status_by_review:
status = status_by_review[review_id]
if status.has_been_reviewed != body.reviewed:
status.has_been_reviewed = body.reviewed
to_update.append(status)
else:
to_create.append(
{
"user_id": user_id,
"review_segment_id": review_id,
"has_been_reviewed": body.reviewed,
}
)
if to_update:
UserReviewStatus.bulk_update(
to_update, fields=[UserReviewStatus.has_been_reviewed], batch_size=100
)
if to_create:
UserReviewStatus.insert_many(to_create).execute()
return JSONResponse( return JSONResponse(
content=( content=(

View File

@ -0,0 +1,27 @@
"""Peewee migrations -- 036_add_perf_indexes.py.
Adds composite/single-column indexes to speed up the most common queries
issued by the web UI on initial page load:
- event(camera, start_time DESC): /events list filtered by camera + time range
- reviewsegment(camera, start_time DESC): /api/review filtered by camera + time range
- reviewsegment(end_time): supports the end_time > after half of /api/review's range
The existing event(label, start_time DESC) index from migration 027 already
covers /events/explore, so it is intentionally not duplicated here.
"""
import peewee as pw
SQL = pw.SQL
def migrate(migrator, database, fake=False, **kwargs):
migrator.sql(
'CREATE INDEX IF NOT EXISTS "event_camera_start_time" '
'ON "event" ("camera", "start_time" DESC)'
)
def rollback(migrator, database, fake=False, **kwargs):
migrator.sql('DROP INDEX IF EXISTS "event_camera_start_time"')