mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-07-02 10:01:15 +03:00
Compare commits
11 Commits
103d39c66e
...
6c6a1a5368
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6c6a1a5368 | ||
|
|
d982b3a782 | ||
|
|
d036061e3f | ||
|
|
c575fb223b | ||
|
|
9fa345f192 | ||
|
|
7b55c4b758 | ||
|
|
570e2e3f76 | ||
|
|
39fba9b0a7 | ||
|
|
328a26b169 | ||
|
|
311fb1bd19 | ||
|
|
48b1426891 |
@ -389,82 +389,106 @@ def events_explore(
|
||||
limit: int = 10,
|
||||
allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter),
|
||||
):
|
||||
# get distinct labels for all events
|
||||
distinct_labels = (
|
||||
Event.select(Event.label)
|
||||
.where(Event.camera << allowed_cameras)
|
||||
.distinct()
|
||||
.order_by(Event.label)
|
||||
if not allowed_cameras:
|
||||
return JSONResponse(content=[])
|
||||
|
||||
explore_columns = (
|
||||
Event.id,
|
||||
Event.camera,
|
||||
Event.label,
|
||||
Event.sub_label,
|
||||
Event.zones,
|
||||
Event.start_time,
|
||||
Event.end_time,
|
||||
Event.has_clip,
|
||||
Event.has_snapshot,
|
||||
Event.plus_id,
|
||||
Event.retain_indefinitely,
|
||||
Event.top_score,
|
||||
Event.false_positive,
|
||||
Event.box,
|
||||
Event.data,
|
||||
)
|
||||
|
||||
label_counts = {}
|
||||
|
||||
def event_generator():
|
||||
for label_obj in distinct_labels.iterator():
|
||||
label = label_obj.label
|
||||
|
||||
# get most recent events for this label
|
||||
label_events = (
|
||||
Event.select()
|
||||
.where((Event.label == label) & (Event.camera << allowed_cameras))
|
||||
.order_by(Event.start_time.desc())
|
||||
.limit(limit)
|
||||
.iterator()
|
||||
)
|
||||
|
||||
# count total events for this label
|
||||
label_counts[label] = (
|
||||
Event.select()
|
||||
.where((Event.label == label) & (Event.camera << allowed_cameras))
|
||||
.count()
|
||||
)
|
||||
|
||||
yield from label_events
|
||||
|
||||
def process_events():
|
||||
for event in event_generator():
|
||||
processed_event = {
|
||||
"id": event.id,
|
||||
"camera": event.camera,
|
||||
"label": event.label,
|
||||
"zones": event.zones,
|
||||
"start_time": event.start_time,
|
||||
"end_time": event.end_time,
|
||||
"has_clip": event.has_clip,
|
||||
"has_snapshot": event.has_snapshot,
|
||||
"plus_id": event.plus_id,
|
||||
"retain_indefinitely": event.retain_indefinitely,
|
||||
"sub_label": event.sub_label,
|
||||
"top_score": event.top_score,
|
||||
"false_positive": event.false_positive,
|
||||
"box": event.box,
|
||||
"data": {
|
||||
k: v
|
||||
for k, v in event.data.items()
|
||||
if k
|
||||
in [
|
||||
"type",
|
||||
"score",
|
||||
"top_score",
|
||||
"description",
|
||||
"sub_label_score",
|
||||
"average_estimated_speed",
|
||||
"velocity_angle",
|
||||
"path_data",
|
||||
"recognized_license_plate",
|
||||
"recognized_license_plate_score",
|
||||
]
|
||||
},
|
||||
"event_count": label_counts[event.label],
|
||||
}
|
||||
yield processed_event
|
||||
|
||||
# convert iterator to list and sort
|
||||
processed_events = sorted(
|
||||
process_events(),
|
||||
key=lambda x: (x["event_count"], x["start_time"]),
|
||||
reverse=True,
|
||||
# Single query: per-label COUNT and top-N ranking by start_time computed
|
||||
# via window functions in a CTE, then filtered to rn <= limit
|
||||
event_count = (
|
||||
fn.COUNT(Event.id).over(partition_by=[Event.label]).alias("event_count")
|
||||
)
|
||||
rn = (
|
||||
fn.ROW_NUMBER()
|
||||
.over(partition_by=[Event.label], order_by=[Event.start_time.desc()])
|
||||
.alias("rn")
|
||||
)
|
||||
|
||||
base_query = Event.select(
|
||||
*explore_columns,
|
||||
event_count,
|
||||
rn,
|
||||
).where(Event.camera << allowed_cameras)
|
||||
ranked = base_query.cte("ranked")
|
||||
query = (
|
||||
Event.select(
|
||||
ranked.c.id,
|
||||
ranked.c.camera,
|
||||
ranked.c.label,
|
||||
ranked.c.sub_label,
|
||||
ranked.c.zones,
|
||||
ranked.c.start_time,
|
||||
ranked.c.end_time,
|
||||
ranked.c.has_clip,
|
||||
ranked.c.has_snapshot,
|
||||
ranked.c.plus_id,
|
||||
ranked.c.retain_indefinitely,
|
||||
ranked.c.top_score,
|
||||
ranked.c.false_positive,
|
||||
ranked.c.box,
|
||||
ranked.c.data,
|
||||
ranked.c.event_count,
|
||||
)
|
||||
.from_(ranked)
|
||||
.with_cte(ranked)
|
||||
.where(ranked.c.rn <= limit)
|
||||
.order_by(ranked.c.event_count.desc(), ranked.c.start_time.desc())
|
||||
.objects()
|
||||
)
|
||||
|
||||
allowed_data_keys = {
|
||||
"type",
|
||||
"score",
|
||||
"top_score",
|
||||
"description",
|
||||
"sub_label_score",
|
||||
"average_estimated_speed",
|
||||
"velocity_angle",
|
||||
"path_data",
|
||||
"recognized_license_plate",
|
||||
"recognized_license_plate_score",
|
||||
}
|
||||
|
||||
processed_events = [
|
||||
{
|
||||
"id": event.id,
|
||||
"camera": event.camera,
|
||||
"label": event.label,
|
||||
"zones": event.zones,
|
||||
"start_time": event.start_time,
|
||||
"end_time": event.end_time,
|
||||
"has_clip": event.has_clip,
|
||||
"has_snapshot": event.has_snapshot,
|
||||
"plus_id": event.plus_id,
|
||||
"retain_indefinitely": event.retain_indefinitely,
|
||||
"sub_label": event.sub_label,
|
||||
"top_score": event.top_score,
|
||||
"false_positive": event.false_positive,
|
||||
"box": event.box,
|
||||
"data": {
|
||||
k: v for k, v in (event.data or {}).items() if k in allowed_data_keys
|
||||
},
|
||||
"event_count": event.event_count,
|
||||
}
|
||||
for event in query
|
||||
]
|
||||
|
||||
return JSONResponse(content=processed_events)
|
||||
|
||||
@ -487,22 +511,18 @@ async def event_ids(ids: str, request: Request):
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
for event_id in ids:
|
||||
try:
|
||||
event = Event.get(Event.id == event_id)
|
||||
await require_camera_access(event.camera, request=request)
|
||||
except DoesNotExist:
|
||||
# we should not fail the entire request if an event is not found
|
||||
continue
|
||||
|
||||
try:
|
||||
events = Event.select().where(Event.id << ids).dicts().iterator()
|
||||
return JSONResponse(list(events))
|
||||
events = list(Event.select().where(Event.id << ids).dicts().iterator())
|
||||
except Exception:
|
||||
return JSONResponse(
|
||||
content=({"success": False, "message": "Events not found"}), status_code=400
|
||||
)
|
||||
|
||||
for event in events:
|
||||
await require_camera_access(event["camera"], request=request)
|
||||
|
||||
return JSONResponse(events)
|
||||
|
||||
|
||||
@router.get(
|
||||
"/events/search",
|
||||
|
||||
@ -1,7 +1,9 @@
|
||||
"""Preview apis."""
|
||||
|
||||
import bisect
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
import pytz
|
||||
@ -133,6 +135,32 @@ def preview_hour(
|
||||
return preview_ts(camera_name, start_ts, end_ts, allowed_cameras)
|
||||
|
||||
|
||||
# cache one sorted listing of the shared preview_frames dir
|
||||
_preview_listing_lock = threading.Lock()
|
||||
_preview_listing_cache: tuple[float, list[str]] = (-1.0, [])
|
||||
|
||||
|
||||
def _get_preview_frame_listing(preview_dir: str) -> list[str]:
|
||||
"""Return the sorted preview_frames listing, cached until the dir changes."""
|
||||
global _preview_listing_cache
|
||||
|
||||
# mtime bumps when a frame is added or removed, invalidating the cache
|
||||
mtime = os.stat(preview_dir).st_mtime
|
||||
cached_mtime, files = _preview_listing_cache
|
||||
if mtime == cached_mtime:
|
||||
return files
|
||||
|
||||
with _preview_listing_lock:
|
||||
# another thread may have refreshed the cache while we waited
|
||||
cached_mtime, files = _preview_listing_cache
|
||||
if mtime == cached_mtime:
|
||||
return files
|
||||
|
||||
files = sorted(entry.name for entry in os.scandir(preview_dir))
|
||||
_preview_listing_cache = (mtime, files)
|
||||
return files
|
||||
|
||||
|
||||
@router.get(
|
||||
"/preview/{camera_name}/start/{start_ts}/end/{end_ts}/frames",
|
||||
response_model=PreviewFramesResponse,
|
||||
@ -149,23 +177,15 @@ def get_preview_frames_from_cache(camera_name: str, start_ts: float, end_ts: flo
|
||||
start_file = f"{file_start}{start_ts}.{PREVIEW_FRAME_TYPE}"
|
||||
end_file = f"{file_start}{end_ts}.{PREVIEW_FRAME_TYPE}"
|
||||
|
||||
camera_files = [
|
||||
entry.name
|
||||
for entry in os.scandir(preview_dir)
|
||||
if entry.name.startswith(file_start)
|
||||
files = _get_preview_frame_listing(preview_dir)
|
||||
|
||||
# a camera's frames form a contiguous slice of the sorted listing;
|
||||
# bisect locates it without scanning the whole directory
|
||||
left = bisect.bisect_left(files, start_file)
|
||||
right = bisect.bisect_right(files, end_file)
|
||||
selected_previews = [
|
||||
file for file in files[left:right] if file.startswith(file_start)
|
||||
]
|
||||
camera_files.sort()
|
||||
|
||||
selected_previews = []
|
||||
|
||||
for file in camera_files:
|
||||
if file < start_file:
|
||||
continue
|
||||
|
||||
if file > end_file:
|
||||
break
|
||||
|
||||
selected_previews.append(file)
|
||||
|
||||
return JSONResponse(
|
||||
content=selected_previews,
|
||||
|
||||
@ -10,7 +10,7 @@ import pandas as pd
|
||||
from fastapi import APIRouter, Request
|
||||
from fastapi.params import Depends
|
||||
from fastapi.responses import JSONResponse
|
||||
from peewee import Case, DoesNotExist, IntegrityError, fn, operator
|
||||
from peewee import Case, DoesNotExist, fn, operator
|
||||
from playhouse.shortcuts import model_to_dict
|
||||
|
||||
from frigate.api.auth import (
|
||||
@ -172,11 +172,19 @@ async def review_ids(request: Request, ids: str):
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
try:
|
||||
reviews = list(
|
||||
ReviewSegment.select().where(ReviewSegment.id << ids).dicts().iterator()
|
||||
)
|
||||
except Exception:
|
||||
return JSONResponse(
|
||||
content=({"success": False, "message": "Review segments not found"}),
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
found_ids = {r["id"] for r in reviews}
|
||||
for review_id in ids:
|
||||
try:
|
||||
review = ReviewSegment.get(ReviewSegment.id == review_id)
|
||||
await require_camera_access(review.camera, request=request)
|
||||
except DoesNotExist:
|
||||
if review_id not in found_ids:
|
||||
return JSONResponse(
|
||||
content=(
|
||||
{"success": False, "message": f"Review {review_id} not found"}
|
||||
@ -184,16 +192,10 @@ async def review_ids(request: Request, ids: str):
|
||||
status_code=404,
|
||||
)
|
||||
|
||||
try:
|
||||
reviews = (
|
||||
ReviewSegment.select().where(ReviewSegment.id << ids).dicts().iterator()
|
||||
)
|
||||
return JSONResponse(list(reviews))
|
||||
except Exception:
|
||||
return JSONResponse(
|
||||
content=({"success": False, "message": "Review segments not found"}),
|
||||
status_code=400,
|
||||
)
|
||||
for review in reviews:
|
||||
await require_camera_access(review["camera"], request=request)
|
||||
|
||||
return JSONResponse(reviews)
|
||||
|
||||
|
||||
@router.get(
|
||||
@ -490,27 +492,52 @@ async def set_multiple_reviewed(
|
||||
|
||||
user_id = current_user["username"]
|
||||
|
||||
for review_id in body.ids:
|
||||
try:
|
||||
review = ReviewSegment.get(ReviewSegment.id == review_id)
|
||||
await require_camera_access(review.camera, request=request)
|
||||
review_status = UserReviewStatus.get(
|
||||
UserReviewStatus.user_id == user_id,
|
||||
UserReviewStatus.review_segment == review_id,
|
||||
reviews = list(
|
||||
ReviewSegment.select(ReviewSegment.id, ReviewSegment.camera).where(
|
||||
ReviewSegment.id << body.ids
|
||||
)
|
||||
)
|
||||
|
||||
for review in reviews:
|
||||
await require_camera_access(review.camera, request=request)
|
||||
|
||||
found_ids = [r.id for r in reviews]
|
||||
|
||||
if found_ids:
|
||||
existing_statuses = list(
|
||||
UserReviewStatus.select().where(
|
||||
(UserReviewStatus.user_id == user_id)
|
||||
& (UserReviewStatus.review_segment << found_ids)
|
||||
)
|
||||
# Update based on the reviewed parameter
|
||||
if review_status.has_been_reviewed != body.reviewed:
|
||||
review_status.has_been_reviewed = body.reviewed
|
||||
review_status.save()
|
||||
except DoesNotExist:
|
||||
try:
|
||||
UserReviewStatus.create(
|
||||
user_id=user_id,
|
||||
review_segment=ReviewSegment.get(id=review_id),
|
||||
has_been_reviewed=body.reviewed,
|
||||
)
|
||||
|
||||
status_by_review = {s.review_segment_id: s for s in existing_statuses}
|
||||
|
||||
to_update = []
|
||||
to_create = []
|
||||
|
||||
for review_id in found_ids:
|
||||
if review_id in status_by_review:
|
||||
status = status_by_review[review_id]
|
||||
if status.has_been_reviewed != body.reviewed:
|
||||
status.has_been_reviewed = body.reviewed
|
||||
to_update.append(status)
|
||||
else:
|
||||
to_create.append(
|
||||
{
|
||||
"user_id": user_id,
|
||||
"review_segment_id": review_id,
|
||||
"has_been_reviewed": body.reviewed,
|
||||
}
|
||||
)
|
||||
except (DoesNotExist, IntegrityError):
|
||||
pass
|
||||
|
||||
if to_update:
|
||||
UserReviewStatus.bulk_update(
|
||||
to_update, fields=[UserReviewStatus.has_been_reviewed], batch_size=100
|
||||
)
|
||||
|
||||
if to_create:
|
||||
UserReviewStatus.insert_many(to_create).execute()
|
||||
|
||||
return JSONResponse(
|
||||
content=(
|
||||
|
||||
41
frigate/test/test_builtin.py
Normal file
41
frigate/test/test_builtin.py
Normal file
@ -0,0 +1,41 @@
|
||||
"""Tests for frigate.util.builtin helpers."""
|
||||
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
from frigate.util.builtin import EventsPerSecond
|
||||
|
||||
|
||||
class TestEventsPerSecond(unittest.TestCase):
|
||||
def test_eps_is_zero_before_any_events(self) -> None:
|
||||
eps = EventsPerSecond()
|
||||
with patch("frigate.util.builtin.time.monotonic", return_value=100.0):
|
||||
self.assertEqual(eps.eps(), 0.0)
|
||||
|
||||
def test_eps_counts_events_in_window(self) -> None:
|
||||
eps = EventsPerSecond(last_n_seconds=10)
|
||||
clock = [1000.0]
|
||||
with patch("frigate.util.builtin.time.monotonic", side_effect=lambda: clock[0]):
|
||||
eps.start()
|
||||
# one event per second for five seconds
|
||||
for _ in range(5):
|
||||
clock[0] += 1.0
|
||||
eps.update()
|
||||
# five events over the five seconds since start
|
||||
self.assertAlmostEqual(eps.eps(), 1.0)
|
||||
|
||||
def test_old_timestamps_expire_from_window(self) -> None:
|
||||
eps = EventsPerSecond(last_n_seconds=10)
|
||||
clock = [0.0]
|
||||
with patch("frigate.util.builtin.time.monotonic", side_effect=lambda: clock[0]):
|
||||
eps.start()
|
||||
for _ in range(10):
|
||||
clock[0] += 1.0
|
||||
eps.update()
|
||||
# jump well past the window so every timestamp ages out
|
||||
clock[0] += 100.0
|
||||
self.assertEqual(eps.eps(), 0.0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@ -2,7 +2,6 @@
|
||||
|
||||
import ast
|
||||
import copy
|
||||
import datetime
|
||||
import logging
|
||||
import math
|
||||
import multiprocessing.queues
|
||||
@ -10,7 +9,9 @@ import queue
|
||||
import re
|
||||
import shlex
|
||||
import struct
|
||||
import time
|
||||
import urllib.parse
|
||||
from collections import deque
|
||||
from collections.abc import Mapping
|
||||
from multiprocessing.managers import ValueProxy
|
||||
from pathlib import Path
|
||||
@ -32,23 +33,20 @@ class EventsPerSecond:
|
||||
self._start = None
|
||||
self._max_events = max_events
|
||||
self._last_n_seconds = last_n_seconds
|
||||
self._timestamps = []
|
||||
self._timestamps: deque[float] = deque(maxlen=max_events)
|
||||
|
||||
def start(self) -> None:
|
||||
self._start = datetime.datetime.now().timestamp()
|
||||
self._start = time.monotonic()
|
||||
|
||||
def update(self) -> None:
|
||||
now = datetime.datetime.now().timestamp()
|
||||
now = time.monotonic()
|
||||
if self._start is None:
|
||||
self._start = now
|
||||
self._timestamps.append(now)
|
||||
# truncate the list when it goes 100 over the max_size
|
||||
if len(self._timestamps) > self._max_events + 100:
|
||||
self._timestamps = self._timestamps[(1 - self._max_events) :]
|
||||
self.expire_timestamps(now)
|
||||
|
||||
def eps(self) -> float:
|
||||
now = datetime.datetime.now().timestamp()
|
||||
now = time.monotonic()
|
||||
if self._start is None:
|
||||
self._start = now
|
||||
# compute the (approximate) events in the last n seconds
|
||||
@ -63,7 +61,7 @@ class EventsPerSecond:
|
||||
def expire_timestamps(self, now: float) -> None:
|
||||
threshold = now - self._last_n_seconds
|
||||
while self._timestamps and self._timestamps[0] < threshold:
|
||||
del self._timestamps[0]
|
||||
self._timestamps.popleft()
|
||||
|
||||
|
||||
class InferenceSpeed:
|
||||
|
||||
27
migrations/036_add_perf_indexes.py
Normal file
27
migrations/036_add_perf_indexes.py
Normal file
@ -0,0 +1,27 @@
|
||||
"""Peewee migrations -- 036_add_perf_indexes.py.
|
||||
|
||||
Adds composite/single-column indexes to speed up the most common queries
|
||||
issued by the web UI on initial page load:
|
||||
|
||||
- event(camera, start_time DESC): /events list filtered by camera + time range
|
||||
- reviewsegment(camera, start_time DESC): /api/review filtered by camera + time range
|
||||
- reviewsegment(end_time): supports the end_time > after half of /api/review's range
|
||||
|
||||
The existing event(label, start_time DESC) index from migration 027 already
|
||||
covers /events/explore, so it is intentionally not duplicated here.
|
||||
"""
|
||||
|
||||
import peewee as pw
|
||||
|
||||
SQL = pw.SQL
|
||||
|
||||
|
||||
def migrate(migrator, database, fake=False, **kwargs):
|
||||
migrator.sql(
|
||||
'CREATE INDEX IF NOT EXISTS "event_camera_start_time" '
|
||||
'ON "event" ("camera", "start_time" DESC)'
|
||||
)
|
||||
|
||||
|
||||
def rollback(migrator, database, fake=False, **kwargs):
|
||||
migrator.sql('DROP INDEX IF EXISTS "event_camera_start_time"')
|
||||
Loading…
Reference in New Issue
Block a user