This commit is contained in:
Josh Hawkins 2026-04-11 07:52:08 -05:00
parent 335229d0d4
commit 855e5cc0f9
3 changed files with 413 additions and 162 deletions

View File

@ -0,0 +1,42 @@
from typing import List, Optional
from pydantic import BaseModel, Field, model_validator
class BatchExportBody(BaseModel):
start_time: float = Field(title="Start time")
end_time: float = Field(title="End time")
camera_ids: List[str] = Field(title="Camera IDs", min_length=1)
name: Optional[str] = Field(
default=None,
title="Friendly name template",
max_length=256,
description="Base export name. Each export is saved as '<name> - <camera>'",
)
export_case_id: Optional[str] = Field(
default=None,
title="Export case ID",
max_length=30,
description="Existing export case ID to assign all exports to",
)
new_case_name: Optional[str] = Field(
default=None,
title="New case name",
max_length=100,
description="Name of a new export case to create when export_case_id is omitted",
)
new_case_description: Optional[str] = Field(
default=None,
title="New case description",
description="Optional description for a newly created export case",
)
@model_validator(mode="after")
def validate_case_target(self) -> "BatchExportBody":
if self.end_time <= self.start_time:
raise ValueError("end_time must be after start_time")
if self.export_case_id is None and self.new_case_name is None:
raise ValueError("Either export_case_id or new_case_name must be provided")
return self

View File

@ -30,4 +30,29 @@ class StartExportResponse(BaseModel):
)
class BatchExportResultModel(BaseModel):
"""Per-camera result for a batch export request."""
camera: str = Field(description="Camera name for this export attempt")
export_id: Optional[str] = Field(
default=None,
description="The export ID when the export was successfully queued",
)
success: bool = Field(description="Whether the export was successfully queued")
error: Optional[str] = Field(
default=None,
description="Validation or queueing error for this camera, if any",
)
class BatchExportResponse(BaseModel):
"""Response model for starting a multi-camera export batch."""
export_case_id: str = Field(description="Export case ID associated with the batch")
export_ids: List[str] = Field(description="Export IDs successfully queued")
results: List[BatchExportResultModel] = Field(
description="Per-camera batch export results"
)
ExportsResponse = List[ExportModel]

View File

