VOD overlap query optimization, caching, and refactor

- A1: Rewrite overlap query from OR-based predicate to canonical form
  (end_time > start AND start_time < end) which is index-friendly
- A2: Add composite (camera, start_time, end_time) and (camera, end_time)
  indexes via migration to support the new overlap predicate
- A3: Add bounded LRU in-process cache for historical VOD mapping
  payloads to avoid repeated DB + assembly work on re-opens
- A4: Extract vod_ts internals into query/normalize/payload helpers
  in frigate/api/vod.py for benchmarking and future swappability
- Also update the recording_clip download path to use the same
  canonical overlap predicate

https://claude.ai/code/session_01XeVxvSk9ywyPBR288ZZXWE
This commit is contained in:
Claude 2026-03-07 01:26:20 +00:00
parent c2e667c0dd
commit 89adfe19e0
No known key found for this signature in database
3 changed files with 236 additions and 82 deletions

View File

@ -39,9 +39,14 @@ from frigate.const import (
CACHE_DIR,
CLIPS_DIR,
INSTALL_DIR,
MAX_SEGMENT_DURATION,
PREVIEW_FRAME_TYPE,
)
from frigate.api.vod import (
_build_vod_mapping_payload,
_get_recordings_for_vod,
_normalize_recordings_to_vod_clips,
vod_mapping_cache,
)
from frigate.models import Event, Previews, Recordings, Regions, ReviewSegment
from frigate.output.preview import get_most_recent_preview_frame
from frigate.track.object_processing import TrackedObjectProcessor
@ -454,11 +459,10 @@ async def recording_clip(
Recordings.end_time,
)
.where(
(Recordings.start_time.between(start_ts, end_ts))
| (Recordings.end_time.between(start_ts, end_ts))
| ((start_ts > Recordings.start_time) & (end_ts < Recordings.end_time))
(Recordings.camera == camera_name)
& (Recordings.end_time > start_ts)
& (Recordings.start_time < end_ts)
)
.where(Recordings.camera == camera_name)
.order_by(Recordings.start_time.asc())
)
@ -542,77 +546,23 @@ async def vod_ts(
end_ts,
force_discontinuity,
)
recordings = (
Recordings.select(
Recordings.path,
Recordings.duration,
Recordings.end_time,
Recordings.start_time,
)
.where(
Recordings.start_time.between(start_ts, end_ts)
| Recordings.end_time.between(start_ts, end_ts)
| ((start_ts > Recordings.start_time) & (end_ts < Recordings.end_time))
)
.where(Recordings.camera == camera_name)
.order_by(Recordings.start_time.asc())
.iterator()
)
clips = []
durations = []
min_duration_ms = 100 # Minimum 100ms to ensure at least one video frame
max_duration_ms = MAX_SEGMENT_DURATION * 1000
hour_ago = datetime.now() - timedelta(hours=1)
is_cacheable = hour_ago.timestamp() > start_ts
recording: Recordings
for recording in recordings:
logger.debug(
"VOD: processing recording: %s start=%s end=%s duration=%s",
recording.path,
recording.start_time,
recording.end_time,
recording.duration,
)
# Check in-process cache for historical (cacheable) ranges
cache_key = (camera_name, int(start_ts), int(end_ts), force_discontinuity)
if is_cacheable:
cached = vod_mapping_cache.get(cache_key)
if cached is not None:
logger.debug("VOD: cache hit for %s", cache_key)
return JSONResponse(content=cached)
clip = {"type": "source", "path": recording.path}
duration = int(recording.duration * 1000)
# A1: canonical overlap query via helper
recordings = _get_recordings_for_vod(camera_name, start_ts, end_ts)
# adjust start offset if start_ts is after recording.start_time
if start_ts > recording.start_time:
inpoint = int((start_ts - recording.start_time) * 1000)
clip["clipFrom"] = inpoint
duration -= inpoint
logger.debug(
"VOD: applied clipFrom %sms to %s",
inpoint,
recording.path,
)
# adjust end if recording.end_time is after end_ts
if recording.end_time > end_ts:
duration -= int((recording.end_time - end_ts) * 1000)
if duration < min_duration_ms:
# skip if the clip has no valid duration (too short to contain frames)
logger.debug(
"VOD: skipping recording %s - resulting duration %sms too short",
recording.path,
duration,
)
continue
if min_duration_ms <= duration < max_duration_ms:
clip["keyFrameDurations"] = [duration]
clips.append(clip)
durations.append(duration)
logger.debug(
"VOD: added clip %s duration_ms=%s clipFrom=%s",
recording.path,
duration,
clip.get("clipFrom"),
)
else:
logger.warning(f"Recording clip is missing or empty: {recording.path}")
# A4: normalize into clips via helper
clips, durations = _normalize_recordings_to_vod_clips(recordings, start_ts, end_ts)
if not clips:
logger.error(
@ -626,18 +576,17 @@ async def vod_ts(
status_code=404,
)
hour_ago = datetime.now() - timedelta(hours=1)
return JSONResponse(
content={
"cache": hour_ago.timestamp() > start_ts,
"discontinuity": force_discontinuity,
"consistentSequenceMediaInfo": True,
"durations": durations,
"segment_duration": max(durations),
"sequences": [{"clips": clips}],
}
# A4: build payload via helper
payload = _build_vod_mapping_payload(
clips, durations, start_ts, force_discontinuity, is_cacheable
)
# A3: cache historical payloads
if is_cacheable:
vod_mapping_cache.put(cache_key, payload)
return JSONResponse(content=payload)
@router.get(
"/vod/{year_month}/{day}/{hour}/{camera_name}",

175
frigate/api/vod.py Normal file
View File

@ -0,0 +1,175 @@
"""VOD query, normalization, and payload assembly helpers."""
import logging
import threading
import time
from collections import OrderedDict
from frigate.const import MAX_SEGMENT_DURATION
from frigate.models import Recordings
logger = logging.getLogger(__name__)
def _get_recordings_for_vod(
    camera_name: str, start_ts: float, end_ts: float
) -> list[dict]:
    """Query recordings that overlap the requested time range.

    Uses the canonical overlap predicate: two intervals [A, B) and [C, D)
    overlap iff A < D AND C < B. This is index-friendly and equivalent to
    the previous OR-based predicate for playback purposes.

    Returns a list of plain dicts (path, duration, start_time, end_time)
    sorted by ascending start_time.
    """
    # Canonical interval-overlap predicate, scoped to the camera.
    overlaps_window = (
        (Recordings.camera == camera_name)
        & (Recordings.end_time > start_ts)
        & (Recordings.start_time < end_ts)
    )
    query = (
        Recordings.select(
            Recordings.path,
            Recordings.duration,
            Recordings.end_time,
            Recordings.start_time,
        )
        .where(overlaps_window)
        .order_by(Recordings.start_time.asc())
    )
    # Materialize rows as plain dicts so callers are decoupled from peewee.
    return [
        {
            "path": row.path,
            "duration": row.duration,
            "start_time": row.start_time,
            "end_time": row.end_time,
        }
        for row in query.iterator()
    ]
def _normalize_recordings_to_vod_clips(
    recordings: list[dict], start_ts: float, end_ts: float
) -> tuple[list[dict], list[int]]:
    """Convert raw recording rows into trimmed VOD clips with durations.

    Each recording is trimmed to the [start_ts, end_ts] window: a ``clipFrom``
    offset is applied when the window starts inside the recording, and the
    trailing portion is subtracted when the recording runs past the window.

    Args:
        recordings: rows as produced by ``_get_recordings_for_vod`` (dicts
            with ``path``, ``duration``, ``start_time``, ``end_time``;
            times in epoch seconds, duration in seconds).
        start_ts: requested window start (epoch seconds).
        end_ts: requested window end (epoch seconds).

    Returns:
        Tuple of (clips, durations) where clips are nginx-vod source clip
        dicts and durations are in milliseconds.
    """
    clips: list[dict] = []
    durations: list[int] = []
    min_duration_ms = 100  # Minimum 100ms to ensure at least one video frame
    max_duration_ms = MAX_SEGMENT_DURATION * 1000
    for rec in recordings:
        logger.debug(
            "VOD: processing recording: %s start=%s end=%s duration=%s",
            rec["path"],
            rec["start_time"],
            rec["end_time"],
            rec["duration"],
        )
        clip: dict = {"type": "source", "path": rec["path"]}
        duration = int(rec["duration"] * 1000)
        # Trim the head if the requested window starts inside this recording.
        if start_ts > rec["start_time"]:
            inpoint = int((start_ts - rec["start_time"]) * 1000)
            clip["clipFrom"] = inpoint
            duration -= inpoint
            logger.debug(
                "VOD: applied clipFrom %sms to %s",
                inpoint,
                rec["path"],
            )
        # Trim the tail if this recording runs past the requested window.
        if rec["end_time"] > end_ts:
            duration -= int((rec["end_time"] - end_ts) * 1000)
        if duration < min_duration_ms:
            # Too short to contain even one frame after trimming -- skip.
            logger.debug(
                "VOD: skipping recording %s - resulting duration %sms too short",
                rec["path"],
                duration,
            )
            continue
        if duration >= max_duration_ms:
            # Fix: this branch previously logged "Recording clip is missing or
            # empty", but it is reached when the trimmed duration exceeds the
            # maximum segment duration (likely bad metadata on the row), not
            # when the file is absent. Log the actual condition.
            logger.warning(
                "VOD: skipping recording %s - duration %sms exceeds max segment duration %sms",
                rec["path"],
                duration,
                max_duration_ms,
            )
            continue
        clip["keyFrameDurations"] = [duration]
        clips.append(clip)
        durations.append(duration)
        logger.debug(
            "VOD: added clip %s duration_ms=%s clipFrom=%s",
            rec["path"],
            duration,
            clip.get("clipFrom"),
        )
    return clips, durations
def _build_vod_mapping_payload(
clips: list[dict],
durations: list[int],
start_ts: float,
force_discontinuity: bool,
is_cacheable: bool,
) -> dict:
"""Assemble the final VOD mapping response payload."""
return {
"cache": is_cacheable,
"discontinuity": force_discontinuity,
"consistentSequenceMediaInfo": True,
"durations": durations,
"segment_duration": max(durations),
"sequences": [{"clips": clips}],
}
# ---------------------------------------------------------------------------
# In-process LRU cache for historical VOD mapping payloads (A3)
# ---------------------------------------------------------------------------
_VOD_CACHE_MAX_SIZE = 256
_VOD_CACHE_TTL_SECONDS = 300 # 5 minutes
class _VodMappingCache:
"""Thread-safe bounded LRU cache for historical VOD payloads."""
def __init__(self, max_size: int = _VOD_CACHE_MAX_SIZE, ttl: float = _VOD_CACHE_TTL_SECONDS):
self._max_size = max_size
self._ttl = ttl
self._cache: OrderedDict[tuple, tuple[float, dict]] = OrderedDict()
self._lock = threading.Lock()
def get(self, key: tuple) -> dict | None:
with self._lock:
entry = self._cache.get(key)
if entry is None:
return None
ts, payload = entry
if time.monotonic() - ts > self._ttl:
del self._cache[key]
return None
# Move to end (most recently used)
self._cache.move_to_end(key)
return payload
def put(self, key: tuple, payload: dict) -> None:
with self._lock:
if key in self._cache:
self._cache.move_to_end(key)
self._cache[key] = (time.monotonic(), payload)
else:
if len(self._cache) >= self._max_size:
self._cache.popitem(last=False)
self._cache[key] = (time.monotonic(), payload)
vod_mapping_cache = _VodMappingCache()

View File

@ -0,0 +1,30 @@
"""Peewee migrations -- 036_add_recordings_vod_indexes.py.
Add indexes to the recordings table optimized for the VOD overlap query:
WHERE camera = ? AND end_time > ? AND start_time < ?
ORDER BY start_time ASC
The composite index (camera, start_time, end_time) covers the full predicate
and ordering in a single B-tree walk. The (camera, end_time) index gives the
planner an alternative access path for the end_time > ? filter.
"""
import peewee as pw
SQL = pw.SQL
def migrate(migrator, database, fake=False, **kwargs):
    """Create the two indexes backing the canonical VOD overlap query."""
    statements = (
        'CREATE INDEX IF NOT EXISTS "idx_recordings_camera_start_end" '
        'ON "recordings" ("camera", "start_time", "end_time")',
        'CREATE INDEX IF NOT EXISTS "idx_recordings_camera_end" '
        'ON "recordings" ("camera", "end_time")',
    )
    for statement in statements:
        migrator.sql(statement)
def rollback(migrator, database, fake=False, **kwargs):
    """Drop the VOD overlap-query indexes created by migrate()."""
    for index_name in (
        "idx_recordings_camera_start_end",
        "idx_recordings_camera_end",
    ):
        migrator.sql(f'DROP INDEX IF EXISTS "{index_name}"')