frigate/frigate/api/export.py
Josh Hawkins 048475e750
API admin exemptions and route guard updates (#21094)
* update exempt paths and add missing guard to api endpoints

* admin only frigate+ submission
2025-11-29 07:30:04 -06:00

292 lines
8.8 KiB
Python

"""Export apis."""
import logging
import random
import string
from pathlib import Path
from typing import List
import psutil
from fastapi import APIRouter, Depends, Request
from fastapi.responses import JSONResponse
from pathvalidate import sanitize_filepath
from peewee import DoesNotExist
from playhouse.shortcuts import model_to_dict
from frigate.api.auth import (
allow_any_authenticated,
get_allowed_cameras_for_filter,
require_camera_access,
require_role,
)
from frigate.api.defs.request.export_recordings_body import ExportRecordingsBody
from frigate.api.defs.request.export_rename_body import ExportRenameBody
from frigate.api.defs.response.export_response import (
ExportModel,
ExportsResponse,
StartExportResponse,
)
from frigate.api.defs.response.generic_response import GenericResponse
from frigate.api.defs.tags import Tags
from frigate.const import CLIPS_DIR, EXPORT_DIR
from frigate.models import Export, Previews, Recordings
from frigate.record.export import (
PlaybackFactorEnum,
PlaybackSourceEnum,
RecordingExporter,
)
from frigate.util.time import is_current_hour
logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.export])
@router.get(
"/exports",
response_model=ExportsResponse,
dependencies=[Depends(allow_any_authenticated())],
summary="Get exports",
description="""Gets all exports from the database for cameras the user has access to.
Returns a list of exports ordered by date (most recent first).""",
)
def get_exports(
allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter),
):
exports = (
Export.select()
.where(Export.camera << allowed_cameras)
.order_by(Export.date.desc())
.dicts()
.iterator()
)
return JSONResponse(content=[e for e in exports])
@router.post(
"/export/{camera_name}/start/{start_time}/end/{end_time}",
response_model=StartExportResponse,
dependencies=[Depends(require_camera_access)],
summary="Start recording export",
description="""Starts an export of a recording for the specified time range.
The export can be from recordings or preview footage. Returns the export ID if
successful, or an error message if the camera is invalid or no recordings/previews
are found for the time range.""",
)
def export_recording(
request: Request,
camera_name: str,
start_time: float,
end_time: float,
body: ExportRecordingsBody,
):
if not camera_name or not request.app.frigate_config.cameras.get(camera_name):
return JSONResponse(
content=(
{"success": False, "message": f"{camera_name} is not a valid camera."}
),
status_code=404,
)
playback_factor = body.playback
playback_source = body.source
friendly_name = body.name
existing_image = sanitize_filepath(body.image_path) if body.image_path else None
# Ensure that existing_image is a valid path
if existing_image and not existing_image.startswith(CLIPS_DIR):
return JSONResponse(
content=({"success": False, "message": "Invalid image path"}),
status_code=400,
)
if playback_source == "recordings":
recordings_count = (
Recordings.select()
.where(
Recordings.start_time.between(start_time, end_time)
| Recordings.end_time.between(start_time, end_time)
| (
(start_time > Recordings.start_time)
& (end_time < Recordings.end_time)
)
)
.where(Recordings.camera == camera_name)
.count()
)
if recordings_count <= 0:
return JSONResponse(
content=(
{"success": False, "message": "No recordings found for time range"}
),
status_code=400,
)
else:
previews_count = (
Previews.select()
.where(
Previews.start_time.between(start_time, end_time)
| Previews.end_time.between(start_time, end_time)
| ((start_time > Previews.start_time) & (end_time < Previews.end_time))
)
.where(Previews.camera == camera_name)
.count()
)
if not is_current_hour(start_time) and previews_count <= 0:
return JSONResponse(
content=(
{"success": False, "message": "No previews found for time range"}
),
status_code=400,
)
export_id = f"{camera_name}_{''.join(random.choices(string.ascii_lowercase + string.digits, k=6))}"
exporter = RecordingExporter(
request.app.frigate_config,
export_id,
camera_name,
friendly_name,
existing_image,
int(start_time),
int(end_time),
(
PlaybackFactorEnum[playback_factor]
if playback_factor in PlaybackFactorEnum.__members__.values()
else PlaybackFactorEnum.realtime
),
(
PlaybackSourceEnum[playback_source]
if playback_source in PlaybackSourceEnum.__members__.values()
else PlaybackSourceEnum.recordings
),
)
exporter.start()
return JSONResponse(
content=(
{
"success": True,
"message": "Starting export of recording.",
"export_id": export_id,
}
),
status_code=200,
)
@router.patch(
"/export/{event_id}/rename",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Rename export",
description="""Renames an export.
NOTE: This changes the friendly name of the export, not the filename.
""",
)
async def export_rename(event_id: str, body: ExportRenameBody, request: Request):
try:
export: Export = Export.get(Export.id == event_id)
await require_camera_access(export.camera, request=request)
except DoesNotExist:
return JSONResponse(
content=(
{
"success": False,
"message": "Export not found.",
}
),
status_code=404,
)
export.name = body.name
export.save()
return JSONResponse(
content=(
{
"success": True,
"message": "Successfully renamed export.",
}
),
status_code=200,
)
@router.delete(
"/export/{event_id}",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Delete export",
)
async def export_delete(event_id: str, request: Request):
try:
export: Export = Export.get(Export.id == event_id)
await require_camera_access(export.camera, request=request)
except DoesNotExist:
return JSONResponse(
content=(
{
"success": False,
"message": "Export not found.",
}
),
status_code=404,
)
files_in_use = []
for process in psutil.process_iter():
try:
if process.name() != "ffmpeg":
continue
file_list = process.open_files()
if file_list:
for nt in file_list:
if nt.path.startswith(EXPORT_DIR):
files_in_use.append(nt.path.split("/")[-1])
except psutil.Error:
continue
if export.video_path.split("/")[-1] in files_in_use:
return JSONResponse(
content=(
{"success": False, "message": "Can not delete in progress export."}
),
status_code=400,
)
Path(export.video_path).unlink(missing_ok=True)
if export.thumb_path:
Path(export.thumb_path).unlink(missing_ok=True)
export.delete_instance()
return JSONResponse(
content=(
{
"success": True,
"message": "Successfully deleted export.",
}
),
status_code=200,
)
@router.get(
"/exports/{export_id}",
response_model=ExportModel,
dependencies=[Depends(allow_any_authenticated())],
summary="Get a single export",
description="""Gets a specific export by ID. The user must have access to the camera
associated with the export.""",
)
async def get_export(export_id: str, request: Request):
try:
export = Export.get(Export.id == export_id)
await require_camera_access(export.camera, request=request)
return JSONResponse(content=model_to_dict(export))
except DoesNotExist:
return JSONResponse(
content={"success": False, "message": "Export not found"},
status_code=404,
)