frigate/frigate/api/preview.py
Nicolas Mowen 3c7e36fb16
Api docs updates (#20388)
* Update classification API docs

* Add information to events api

* Fix tag

* Add exports

* Add generic response to model for classification apis

* Add preview API information

* Cleanup

* Cleanup
2025-10-08 14:55:38 -05:00

145 lines
4.8 KiB
Python

"""Preview apis."""
import logging
import os
from datetime import datetime, timedelta, timezone
import pytz
from fastapi import APIRouter, Depends
from fastapi.responses import JSONResponse
from frigate.api.auth import require_camera_access
from frigate.api.defs.response.preview_response import (
PreviewFramesResponse,
PreviewsResponse,
)
from frigate.api.defs.tags import Tags
from frigate.const import BASE_DIR, CACHE_DIR, PREVIEW_FRAME_TYPE
from frigate.models import Previews
logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.preview])
@router.get(
"/preview/{camera_name}/start/{start_ts}/end/{end_ts}",
response_model=PreviewsResponse,
dependencies=[Depends(require_camera_access)],
summary="Get preview clips for time range",
description="""Gets all preview clips for a specified camera and time range.
Returns a list of preview video clips that overlap with the requested time period,
ordered by start time. Use camera_name='all' to get previews from all cameras.
Returns an error if no previews are found.""",
)
def preview_ts(camera_name: str, start_ts: float, end_ts: float):
"""Get all mp4 previews relevant for time period."""
if camera_name != "all":
camera_clause = Previews.camera == camera_name
else:
camera_clause = True
previews = (
Previews.select(
Previews.camera,
Previews.path,
Previews.duration,
Previews.start_time,
Previews.end_time,
)
.where(
Previews.start_time.between(start_ts, end_ts)
| Previews.end_time.between(start_ts, end_ts)
| ((start_ts > Previews.start_time) & (end_ts < Previews.end_time))
)
.where(camera_clause)
.order_by(Previews.start_time.asc())
.dicts()
.iterator()
)
clips = []
preview: Previews
for preview in previews:
clips.append(
{
"camera": preview["camera"],
"src": preview["path"].replace(BASE_DIR, ""),
"type": "video/mp4",
"start": preview["start_time"],
"end": preview["end_time"],
}
)
if not clips:
return JSONResponse(
content={
"success": False,
"message": "No previews found.",
},
status_code=404,
)
return JSONResponse(content=clips, status_code=200)
@router.get(
"/preview/{year_month}/{day}/{hour}/{camera_name}/{tz_name}",
response_model=PreviewsResponse,
dependencies=[Depends(require_camera_access)],
summary="Get preview clips for specific hour",
description="""Gets all preview clips for a specific hour in a given timezone.
Converts the provided date/time from the specified timezone to UTC and retrieves
all preview clips for that hour. Use camera_name='all' to get previews from all cameras.
The tz_name should be a timezone like 'America/New_York' (use commas instead of slashes).""",
)
def preview_hour(year_month: str, day: int, hour: int, camera_name: str, tz_name: str):
"""Get all mp4 previews relevant for time period given the timezone"""
parts = year_month.split("-")
start_date = (
datetime(int(parts[0]), int(parts[1]), int(day), int(hour), tzinfo=timezone.utc)
- datetime.now(pytz.timezone(tz_name.replace(",", "/"))).utcoffset()
)
end_date = start_date + timedelta(hours=1) - timedelta(milliseconds=1)
start_ts = start_date.timestamp()
end_ts = end_date.timestamp()
return preview_ts(camera_name, start_ts, end_ts)
@router.get(
"/preview/{camera_name}/start/{start_ts}/end/{end_ts}/frames",
response_model=PreviewFramesResponse,
dependencies=[Depends(require_camera_access)],
summary="Get cached preview frame filenames",
description="""Gets a list of cached preview frame filenames for a specific camera and time range.
Returns an array of filenames for preview frames that fall within the specified time period,
sorted in chronological order. These are individual frame images cached for quick preview display.""",
)
def get_preview_frames_from_cache(camera_name: str, start_ts: float, end_ts: float):
"""Get list of cached preview frames"""
preview_dir = os.path.join(CACHE_DIR, "preview_frames")
file_start = f"preview_{camera_name}"
start_file = f"{file_start}-{start_ts}.{PREVIEW_FRAME_TYPE}"
end_file = f"{file_start}-{end_ts}.{PREVIEW_FRAME_TYPE}"
selected_previews = []
for file in sorted(os.listdir(preview_dir)):
if not file.startswith(file_start):
continue
if file < start_file:
continue
if file > end_file:
break
selected_previews.append(file)
return JSONResponse(
content=selected_previews,
status_code=200,
)