diff --git a/frigate/api/media.py b/frigate/api/media.py index 903cf60c0..2d51b6473 100644 --- a/frigate/api/media.py +++ b/frigate/api/media.py @@ -39,9 +39,14 @@ from frigate.const import ( CACHE_DIR, CLIPS_DIR, INSTALL_DIR, - MAX_SEGMENT_DURATION, PREVIEW_FRAME_TYPE, ) +from frigate.api.vod import ( + _build_vod_mapping_payload, + _get_recordings_for_vod, + _normalize_recordings_to_vod_clips, + vod_mapping_cache, +) from frigate.models import Event, Previews, Recordings, Regions, ReviewSegment from frigate.output.preview import get_most_recent_preview_frame from frigate.track.object_processing import TrackedObjectProcessor @@ -454,11 +459,10 @@ async def recording_clip( Recordings.end_time, ) .where( - (Recordings.start_time.between(start_ts, end_ts)) - | (Recordings.end_time.between(start_ts, end_ts)) - | ((start_ts > Recordings.start_time) & (end_ts < Recordings.end_time)) + (Recordings.camera == camera_name) + & (Recordings.end_time > start_ts) + & (Recordings.start_time < end_ts) ) - .where(Recordings.camera == camera_name) .order_by(Recordings.start_time.asc()) ) @@ -542,77 +546,23 @@ async def vod_ts( end_ts, force_discontinuity, ) - recordings = ( - Recordings.select( - Recordings.path, - Recordings.duration, - Recordings.end_time, - Recordings.start_time, - ) - .where( - Recordings.start_time.between(start_ts, end_ts) - | Recordings.end_time.between(start_ts, end_ts) - | ((start_ts > Recordings.start_time) & (end_ts < Recordings.end_time)) - ) - .where(Recordings.camera == camera_name) - .order_by(Recordings.start_time.asc()) - .iterator() - ) - clips = [] - durations = [] - min_duration_ms = 100 # Minimum 100ms to ensure at least one video frame - max_duration_ms = MAX_SEGMENT_DURATION * 1000 + hour_ago = datetime.now() - timedelta(hours=1) + is_cacheable = hour_ago.timestamp() > start_ts - recording: Recordings - for recording in recordings: - logger.debug( - "VOD: processing recording: %s start=%s end=%s duration=%s", - recording.path, - recording.start_time, - recording.end_time, - recording.duration, - ) + # Check in-process cache for historical (cacheable) ranges + cache_key = (camera_name, int(start_ts), int(end_ts), force_discontinuity) + if is_cacheable: + cached = vod_mapping_cache.get(cache_key) + if cached is not None: + logger.debug("VOD: cache hit for %s", cache_key) + return JSONResponse(content=cached) - clip = {"type": "source", "path": recording.path} - duration = int(recording.duration * 1000) + # A1: canonical overlap query via helper + recordings = _get_recordings_for_vod(camera_name, start_ts, end_ts) - # adjust start offset if start_ts is after recording.start_time - if start_ts > recording.start_time: - inpoint = int((start_ts - recording.start_time) * 1000) - clip["clipFrom"] = inpoint - duration -= inpoint - logger.debug( - "VOD: applied clipFrom %sms to %s", - inpoint, - recording.path, - ) - - # adjust end if recording.end_time is after end_ts - if recording.end_time > end_ts: - duration -= int((recording.end_time - end_ts) * 1000) - - if duration < min_duration_ms: - # skip if the clip has no valid duration (too short to contain frames) - logger.debug( - "VOD: skipping recording %s - resulting duration %sms too short", - recording.path, - duration, - ) - continue - - if min_duration_ms <= duration < max_duration_ms: - clip["keyFrameDurations"] = [duration] - clips.append(clip) - durations.append(duration) - logger.debug( - "VOD: added clip %s duration_ms=%s clipFrom=%s", - recording.path, - duration, - clip.get("clipFrom"), - ) - else: - logger.warning(f"Recording clip is missing or empty: {recording.path}") + # A4: normalize into clips via helper + clips, durations = _normalize_recordings_to_vod_clips(recordings, start_ts, end_ts) if not clips: logger.error( @@ -626,18 +576,17 @@ async def vod_ts( status_code=404, ) - hour_ago = datetime.now() - timedelta(hours=1) - return JSONResponse( - content={ - "cache": hour_ago.timestamp() > start_ts, - "discontinuity": force_discontinuity, - "consistentSequenceMediaInfo": True, - "durations": durations, - "segment_duration": max(durations), - "sequences": [{"clips": clips}], - } + # A4: build payload via helper + payload = _build_vod_mapping_payload( + clips, durations, start_ts, force_discontinuity, is_cacheable ) + # A3: cache historical payloads + if is_cacheable: + vod_mapping_cache.put(cache_key, payload) + + return JSONResponse(content=payload) + @router.get( "/vod/{year_month}/{day}/{hour}/{camera_name}", diff --git a/frigate/api/vod.py b/frigate/api/vod.py new file mode 100644 index 000000000..c054bced8 --- /dev/null +++ b/frigate/api/vod.py @@ -0,0 +1,175 @@ +"""VOD query, normalization, and payload assembly helpers.""" + +import logging +import threading +import time +from collections import OrderedDict + +from frigate.const import MAX_SEGMENT_DURATION +from frigate.models import Recordings + +logger = logging.getLogger(__name__) + + +def _get_recordings_for_vod( + camera_name: str, start_ts: float, end_ts: float +) -> list[dict]: + """Query recordings that overlap the requested time range. + + Uses the canonical overlap predicate: two intervals [A, B) and [C, D) + overlap iff A < D AND C < B. This is index-friendly and equivalent to + the previous OR-based predicate for playback purposes. + """ + recordings = ( + Recordings.select( + Recordings.path, + Recordings.duration, + Recordings.end_time, + Recordings.start_time, + ) + .where( + (Recordings.camera == camera_name) + & (Recordings.end_time > start_ts) + & (Recordings.start_time < end_ts) + ) + .order_by(Recordings.start_time.asc()) + .iterator() + ) + + results = [] + for recording in recordings: + results.append( + { + "path": recording.path, + "duration": recording.duration, + "start_time": recording.start_time, + "end_time": recording.end_time, + } + ) + return results + + +def _normalize_recordings_to_vod_clips( + recordings: list[dict], start_ts: float, end_ts: float +) -> tuple[list[dict], list[int]]: + """Convert raw recording rows into trimmed VOD clips with durations. + + Returns a tuple of (clips, durations) where clips are nginx-vod source + clip dicts and durations are in milliseconds. + """ + clips = [] + durations = [] + min_duration_ms = 100 # Minimum 100ms to ensure at least one video frame + max_duration_ms = MAX_SEGMENT_DURATION * 1000 + + for rec in recordings: + logger.debug( + "VOD: processing recording: %s start=%s end=%s duration=%s", + rec["path"], + rec["start_time"], + rec["end_time"], + rec["duration"], + ) + + clip: dict = {"type": "source", "path": rec["path"]} + duration = int(rec["duration"] * 1000) + + # adjust start offset if start_ts is after recording start + if start_ts > rec["start_time"]: + inpoint = int((start_ts - rec["start_time"]) * 1000) + clip["clipFrom"] = inpoint + duration -= inpoint + logger.debug( + "VOD: applied clipFrom %sms to %s", + inpoint, + rec["path"], + ) + + # adjust end if recording ends after end_ts + if rec["end_time"] > end_ts: + duration -= int((rec["end_time"] - end_ts) * 1000) + + if duration < min_duration_ms: + logger.debug( + "VOD: skipping recording %s - resulting duration %sms too short", + rec["path"], + duration, + ) + continue + + if min_duration_ms <= duration < max_duration_ms: + clip["keyFrameDurations"] = [duration] + clips.append(clip) + durations.append(duration) + logger.debug( + "VOD: added clip %s duration_ms=%s clipFrom=%s", + rec["path"], + duration, + clip.get("clipFrom"), + ) + else: + logger.warning(f"Recording clip is missing or empty: {rec['path']}") + + return clips, durations + + +def _build_vod_mapping_payload( + clips: list[dict], + durations: list[int], + start_ts: float, + force_discontinuity: bool, + is_cacheable: bool, +) -> dict: + """Assemble the final VOD mapping response payload.""" + return { + "cache": is_cacheable, + "discontinuity": force_discontinuity, + "consistentSequenceMediaInfo": True, + "durations": durations, + "segment_duration": max(durations), + "sequences": [{"clips": clips}], + } + + +# --------------------------------------------------------------------------- +# In-process LRU cache for historical VOD mapping payloads (A3) +# --------------------------------------------------------------------------- + +_VOD_CACHE_MAX_SIZE = 256 +_VOD_CACHE_TTL_SECONDS = 300 # 5 minutes + + +class _VodMappingCache: + """Thread-safe bounded LRU cache for historical VOD payloads.""" + + def __init__(self, max_size: int = _VOD_CACHE_MAX_SIZE, ttl: float = _VOD_CACHE_TTL_SECONDS): + self._max_size = max_size + self._ttl = ttl + self._cache: OrderedDict[tuple, tuple[float, dict]] = OrderedDict() + self._lock = threading.Lock() + + def get(self, key: tuple) -> dict | None: + with self._lock: + entry = self._cache.get(key) + if entry is None: + return None + ts, payload = entry + if time.monotonic() - ts > self._ttl: + del self._cache[key] + return None + # Move to end (most recently used) + self._cache.move_to_end(key) + return payload + + def put(self, key: tuple, payload: dict) -> None: + with self._lock: + if key in self._cache: + self._cache.move_to_end(key) + self._cache[key] = (time.monotonic(), payload) + else: + if len(self._cache) >= self._max_size: + self._cache.popitem(last=False) + self._cache[key] = (time.monotonic(), payload) + + +vod_mapping_cache = _VodMappingCache() diff --git a/migrations/036_add_recordings_vod_indexes.py b/migrations/036_add_recordings_vod_indexes.py new file mode 100644 index 000000000..4f778a8a7 --- /dev/null +++ b/migrations/036_add_recordings_vod_indexes.py @@ -0,0 +1,30 @@ +"""Peewee migrations -- 036_add_recordings_vod_indexes.py. + +Add indexes to the recordings table optimized for the VOD overlap query: + WHERE camera = ? AND end_time > ? AND start_time < ? + ORDER BY start_time ASC + +The composite index (camera, start_time, end_time) covers the full predicate +and ordering in a single B-tree walk. The (camera, end_time) index gives the +planner an alternative access path for the end_time > ? filter. +""" + +import peewee as pw + +SQL = pw.SQL + + +def migrate(migrator, database, fake=False, **kwargs): + migrator.sql( + 'CREATE INDEX IF NOT EXISTS "idx_recordings_camera_start_end" ' + 'ON "recordings" ("camera", "start_time", "end_time")' + ) + migrator.sql( + 'CREATE INDEX IF NOT EXISTS "idx_recordings_camera_end" ' + 'ON "recordings" ("camera", "end_time")' + ) + + +def rollback(migrator, database, fake=False, **kwargs): + migrator.sql('DROP INDEX IF EXISTS "idx_recordings_camera_start_end"') + migrator.sql('DROP INDEX IF EXISTS "idx_recordings_camera_end"')