frigate/frigate/api/preview.py
Josh Hawkins d036061e3f
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
cache the preview_frames directory listing so concurrent per-camera frame requests share one scan instead of each re-listing the whole directory (#23526)
2026-06-20 14:56:05 -05:00

194 lines
6.4 KiB
Python

"""Preview apis."""
import bisect
import logging
import os
import threading
from datetime import datetime, timedelta, timezone
import pytz
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import JSONResponse
from frigate.api.auth import (
allow_any_authenticated,
get_allowed_cameras_for_filter,
require_camera_access,
)
from frigate.api.defs.response.preview_response import (
PreviewFramesResponse,
PreviewsResponse,
)
from frigate.api.defs.tags import Tags
from frigate.const import BASE_DIR, CACHE_DIR, PREVIEW_FRAME_TYPE
from frigate.models import Previews
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Router that the preview endpoints in this module register on; every route
# added to it carries the "preview" tag.
router = APIRouter(tags=[Tags.preview])
@router.get(
    "/preview/{camera_name}/start/{start_ts}/end/{end_ts}",
    response_model=PreviewsResponse,
    dependencies=[Depends(allow_any_authenticated())],
    summary="Get preview clips for time range",
    description="""Gets all preview clips for a specified camera and time range.
Returns a list of preview video clips that overlap with the requested time period,
ordered by start time. Use camera_name='all' to get previews from all cameras.
Returns an error if no previews are found.""",
)
def preview_ts(
    camera_name: str,
    start_ts: float,
    end_ts: float,
    allowed_cameras: list[str] = Depends(get_allowed_cameras_for_filter),
):
    """Return the mp4 preview clips that overlap ``[start_ts, end_ts]``.

    Args:
        camera_name: A single camera name, or ``"all"`` to query every camera
            in ``allowed_cameras``.
        start_ts: Start of the requested range (compared against the stored
            ``start_time`` / ``end_time`` values).
        end_ts: End of the requested range.
        allowed_cameras: Cameras the caller may see, injected by FastAPI.

    Returns:
        A 200 ``JSONResponse`` holding a list of clip dicts (``camera``,
        ``src``, ``type``, ``start``, ``end``) ordered by start time, or a 404
        ``JSONResponse`` when there is nothing to return.

    Raises:
        HTTPException: 403 when a specific camera is requested that is not in
            ``allowed_cameras``.
    """
    # Resolve which cameras to query, rejecting a camera the caller cannot see.
    if camera_name == "all":
        camera_list = allowed_cameras
    elif camera_name in allowed_cameras:
        camera_list = [camera_name]
    else:
        raise HTTPException(status_code=403, detail="Access denied for camera")

    # "all" with no permitted cameras cannot match anything; skip the query.
    if not camera_list:
        return JSONResponse(
            content={"success": False, "message": "No previews found."},
            status_code=404,
        )

    # A clip is relevant when it starts inside the range, ends inside the
    # range, or completely spans the range.
    overlaps_range = (
        Previews.start_time.between(start_ts, end_ts)
        | Previews.end_time.between(start_ts, end_ts)
        | ((start_ts > Previews.start_time) & (end_ts < Previews.end_time))
    )

    rows = (
        Previews.select(
            Previews.camera,
            Previews.path,
            Previews.duration,
            Previews.start_time,
            Previews.end_time,
        )
        .where(overlaps_range)
        .where(Previews.camera << camera_list)
        .order_by(Previews.start_time.asc())
        .dicts()
        .iterator()
    )

    clips = [
        {
            "camera": row["camera"],
            # strip BASE_DIR from the stored path before exposing it as src
            "src": row["path"].replace(BASE_DIR, ""),
            "type": "video/mp4",
            "start": row["start_time"],
            "end": row["end_time"],
        }
        for row in rows
    ]

    if clips:
        return JSONResponse(content=clips, status_code=200)

    return JSONResponse(
        content={
            "success": False,
            "message": "No previews found.",
        },
        status_code=404,
    )
@router.get(
    "/preview/{year_month}/{day}/{hour}/{camera_name}/{tz_name}",
    response_model=PreviewsResponse,
    dependencies=[Depends(allow_any_authenticated())],
    summary="Get preview clips for specific hour",
    description="""Gets all preview clips for a specific hour in a given timezone.
Converts the provided date/time from the specified timezone to UTC and retrieves
all preview clips for that hour. Use camera_name='all' to get previews from all cameras.
The tz_name should be a timezone like 'America/New_York' (use commas instead of slashes).""",
)
def preview_hour(
    year_month: str,
    day: int,
    hour: int,
    camera_name: str,
    tz_name: str,
    allowed_cameras: list[str] = Depends(get_allowed_cameras_for_filter),
):
    """Get all mp4 previews for one wall-clock hour in the given timezone.

    Args:
        year_month: Year and month as ``"YYYY-MM"``.
        day: Day of the month.
        hour: Hour of the day (0-23) in the requested timezone.
        camera_name: A single camera name, or ``"all"``.
        tz_name: Timezone name with commas in place of slashes, e.g.
            ``"America,New_York"``.
        allowed_cameras: Cameras the caller may see, injected by FastAPI.

    Returns:
        Whatever ``preview_ts`` returns for the resolved one-hour range.

    Raises:
        HTTPException: 400 when the timezone is unknown or the date parts do
            not form a valid date/time.
    """
    try:
        tz = pytz.timezone(tz_name.replace(",", "/"))
    except pytz.UnknownTimeZoneError:
        raise HTTPException(status_code=400, detail="Invalid timezone") from None

    try:
        parts = year_month.split("-")
        local_start = datetime(int(parts[0]), int(parts[1]), int(day), int(hour))
        # Localize the requested wall-clock time so the UTC offset in effect on
        # that date is used; taking the offset from "now" is off by an hour for
        # any date on the other side of a DST transition.
        start_date = tz.localize(local_start)
    except (ValueError, IndexError, OverflowError):
        raise HTTPException(status_code=400, detail="Invalid date or hour") from None

    # the range covers the whole hour, stopping 1 ms short of the next one
    end_date = start_date + timedelta(hours=1) - timedelta(milliseconds=1)
    start_ts = start_date.timestamp()
    end_ts = end_date.timestamp()

    return preview_ts(camera_name, start_ts, end_ts, allowed_cameras)
# Cache one sorted listing of the shared preview_frames dir, stored as
# (directory mtime at scan time, sorted filenames). The lock serializes
# refreshes so concurrent requests share a single directory scan.
_preview_listing_lock = threading.Lock()
_preview_listing_cache: tuple[float, list[str]] = (-1.0, [])


def _get_preview_frame_listing(preview_dir: str) -> list[str]:
    """Return the sorted preview_frames listing, cached until the dir changes.

    Args:
        preview_dir: Directory whose entry names are listed.

    Returns:
        The sorted entry names. The returned list is the cached object itself
        and is shared between callers, so it must not be mutated.

    Raises:
        OSError: Propagated from ``os.stat`` / ``os.scandir`` (for example
            when ``preview_dir`` does not exist).
    """
    global _preview_listing_cache

    # mtime bumps when a frame is added or removed, invalidating the cache
    mtime = os.stat(preview_dir).st_mtime
    cached_mtime, files = _preview_listing_cache
    if mtime == cached_mtime:
        return files

    with _preview_listing_lock:
        # Re-read the mtime now that we hold the lock: the value sampled above
        # may be stale after waiting, and storing a fresh scan under a stale
        # mtime would clobber a newer cache entry and force another rescan.
        mtime = os.stat(preview_dir).st_mtime

        # another thread may have refreshed the cache while we waited
        cached_mtime, files = _preview_listing_cache
        if mtime == cached_mtime:
            return files

        files = sorted(entry.name for entry in os.scandir(preview_dir))
        _preview_listing_cache = (mtime, files)

    return files
@router.get(
    "/preview/{camera_name}/start/{start_ts}/end/{end_ts}/frames",
    response_model=PreviewFramesResponse,
    dependencies=[Depends(require_camera_access)],
    summary="Get cached preview frame filenames",
    description="""Gets a list of cached preview frame filenames for a specific camera and time range.
Returns an array of filenames for preview frames that fall within the specified time period,
sorted in chronological order. These are individual frame images cached for quick preview display.""",
)
def get_preview_frames_from_cache(camera_name: str, start_ts: float, end_ts: float):
    """Return the cached preview frame filenames for a camera and time range.

    Args:
        camera_name: Camera whose frames are requested.
        start_ts: Lower bound; formatted into the first filename of the range.
        end_ts: Upper bound; formatted into the last filename of the range.

    Returns:
        A 200 ``JSONResponse`` whose content is the list of matching
        filenames, in the sorted order of the directory listing.
    """
    prefix = f"preview_{camera_name}-"
    lower_name = f"{prefix}{start_ts}.{PREVIEW_FRAME_TYPE}"
    upper_name = f"{prefix}{end_ts}.{PREVIEW_FRAME_TYPE}"

    listing = _get_preview_frame_listing(os.path.join(CACHE_DIR, "preview_frames"))

    # The listing is sorted, so the names between the two bounds are one
    # contiguous slice; bisect finds its edges without walking every entry.
    lo = bisect.bisect_left(listing, lower_name)
    hi = bisect.bisect_right(listing, upper_name)

    # keep only names carrying this camera's prefix
    frames = [name for name in listing[lo:hi] if name.startswith(prefix)]

    return JSONResponse(content=frames, status_code=200)