Compare commits

...

11 Commits

Author SHA1 Message Date
gwmullin
6c6a1a5368
Merge c575fb223b into d982b3a782 2026-06-21 11:55:16 -04:00
Daniel
d982b3a782
perf(util): use monotonic clock and bounded deque in EventsPerSecond (#23520)
Some checks failed
CI / AMD64 Build (push) Has been cancelled
CI / ARM Build (push) Has been cancelled
CI / Jetson Jetpack 6 (push) Has been cancelled
CI / AMD64 Extra Build (push) Has been cancelled
CI / ARM Extra Build (push) Has been cancelled
CI / Synaptics Build (push) Has been cancelled
CI / Assemble and push default build (push) Has been cancelled
* perf(util): use monotonic clock and bounded deque in EventsPerSecond

EventsPerSecond is updated on every captured frame, every detection and
every processed frame across all cameras and detectors. The previous
implementation derived timestamps from datetime.now().timestamp() (wall
clock), so an NTP or manual clock adjustment could skew the rolling-window
expiry; it also stored timestamps in a list and expired them with
del self._timestamps[0] (O(n) per removal) plus a periodic slice-copy to
cap growth.

Switch to time.monotonic() for the interval math (correct by construction
and immune to wall-clock jumps) and a collections.deque(maxlen=...) so
expiry is O(1) (popleft) and retention is bounded automatically. This
mirrors the deque-based expiry already used in video/ffmpeg.py and
watchdog.py. Observable output is unchanged.

Adds frigate/test/test_builtin.py covering rate calculation, window
expiry and the memory bound.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* test: drop test_timestamps_are_memory_bounded

It only asserted that deque(maxlen=) caps length, which is stdlib behavior
rather than something this change needs to verify.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-21 07:38:41 -06:00
Josh Hawkins
d036061e3f
cache the preview_frames directory listing so concurrent per-camera frame requests share one scan instead of each re-listing the whole directory (#23526)
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
2026-06-20 14:56:05 -05:00
Greg
c575fb223b Editor fail, re-ruff format. 2026-05-18 14:18:17 -07:00
Greg
9fa345f192 Remove 2x unnecessary index on reviewsegment, remove reference to prior code implementation in comment in event.py 2026-05-18 14:16:43 -07:00
Gdub
7b55c4b758 Rerun ruff formatting. 2026-05-18 13:39:12 -07:00
Gdub
570e2e3f76 Slightly simplify review logic and avoid duplicating the json response for empty review IDs. 2026-05-18 13:30:46 -07:00
Greg
39fba9b0a7 Use peewee instead of rw sql for the CTE query. 2026-05-11 16:46:43 -07:00
Greg
328a26b169 Collapse a few sequential queries into a single one. 2026-05-11 15:45:35 -07:00
Greg
311fb1bd19 Rewrite to use a CTE to leverage speedups by using sqllite internal optimization to do a single query instead of a starter query to get distinct labels and a subsequent loop of querys per distinct event labels.
Frigate is currently shipping sqlite 3.46.1, which is above the minimum version 3.25 needed for CTEs.
2026-05-08 16:18:37 -07:00
Greg
48b1426891 Add additional indicies on event and review tables. Every events or timeline endpoint filters on event start time and camera, this should speed things up by avoiding a range scan on the table. 2026-05-08 15:59:23 -07:00
6 changed files with 275 additions and 142 deletions

View File

@ -389,82 +389,106 @@ def events_explore(
limit: int = 10,
allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter),
):
# get distinct labels for all events
distinct_labels = (
Event.select(Event.label)
.where(Event.camera << allowed_cameras)
.distinct()
.order_by(Event.label)
if not allowed_cameras:
return JSONResponse(content=[])
explore_columns = (
Event.id,
Event.camera,
Event.label,
Event.sub_label,
Event.zones,
Event.start_time,
Event.end_time,
Event.has_clip,
Event.has_snapshot,
Event.plus_id,
Event.retain_indefinitely,
Event.top_score,
Event.false_positive,
Event.box,
Event.data,
)
label_counts = {}
def event_generator():
for label_obj in distinct_labels.iterator():
label = label_obj.label
# get most recent events for this label
label_events = (
Event.select()
.where((Event.label == label) & (Event.camera << allowed_cameras))
.order_by(Event.start_time.desc())
.limit(limit)
.iterator()
)
# count total events for this label
label_counts[label] = (
Event.select()
.where((Event.label == label) & (Event.camera << allowed_cameras))
.count()
)
yield from label_events
def process_events():
for event in event_generator():
processed_event = {
"id": event.id,
"camera": event.camera,
"label": event.label,
"zones": event.zones,
"start_time": event.start_time,
"end_time": event.end_time,
"has_clip": event.has_clip,
"has_snapshot": event.has_snapshot,
"plus_id": event.plus_id,
"retain_indefinitely": event.retain_indefinitely,
"sub_label": event.sub_label,
"top_score": event.top_score,
"false_positive": event.false_positive,
"box": event.box,
"data": {
k: v
for k, v in event.data.items()
if k
in [
"type",
"score",
"top_score",
"description",
"sub_label_score",
"average_estimated_speed",
"velocity_angle",
"path_data",
"recognized_license_plate",
"recognized_license_plate_score",
]
},
"event_count": label_counts[event.label],
}
yield processed_event
# convert iterator to list and sort
processed_events = sorted(
process_events(),
key=lambda x: (x["event_count"], x["start_time"]),
reverse=True,
# Single query: per-label COUNT and top-N ranking by start_time computed
# via window functions in a CTE, then filtered to rn <= limit
event_count = (
fn.COUNT(Event.id).over(partition_by=[Event.label]).alias("event_count")
)
rn = (
fn.ROW_NUMBER()
.over(partition_by=[Event.label], order_by=[Event.start_time.desc()])
.alias("rn")
)
base_query = Event.select(
*explore_columns,
event_count,
rn,
).where(Event.camera << allowed_cameras)
ranked = base_query.cte("ranked")
query = (
Event.select(
ranked.c.id,
ranked.c.camera,
ranked.c.label,
ranked.c.sub_label,
ranked.c.zones,
ranked.c.start_time,
ranked.c.end_time,
ranked.c.has_clip,
ranked.c.has_snapshot,
ranked.c.plus_id,
ranked.c.retain_indefinitely,
ranked.c.top_score,
ranked.c.false_positive,
ranked.c.box,
ranked.c.data,
ranked.c.event_count,
)
.from_(ranked)
.with_cte(ranked)
.where(ranked.c.rn <= limit)
.order_by(ranked.c.event_count.desc(), ranked.c.start_time.desc())
.objects()
)
allowed_data_keys = {
"type",
"score",
"top_score",
"description",
"sub_label_score",
"average_estimated_speed",
"velocity_angle",
"path_data",
"recognized_license_plate",
"recognized_license_plate_score",
}
processed_events = [
{
"id": event.id,
"camera": event.camera,
"label": event.label,
"zones": event.zones,
"start_time": event.start_time,
"end_time": event.end_time,
"has_clip": event.has_clip,
"has_snapshot": event.has_snapshot,
"plus_id": event.plus_id,
"retain_indefinitely": event.retain_indefinitely,
"sub_label": event.sub_label,
"top_score": event.top_score,
"false_positive": event.false_positive,
"box": event.box,
"data": {
k: v for k, v in (event.data or {}).items() if k in allowed_data_keys
},
"event_count": event.event_count,
}
for event in query
]
return JSONResponse(content=processed_events)
@ -487,22 +511,18 @@ async def event_ids(ids: str, request: Request):
status_code=400,
)
for event_id in ids:
try:
event = Event.get(Event.id == event_id)
await require_camera_access(event.camera, request=request)
except DoesNotExist:
# we should not fail the entire request if an event is not found
continue
try:
events = Event.select().where(Event.id << ids).dicts().iterator()
return JSONResponse(list(events))
events = list(Event.select().where(Event.id << ids).dicts().iterator())
except Exception:
return JSONResponse(
content=({"success": False, "message": "Events not found"}), status_code=400
)
for event in events:
await require_camera_access(event["camera"], request=request)
return JSONResponse(events)
@router.get(
"/events/search",

View File

@ -1,7 +1,9 @@
"""Preview apis."""
import bisect
import logging
import os
import threading
from datetime import datetime, timedelta, timezone
import pytz
@ -133,6 +135,32 @@ def preview_hour(
return preview_ts(camera_name, start_ts, end_ts, allowed_cameras)
# cache one sorted listing of the shared preview_frames dir
_preview_listing_lock = threading.Lock()
_preview_listing_cache: tuple[float, list[str]] = (-1.0, [])
def _get_preview_frame_listing(preview_dir: str) -> list[str]:
"""Return the sorted preview_frames listing, cached until the dir changes."""
global _preview_listing_cache
# mtime bumps when a frame is added or removed, invalidating the cache
mtime = os.stat(preview_dir).st_mtime
cached_mtime, files = _preview_listing_cache
if mtime == cached_mtime:
return files
with _preview_listing_lock:
# another thread may have refreshed the cache while we waited
cached_mtime, files = _preview_listing_cache
if mtime == cached_mtime:
return files
files = sorted(entry.name for entry in os.scandir(preview_dir))
_preview_listing_cache = (mtime, files)
return files
@router.get(
"/preview/{camera_name}/start/{start_ts}/end/{end_ts}/frames",
response_model=PreviewFramesResponse,
@ -149,23 +177,15 @@ def get_preview_frames_from_cache(camera_name: str, start_ts: float, end_ts: flo
start_file = f"{file_start}{start_ts}.{PREVIEW_FRAME_TYPE}"
end_file = f"{file_start}{end_ts}.{PREVIEW_FRAME_TYPE}"
camera_files = [
entry.name
for entry in os.scandir(preview_dir)
if entry.name.startswith(file_start)
files = _get_preview_frame_listing(preview_dir)
# a camera's frames form a contiguous slice of the sorted listing;
# bisect locates it without scanning the whole directory
left = bisect.bisect_left(files, start_file)
right = bisect.bisect_right(files, end_file)
selected_previews = [
file for file in files[left:right] if file.startswith(file_start)
]
camera_files.sort()
selected_previews = []
for file in camera_files:
if file < start_file:
continue
if file > end_file:
break
selected_previews.append(file)
return JSONResponse(
content=selected_previews,

View File

@ -10,7 +10,7 @@ import pandas as pd
from fastapi import APIRouter, Request
from fastapi.params import Depends
from fastapi.responses import JSONResponse
from peewee import Case, DoesNotExist, IntegrityError, fn, operator
from peewee import Case, DoesNotExist, fn, operator
from playhouse.shortcuts import model_to_dict
from frigate.api.auth import (
@ -172,11 +172,19 @@ async def review_ids(request: Request, ids: str):
status_code=400,
)
try:
reviews = list(
ReviewSegment.select().where(ReviewSegment.id << ids).dicts().iterator()
)
except Exception:
return JSONResponse(
content=({"success": False, "message": "Review segments not found"}),
status_code=400,
)
found_ids = {r["id"] for r in reviews}
for review_id in ids:
try:
review = ReviewSegment.get(ReviewSegment.id == review_id)
await require_camera_access(review.camera, request=request)
except DoesNotExist:
if review_id not in found_ids:
return JSONResponse(
content=(
{"success": False, "message": f"Review {review_id} not found"}
@ -184,16 +192,10 @@ async def review_ids(request: Request, ids: str):
status_code=404,
)
try:
reviews = (
ReviewSegment.select().where(ReviewSegment.id << ids).dicts().iterator()
)
return JSONResponse(list(reviews))
except Exception:
return JSONResponse(
content=({"success": False, "message": "Review segments not found"}),
status_code=400,
)
for review in reviews:
await require_camera_access(review["camera"], request=request)
return JSONResponse(reviews)
@router.get(
@ -490,27 +492,52 @@ async def set_multiple_reviewed(
user_id = current_user["username"]
for review_id in body.ids:
try:
review = ReviewSegment.get(ReviewSegment.id == review_id)
await require_camera_access(review.camera, request=request)
review_status = UserReviewStatus.get(
UserReviewStatus.user_id == user_id,
UserReviewStatus.review_segment == review_id,
reviews = list(
ReviewSegment.select(ReviewSegment.id, ReviewSegment.camera).where(
ReviewSegment.id << body.ids
)
)
for review in reviews:
await require_camera_access(review.camera, request=request)
found_ids = [r.id for r in reviews]
if found_ids:
existing_statuses = list(
UserReviewStatus.select().where(
(UserReviewStatus.user_id == user_id)
& (UserReviewStatus.review_segment << found_ids)
)
# Update based on the reviewed parameter
if review_status.has_been_reviewed != body.reviewed:
review_status.has_been_reviewed = body.reviewed
review_status.save()
except DoesNotExist:
try:
UserReviewStatus.create(
user_id=user_id,
review_segment=ReviewSegment.get(id=review_id),
has_been_reviewed=body.reviewed,
)
status_by_review = {s.review_segment_id: s for s in existing_statuses}
to_update = []
to_create = []
for review_id in found_ids:
if review_id in status_by_review:
status = status_by_review[review_id]
if status.has_been_reviewed != body.reviewed:
status.has_been_reviewed = body.reviewed
to_update.append(status)
else:
to_create.append(
{
"user_id": user_id,
"review_segment_id": review_id,
"has_been_reviewed": body.reviewed,
}
)
except (DoesNotExist, IntegrityError):
pass
if to_update:
UserReviewStatus.bulk_update(
to_update, fields=[UserReviewStatus.has_been_reviewed], batch_size=100
)
if to_create:
UserReviewStatus.insert_many(to_create).execute()
return JSONResponse(
content=(

View File

@ -0,0 +1,41 @@
"""Tests for frigate.util.builtin helpers."""
import unittest
from unittest.mock import patch
from frigate.util.builtin import EventsPerSecond
class TestEventsPerSecond(unittest.TestCase):
def test_eps_is_zero_before_any_events(self) -> None:
eps = EventsPerSecond()
with patch("frigate.util.builtin.time.monotonic", return_value=100.0):
self.assertEqual(eps.eps(), 0.0)
def test_eps_counts_events_in_window(self) -> None:
eps = EventsPerSecond(last_n_seconds=10)
clock = [1000.0]
with patch("frigate.util.builtin.time.monotonic", side_effect=lambda: clock[0]):
eps.start()
# one event per second for five seconds
for _ in range(5):
clock[0] += 1.0
eps.update()
# five events over the five seconds since start
self.assertAlmostEqual(eps.eps(), 1.0)
def test_old_timestamps_expire_from_window(self) -> None:
eps = EventsPerSecond(last_n_seconds=10)
clock = [0.0]
with patch("frigate.util.builtin.time.monotonic", side_effect=lambda: clock[0]):
eps.start()
for _ in range(10):
clock[0] += 1.0
eps.update()
# jump well past the window so every timestamp ages out
clock[0] += 100.0
self.assertEqual(eps.eps(), 0.0)
if __name__ == "__main__":
unittest.main()

View File

@ -2,7 +2,6 @@
import ast
import copy
import datetime
import logging
import math
import multiprocessing.queues
@ -10,7 +9,9 @@ import queue
import re
import shlex
import struct
import time
import urllib.parse
from collections import deque
from collections.abc import Mapping
from multiprocessing.managers import ValueProxy
from pathlib import Path
@ -32,23 +33,20 @@ class EventsPerSecond:
self._start = None
self._max_events = max_events
self._last_n_seconds = last_n_seconds
self._timestamps = []
self._timestamps: deque[float] = deque(maxlen=max_events)
def start(self) -> None:
self._start = datetime.datetime.now().timestamp()
self._start = time.monotonic()
def update(self) -> None:
now = datetime.datetime.now().timestamp()
now = time.monotonic()
if self._start is None:
self._start = now
self._timestamps.append(now)
# truncate the list when it goes 100 over the max_size
if len(self._timestamps) > self._max_events + 100:
self._timestamps = self._timestamps[(1 - self._max_events) :]
self.expire_timestamps(now)
def eps(self) -> float:
now = datetime.datetime.now().timestamp()
now = time.monotonic()
if self._start is None:
self._start = now
# compute the (approximate) events in the last n seconds
@ -63,7 +61,7 @@ class EventsPerSecond:
def expire_timestamps(self, now: float) -> None:
threshold = now - self._last_n_seconds
while self._timestamps and self._timestamps[0] < threshold:
del self._timestamps[0]
self._timestamps.popleft()
class InferenceSpeed:

View File

@ -0,0 +1,27 @@
"""Peewee migrations -- 036_add_perf_indexes.py.
Adds composite/single-column indexes to speed up the most common queries
issued by the web UI on initial page load:
- event(camera, start_time DESC): /events list filtered by camera + time range
- reviewsegment(camera, start_time DESC): /api/review filtered by camera + time range
- reviewsegment(end_time): supports the end_time > after half of /api/review's range
The existing event(label, start_time DESC) index from migration 027 already
covers /events/explore, so it is intentionally not duplicated here.
"""
import peewee as pw
SQL = pw.SQL
def migrate(migrator, database, fake=False, **kwargs):
migrator.sql(
'CREATE INDEX IF NOT EXISTS "event_camera_start_time" '
'ON "event" ("camera", "start_time" DESC)'
)
def rollback(migrator, database, fake=False, **kwargs):
migrator.sql('DROP INDEX IF EXISTS "event_camera_start_time"')