@ -1,8 +1,11 @@
"""Export apis."""
import datetime
import logging
import random
import string
import threading
import time
from pathlib import Path
from typing import List, Optional
@ -19,6 +22,7 @@ from frigate.api.auth import (
require_camera_access,
require_role,
)
from frigate.api.defs.request.batch_export_body import BatchExportBody
from frigate.api.defs.request.export_case_body import (
ExportCaseAssignBody,
ExportCaseCreateBody,
@ -34,6 +38,7 @@ from frigate.api.defs.response.export_case_response import (
ExportCasesResponse,
)
from frigate.api.defs.response.export_response import (
BatchExportResponse,
ExportModel,
ExportsResponse,
StartExportResponse,
@ -54,6 +59,214 @@ logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.export])
EXPORT_START_SEMAPHORE = threading.Semaphore(3)
class ManagedRecordingExporter(RecordingExporter):
"""Recording exporter that releases the shared concurrency slot on exit."""
def run(self) -> None:
try:
super().run()
finally:
EXPORT_START_SEMAPHORE.release()
def _generate_id(length: int = 12) -> str:
return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))
def _generate_export_id(camera_name: str) -> str:
return f"{camera_name}_{_generate_id(6)}"
def _create_export_case_record(
name: str,
description: Optional[str],
) -> ExportCase:
now = datetime.datetime.fromtimestamp(time.time())
return ExportCase.create(
id=_generate_id(),
name=name,
description=description,
created_at=now,
updated_at=now,
)
def _validate_camera_name(request: Request, camera_name: str) -> Optional[JSONResponse]:
if camera_name and request.app.frigate_config.cameras.get(camera_name):
return None
return JSONResponse(
content={"success": False, "message": f"{camera_name} is not a valid camera."},
status_code=404,
)
def _validate_export_case(export_case_id: Optional[str]) -> Optional[JSONResponse]:
if export_case_id is None:
return None
try:
ExportCase.get(ExportCase.id == export_case_id)
except DoesNotExist:
return JSONResponse(
content={"success": False, "message": "Export case not found"},
status_code=404,
)
return None
def _sanitize_existing_image(
image_path: Optional[str],
) -> tuple[Optional[str], Optional[JSONResponse]]:
existing_image = sanitize_filepath(image_path) if image_path else None
if existing_image and not existing_image.startswith(CLIPS_DIR):
return None, JSONResponse(
content={"success": False, "message": "Invalid image path"},
status_code=400,
)
return existing_image, None
def _validate_export_source(
camera_name: str,
start_time: float,
end_time: float,
playback_source: PlaybackSourceEnum,
) -> Optional[str]:
if playback_source == PlaybackSourceEnum.recordings:
recordings_count = (
Recordings.select()
.where(
Recordings.start_time.between(start_time, end_time)
| Recordings.end_time.between(start_time, end_time)
| (
(start_time > Recordings.start_time)
& (end_time < Recordings.end_time)
)
)
.where(Recordings.camera == camera_name)
.count()
)
if recordings_count <= 0:
return "No recordings found for time range"
return None
previews_count = (
Previews.select()
.where(
Previews.start_time.between(start_time, end_time)
| Previews.end_time.between(start_time, end_time)
| ((start_time > Previews.start_time) & (end_time < Previews.end_time))
)
.where(Previews.camera == camera_name)
.count()
)
if not is_current_hour(start_time) and previews_count <= 0:
return "No previews found for time range"
return None
def _get_batch_recording_export_errors(
request: Request,
camera_names: list[str],
start_time: float,
end_time: float,
) -> dict[str, str]:
unique_camera_names = list(dict.fromkeys(camera_names))
configured_cameras = request.app.frigate_config.cameras
errors: dict[str, str] = {}
valid_camera_names = [
camera_name
for camera_name in unique_camera_names
if configured_cameras.get(camera_name)
]
for camera_name in unique_camera_names:
if camera_name not in valid_camera_names:
errors[camera_name] = f"{camera_name} is not a valid camera."
if not valid_camera_names:
return errors
recordings = (
Recordings.select(Recordings.camera)
.distinct()
.where(
Recordings.camera << valid_camera_names,
Recordings.start_time.between(start_time, end_time)
| Recordings.end_time.between(start_time, end_time)
| ((start_time > Recordings.start_time) & (end_time < Recordings.end_time)),
)
.iterator()
)
cameras_with_recordings = {recording.camera for recording in recordings}
for camera_name in valid_camera_names:
if camera_name not in cameras_with_recordings:
errors[camera_name] = "No recordings found for time range"
return errors
def _build_exporter(
request: Request,
camera_name: str,
start_time: float,
end_time: float,
friendly_name: Optional[str],
existing_image: Optional[str],
playback_source: PlaybackSourceEnum,
export_case_id: Optional[str],
ffmpeg_input_args: Optional[str] = None,
ffmpeg_output_args: Optional[str] = None,
cpu_fallback: bool = False,
) -> ManagedRecordingExporter:
return ManagedRecordingExporter(
request.app.frigate_config,
_generate_export_id(camera_name),
camera_name,
friendly_name,
existing_image,
int(start_time),
int(end_time),
playback_source,
export_case_id,
ffmpeg_input_args,
ffmpeg_output_args,
cpu_fallback,
)
def _start_exporter(exporter: ManagedRecordingExporter) -> None:
EXPORT_START_SEMAPHORE.acquire()
try:
exporter.start()
except Exception:
EXPORT_START_SEMAPHORE.release()
raise
def _export_case_to_dict(case: ExportCase) -> dict[str, object]:
case_dict = model_to_dict(case)
for field in ("created_at", "updated_at"):
value = case_dict.get(field)
if isinstance(value, datetime.datetime):
case_dict[field] = value.timestamp()
return case_dict
@router.get(
"/exports",
@ -103,10 +316,8 @@ def get_exports(
description="Gets all export cases from the database.",
)
def get_export_cases():
cases = (
ExportCase.select().order_by(ExportCase.created_at.desc()).dicts().iterator()
)
return JSONResponse(content=[c for c in cases])
cases = ExportCase.select().order_by(ExportCase.created_at.desc()).iterator()
return JSONResponse(content=[_export_case_to_dict(case) for case in cases])
@router.post(
@ -117,14 +328,8 @@ def get_export_cases():
description="Creates a new export case.",
)
def create_export_case(body: ExportCaseCreateBody):
case = ExportCase.create(
id="".join(random.choices(string.ascii_lowercase + string.digits, k=12)),
name=body.name,
description=body.description,
created_at=Path().stat().st_mtime,
updated_at=Path().stat().st_mtime,
)
return JSONResponse(content=model_to_dict(case))
case = _create_export_case_record(body.name, body.description)
return JSONResponse(content=_export_case_to_dict(case))
@router.get(
@ -137,7 +342,7 @@ def create_export_case(body: ExportCaseCreateBody):
def get_export_case(case_id: str):
try:
case = ExportCase.get(ExportCase.id == case_id)
return JSONResponse(content=model_to_dict(case))
return JSONResponse(content=_export_case_to_dict(case))
except DoesNotExist:
return JSONResponse(
content={"success": False, "message": "Export case not found"},
@ -166,6 +371,8 @@ def update_export_case(case_id: str, body: ExportCaseUpdateBody):
if body.description is not None:
case.description = body.description
case.updated_at = datetime.datetime.fromtimestamp(time.time())
case.save()
return JSONResponse(
@ -241,6 +448,82 @@ async def assign_export_case(
)
@router.post(
"/exports/batch",
response_model=BatchExportResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Start multi-camera recording export",
description=(
"Starts recording exports for multiple cameras for the same time range and "
"assigns them to a single export case."
),
)
def export_recordings_batch(request: Request, body: BatchExportBody):
case_validation_error = _validate_export_case(body.export_case_id)
if case_validation_error is not None:
return case_validation_error
export_case_id = body.export_case_id
if export_case_id is None:
export_case = _create_export_case_record(
body.new_case_name or body.name or "New Case",
body.new_case_description,
)
export_case_id = export_case.id
export_ids: list[str] = []
results: list[dict[str, Optional[str] | bool]] = []
camera_errors = _get_batch_recording_export_errors(
request,
body.camera_ids,
body.start_time,
body.end_time,
)
for camera_name in dict.fromkeys(body.camera_ids):
camera_error = camera_errors.get(camera_name)
if camera_error is not None:
results.append(
{
"camera": camera_name,
"export_id": None,
"success": False,
"error": camera_error,
}
)
continue
exporter = _build_exporter(
request,
camera_name,
body.start_time,
body.end_time,
f"{body.name} - {camera_name}" if body.name else None,
None,
PlaybackSourceEnum.recordings,
export_case_id,
)
_start_exporter(exporter)
export_ids.append(exporter.export_id)
results.append(
{
"camera": camera_name,
"export_id": exporter.export_id,
"success": True,
"error": None,
}
)
return JSONResponse(
content={
"export_case_id": export_case_id,
"export_ids": export_ids,
"results": results,
}
)
@router.post(
"/export/{camera_name}/start/{start_time}/end/{end_time}",
response_model=StartExportResponse,
@ -258,100 +541,51 @@ def export_recording(
end_time: float,
body: ExportRecordingsBody,
):
if not camera_name or not request.app.frigate_config.cameras.get(camera_name):
return JSONResponse(
content=(
{"success": False, "message": f"{camera_name} is not a valid camera."}
),
status_code=404,
)
camera_validation_error = _validate_camera_name(request, camera_name)
if camera_validation_error is not None:
return camera_validation_error
playback_source = body.source
friendly_name = body.name
existing_image = sanitize_filepath(body.image_path) if body.image_path else None
existing_image, image_validation_error = _sanitize_existing_image(body.image_path)
if image_validation_error is not None:
return image_validation_error
export_case_id = body.export_case_id
if export_case_id is not None:
try:
ExportCase.get(ExportCase.id == export_case_id)
except DoesNotExist:
return JSONResponse(
content={"success": False, "message": "Export case not found"},
status_code=404,
)
case_validation_error = _validate_export_case(export_case_id)
if case_validation_error is not None:
return case_validation_error
# Ensure that existing_image is a valid path
if existing_image and not existing_image.startswith(CLIPS_DIR):
source_error = _validate_export_source(
camera_name,
start_time,
end_time,
playback_source,
)
if source_error is not None:
return JSONResponse(
content=({"success": False, "message": "Invalid image path"}),
content={"success": False, "message": source_error},
status_code=400,
)
if playback_source == "recordings":
recordings_count = (
Recordings.select()
.where(
Recordings.start_time.between(start_time, end_time)
| Recordings.end_time.between(start_time, end_time)
| (
(start_time > Recordings.start_time)
& (end_time < Recordings.end_time)
)
)
.where(Recordings.camera == camera_name)
.count()
)
if recordings_count <= 0:
return JSONResponse(
content=(
{"success": False, "message": "No recordings found for time range"}
),
status_code=400,
)
else:
previews_count = (
Previews.select()
.where(
Previews.start_time.between(start_time, end_time)
| Previews.end_time.between(start_time, end_time)
| ((start_time > Previews.start_time) & (end_time < Previews.end_time))
)
.where(Previews.camera == camera_name)
.count()
)
if not is_current_hour(start_time) and previews_count <= 0:
return JSONResponse(
content=(
{"success": False, "message": "No previews found for time range"}
),
status_code=400,
)
export_id = f"{camera_name}_{''.join(random.choices(string.ascii_lowercase + string.digits, k=6))}"
exporter = RecordingExporter(
request.app.frigate_config,
export_id,
exporter = _build_exporter(
request,
camera_name,
start_time,
end_time,
friendly_name,
existing_image,
int(start_time),
int(end_time),
(
PlaybackSourceEnum[playback_source]
if playback_source in PlaybackSourceEnum.__members__.values()
else PlaybackSourceEnum.recordings
),
playback_source,
export_case_id,
)
exporter.start()
_start_exporter(exporter)
return JSONResponse(
content=(
{
"success": True,
"message": "Starting export of recording.",
"export_id": export_id,
"export_id": exporter.export_id,
}
),
status_code=200,
@ -472,82 +706,36 @@ def export_recording_custom(
end_time: float,
body: ExportRecordingsCustomBody,
):
if not camera_name or not request.app.frigate_config.cameras.get(camera_name):
return JSONResponse(
content=(
{"success": False, "message": f"{camera_name} is not a valid camera."}
),
status_code=404,
)
camera_validation_error = _validate_camera_name(request, camera_name)
if camera_validation_error is not None:
return camera_validation_error
playback_source = body.source
friendly_name = body.name
existing_image = sanitize_filepath(body.image_path) if body.image_path else None
existing_image, image_validation_error = _sanitize_existing_image(body.image_path)
if image_validation_error is not None:
return image_validation_error
ffmpeg_input_args = body.ffmpeg_input_args
ffmpeg_output_args = body.ffmpeg_output_args
cpu_fallback = body.cpu_fallback
export_case_id = body.export_case_id
if export_case_id is not None:
try:
ExportCase.get(ExportCase.id == export_case_id)
except DoesNotExist:
return JSONResponse(
content={"success": False, "message": "Export case not found"},
status_code=404,
)
case_validation_error = _validate_export_case(export_case_id)
if case_validation_error is not None:
return case_validation_error
# Ensure that existing_image is a valid path
if existing_image and not existing_image.startswith(CLIPS_DIR):
source_error = _validate_export_source(
camera_name,
start_time,
end_time,
playback_source,
)
if source_error is not None:
return JSONResponse(
content=({"success": False, "message": "Invalid image path"}),
content={"success": False, "message": source_error},
status_code=400,
)
if playback_source == "recordings":
recordings_count = (
Recordings.select()
.where(
Recordings.start_time.between(start_time, end_time)
| Recordings.end_time.between(start_time, end_time)
| (
(start_time > Recordings.start_time)
& (end_time < Recordings.end_time)
)
)
.where(Recordings.camera == camera_name)
.count()
)
if recordings_count <= 0:
return JSONResponse(
content=(
{"success": False, "message": "No recordings found for time range"}
),
status_code=400,
)
else:
previews_count = (
Previews.select()
.where(
Previews.start_time.between(start_time, end_time)
| Previews.end_time.between(start_time, end_time)
| ((start_time > Previews.start_time) & (end_time < Previews.end_time))
)
.where(Previews.camera == camera_name)
.count()
)
if not is_current_hour(start_time) and previews_count <= 0:
return JSONResponse(
content=(
{"success": False, "message": "No previews found for time range"}
),
status_code=400,
)
export_id = f"{camera_name}_{''.join(random.choices(string.ascii_lowercase + string.digits, k=6))}"
# Validate user-provided ffmpeg args to prevent injection.
# Admin users are trusted and skip validation.
is_admin = request.headers.get("remote-role", "") == "admin"
@ -577,31 +765,27 @@ def export_recording_custom(
if ffmpeg_output_args is None:
ffmpeg_output_args = DEFAULT_TIME_LAPSE_FFMPEG_ARGS
exporter = RecordingExporter(
request.app.frigate_config,
export_id,
exporter = _build_exporter(
request,
camera_name,
start_time,
end_time,
friendly_name,
existing_image,
int(start_time),
int(end_time),
(
PlaybackSourceEnum[playback_source]
if playback_source in PlaybackSourceEnum.__members__.values()
else PlaybackSourceEnum.recordings
),
playback_source,
export_case_id,
ffmpeg_input_args,
ffmpeg_output_args,
cpu_fallback,
)
exporter.start()
_start_exporter(exporter)
return JSONResponse(
content=(
{
"success": True,
"message": "Starting export of recording.",
"export_id": export_id,
"export_id": exporter.export_id,
}
),
status_code=200,