refactor batch endpoint for multiple review items

This commit is contained in:
Josh Hawkins 2026-04-11 18:20:59 -05:00
parent 398ac80577
commit a6c405a2a5
3 changed files with 212 additions and 81 deletions

View File

@@ -2,22 +2,47 @@ from typing import List, Optional
from pydantic import BaseModel, Field, model_validator from pydantic import BaseModel, Field, model_validator
MAX_BATCH_EXPORT_ITEMS = 50
class BatchExportBody(BaseModel):
class BatchExportItem(BaseModel):
camera: str = Field(title="Camera name")
start_time: float = Field(title="Start time") start_time: float = Field(title="Start time")
end_time: float = Field(title="End time") end_time: float = Field(title="End time")
camera_ids: List[str] = Field(title="Camera IDs", min_length=1) image_path: Optional[str] = Field(
name: Optional[str] = Field(
default=None, default=None,
title="Friendly name template", title="Existing thumbnail path",
description="Optional existing image to use as the export thumbnail",
)
friendly_name: Optional[str] = Field(
default=None,
title="Friendly name",
max_length=256, max_length=256,
description="Base export name. Each export is saved as '<name> - <camera>'", description="Optional friendly name for this specific export item",
)
client_item_id: Optional[str] = Field(
default=None,
title="Client item ID",
max_length=128,
description="Optional opaque client identifier echoed back in results",
)
class BatchExportBody(BaseModel):
items: List[BatchExportItem] = Field(
title="Items",
min_length=1,
max_length=MAX_BATCH_EXPORT_ITEMS,
description="List of export items. Each item has its own camera and time range.",
) )
export_case_id: Optional[str] = Field( export_case_id: Optional[str] = Field(
default=None, default=None,
title="Export case ID", title="Export case ID",
max_length=30, max_length=30,
description="Existing export case ID to assign all exports to", description=(
"Existing export case ID to assign all exports to. Attaching to an "
"existing case is temporarily admin-only until case-level ACLs exist."
),
) )
new_case_name: Optional[str] = Field( new_case_name: Optional[str] = Field(
default=None, default=None,
@@ -33,8 +58,9 @@ class BatchExportBody(BaseModel):
@model_validator(mode="after") @model_validator(mode="after")
def validate_case_target(self) -> "BatchExportBody": def validate_case_target(self) -> "BatchExportBody":
if self.end_time <= self.start_time: for item in self.items:
raise ValueError("end_time must be after start_time") if item.end_time <= item.start_time:
raise ValueError("end_time must be after start_time")
if self.export_case_id is None and self.new_case_name is None: if self.export_case_id is None and self.new_case_name is None:
raise ValueError("Either export_case_id or new_case_name must be provided") raise ValueError("Either export_case_id or new_case_name must be provided")

View File

@@ -35,7 +35,7 @@ class StartExportResponse(BaseModel):
class BatchExportResultModel(BaseModel): class BatchExportResultModel(BaseModel):
"""Per-camera result for a batch export request.""" """Per-item result for a batch export request."""
camera: str = Field(description="Camera name for this export attempt") camera: str = Field(description="Camera name for this export attempt")
export_id: Optional[str] = Field( export_id: Optional[str] = Field(
@@ -49,12 +49,20 @@ class BatchExportResultModel(BaseModel):
) )
error: Optional[str] = Field( error: Optional[str] = Field(
default=None, default=None,
description="Validation or queueing error for this camera, if any", description="Validation or queueing error for this item, if any",
)
item_index: Optional[int] = Field(
default=None,
description="Zero-based index of this result within the request items list",
)
client_item_id: Optional[str] = Field(
default=None,
description="Opaque client-supplied item identifier echoed from the request",
) )
class BatchExportResponse(BaseModel): class BatchExportResponse(BaseModel):
"""Response model for starting a multi-camera export batch.""" """Response model for starting an export batch."""
export_case_id: Optional[str] = Field( export_case_id: Optional[str] = Field(
default=None, default=None,
@@ -62,7 +70,7 @@ class BatchExportResponse(BaseModel):
) )
export_ids: List[str] = Field(description="Export IDs successfully queued") export_ids: List[str] = Field(description="Export IDs successfully queued")
results: List[BatchExportResultModel] = Field( results: List[BatchExportResultModel] = Field(
description="Per-camera batch export results" description="Per-item batch export results"
) )

View File

@@ -18,10 +18,14 @@ from playhouse.shortcuts import model_to_dict
from frigate.api.auth import ( from frigate.api.auth import (
allow_any_authenticated, allow_any_authenticated,
get_allowed_cameras_for_filter, get_allowed_cameras_for_filter,
get_current_user,
require_camera_access, require_camera_access,
require_role, require_role,
) )
from frigate.api.defs.request.batch_export_body import BatchExportBody from frigate.api.defs.request.batch_export_body import (
BatchExportBody,
BatchExportItem,
)
from frigate.api.defs.request.export_case_body import ( from frigate.api.defs.request.export_case_body import (
ExportCaseAssignBody, ExportCaseAssignBody,
ExportCaseCreateBody, ExportCaseCreateBody,
@@ -172,45 +176,63 @@ def _validate_export_source(
return None return None
def _get_batch_recording_export_errors( def _get_item_recording_export_errors(
request: Request, request: Request,
camera_names: list[str], items: list[BatchExportItem],
start_time: float, ) -> dict[int, str]:
end_time: float, """Return {item_index: error message} for items with invalid state.
) -> dict[str, str]:
unique_camera_names = list(dict.fromkeys(camera_names)) Checks camera configuration and recording presence per item. Groups by
camera and issues one query per unique camera covering that camera's
full requested range, then checks each item's range against the returned
rows in Python. This avoids O(N) DB round-trips on large batches.
"""
configured_cameras = request.app.frigate_config.cameras configured_cameras = request.app.frigate_config.cameras
errors: dict[str, str] = {} errors: dict[int, str] = {}
valid_camera_names = [ # Validate camera configuration first
camera_name item_ranges_by_camera: dict[str, list[tuple[int, float, float]]] = {}
for camera_name in unique_camera_names for index, item in enumerate(items):
if configured_cameras.get(camera_name) if not configured_cameras.get(item.camera):
] errors[index] = f"{item.camera} is not a valid camera."
continue
item_ranges_by_camera.setdefault(item.camera, []).append(
(index, item.start_time, item.end_time)
)
for camera_name in unique_camera_names: if not item_ranges_by_camera:
if camera_name not in valid_camera_names:
errors[camera_name] = f"{camera_name} is not a valid camera."
if not valid_camera_names:
return errors return errors
recordings = ( # For each camera, fetch recordings that cover the union of ranges
Recordings.select(Recordings.camera) for camera_name, indexed_ranges in item_ranges_by_camera.items():
.distinct() min_start = min(r[1] for r in indexed_ranges)
.where( max_end = max(r[2] for r in indexed_ranges)
Recordings.camera << valid_camera_names,
Recordings.start_time.between(start_time, end_time)
| Recordings.end_time.between(start_time, end_time)
| ((start_time > Recordings.start_time) & (end_time < Recordings.end_time)),
)
.iterator()
)
cameras_with_recordings = {recording.camera for recording in recordings}
for camera_name in valid_camera_names: recording_ranges = list(
if camera_name not in cameras_with_recordings: Recordings.select(Recordings.start_time, Recordings.end_time)
errors[camera_name] = "No recordings found for time range" .where(
Recordings.camera == camera_name,
Recordings.start_time.between(min_start, max_end)
| Recordings.end_time.between(min_start, max_end)
| (
(min_start > Recordings.start_time)
& (max_end < Recordings.end_time)
),
)
.iterator()
)
for index, start_time, end_time in indexed_ranges:
has_recording = any(
(
start_time <= rec.start_time <= end_time
or start_time <= rec.end_time <= end_time
or (start_time > rec.start_time and end_time < rec.end_time)
)
for rec in recording_ranges
)
if not has_recording:
errors[index] = "No recordings found for time range"
return errors return errors
@@ -473,39 +495,90 @@ async def get_export_job_status(export_id: str, request: Request):
@router.post( @router.post(
"/exports/batch", "/exports/batch",
response_model=BatchExportResponse, response_model=BatchExportResponse,
dependencies=[Depends(require_role(["admin"]))], dependencies=[Depends(allow_any_authenticated())],
summary="Start multi-camera recording export", summary="Start recording export batch",
description=( description=(
"Starts recording exports for multiple cameras for the same time range and " "Starts recording exports for a batch of items, each with its own camera "
"assigns them to a single export case." "and time range, and assigns them to a single export case. Attaching to "
"an existing case is temporarily admin-only until case-level ACLs exist."
), ),
) )
def export_recordings_batch(request: Request, body: BatchExportBody): def export_recordings_batch(
request: Request,
body: BatchExportBody,
allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter),
current_user: dict = Depends(get_current_user),
):
if isinstance(current_user, JSONResponse):
return current_user
# Stopgap: attaching to an existing case remains admin-only until
# case-level ACLs exist. Non-admins can still create a fresh case
# as a side effect of queueing items they already have camera access to.
if body.export_case_id is not None and current_user["role"] != "admin":
return JSONResponse(
content={
"success": False,
"message": "Only admins can attach exports to an existing case.",
},
status_code=403,
)
case_validation_error = _validate_export_case(body.export_case_id) case_validation_error = _validate_export_case(body.export_case_id)
if case_validation_error is not None: if case_validation_error is not None:
return case_validation_error return case_validation_error
export_ids: list[str] = [] # Fail-closed camera access: any item referencing an inaccessible
results: list[dict[str, Optional[str] | bool]] = [] # camera rejects the whole request. The UI's review list is already
camera_errors = _get_batch_recording_export_errors( # filtered by camera access, so reaching this branch implies a stale
request, # session or a crafted request — reject loudly rather than silently
body.camera_ids, # dropping items.
body.start_time, allowed_camera_set = set(allowed_cameras)
body.end_time, for item in body.items:
) if item.camera not in allowed_camera_set:
return JSONResponse(
content={
"success": False,
"message": f"Cannot export from {item.camera}: access denied",
},
status_code=403,
)
cameras_to_queue = [ # Sanitize each item's image_path up front. A bad path in any item
camera_name # kills the whole request, consistent with single-export behavior.
for camera_name in dict.fromkeys(body.camera_ids) sanitized_images: list[Optional[str]] = []
if camera_errors.get(camera_name) is None for item in body.items:
existing_image, image_validation_error = _sanitize_existing_image(
item.image_path
)
if image_validation_error is not None:
return image_validation_error
sanitized_images.append(existing_image)
item_errors = _get_item_recording_export_errors(request, body.items)
queueable_indexes = [
index for index in range(len(body.items)) if index not in item_errors
] ]
if not queueable_indexes:
return JSONResponse(
content={
"success": False,
"message": (
"No exports could be queued: no recordings found for the "
"requested ranges."
),
},
status_code=400,
)
# Preflight admission: reject the whole batch if we can't fit every # Preflight admission: reject the whole batch if we can't fit every
# queueable camera. Prevents creating a case we'd just roll back and # queueable item. Prevents partial batches where the tail fails with
# avoids partial batches where the tail all fails with "queue full". # "queue full" after we've already created a case.
if cameras_to_queue and available_export_queue_slots( if available_export_queue_slots(request.app.frigate_config) < len(
request.app.frigate_config queueable_indexes
) < len(cameras_to_queue): ):
return JSONResponse( return JSONResponse(
content={ content={
"success": False, "success": False,
@@ -516,33 +589,36 @@ def export_recordings_batch(request: Request, body: BatchExportBody):
export_case = None export_case = None
export_case_id = body.export_case_id export_case_id = body.export_case_id
if export_case_id is None and cameras_to_queue: if export_case_id is None:
export_case = _create_export_case_record( export_case = _create_export_case_record(
body.new_case_name or body.name or "New Case", body.new_case_name or "New Case",
body.new_case_description, body.new_case_description,
) )
export_case_id = export_case.id export_case_id = export_case.id
for camera_name in dict.fromkeys(body.camera_ids): export_ids: list[str] = []
camera_error = camera_errors.get(camera_name) results: list[dict[str, Optional[str] | bool | int]] = []
if camera_error is not None: for index, item in enumerate(body.items):
if index in item_errors:
results.append( results.append(
{ {
"camera": camera_name, "camera": item.camera,
"export_id": None, "export_id": None,
"success": False, "success": False,
"status": None, "status": None,
"error": camera_error, "error": item_errors[index],
"item_index": index,
"client_item_id": item.client_item_id,
} }
) )
continue continue
export_job = _build_export_job( export_job = _build_export_job(
camera_name, item.camera,
body.start_time, item.start_time,
body.end_time, item.end_time,
f"{body.name} - {camera_name}" if body.name else None, item.friendly_name,
None, sanitized_images[index],
PlaybackSourceEnum.recordings, PlaybackSourceEnum.recordings,
export_case_id, export_case_id,
) )
@@ -552,11 +628,13 @@ def export_recordings_batch(request: Request, body: BatchExportBody):
logger.exception("Failed to queue export job %s", export_job.id) logger.exception("Failed to queue export job %s", export_job.id)
results.append( results.append(
{ {
"camera": camera_name, "camera": item.camera,
"export_id": None, "export_id": None,
"success": False, "success": False,
"status": None, "status": None,
"error": str(err), "error": str(err),
"item_index": index,
"client_item_id": item.client_item_id,
} }
) )
continue continue
@@ -564,11 +642,13 @@ def export_recordings_batch(request: Request, body: BatchExportBody):
export_ids.append(export_job.id) export_ids.append(export_job.id)
results.append( results.append(
{ {
"camera": camera_name, "camera": item.camera,
"export_id": export_job.id, "export_id": export_job.id,
"success": True, "success": True,
"status": "queued", "status": "queued",
"error": None, "error": None,
"item_index": index,
"client_item_id": item.client_item_id,
} }
) )
@@ -602,7 +682,11 @@ def export_recording(
start_time: float, start_time: float,
end_time: float, end_time: float,
body: ExportRecordingsBody, body: ExportRecordingsBody,
current_user: dict = Depends(get_current_user),
): ):
if isinstance(current_user, JSONResponse):
return current_user
camera_validation_error = _validate_camera_name(request, camera_name) camera_validation_error = _validate_camera_name(request, camera_name)
if camera_validation_error is not None: if camera_validation_error is not None:
return camera_validation_error return camera_validation_error
@@ -614,6 +698,19 @@ def export_recording(
return image_validation_error return image_validation_error
export_case_id = body.export_case_id export_case_id = body.export_case_id
# Attaching to an existing case requires admin. Single-export for
# cameras the user can access is otherwise non-admin; we only gate
# the case-attachment side effect.
if export_case_id is not None and current_user["role"] != "admin":
return JSONResponse(
content={
"success": False,
"message": "Only admins can attach exports to an existing case.",
},
status_code=403,
)
case_validation_error = _validate_export_case(export_case_id) case_validation_error = _validate_export_case(export_case_id)
if case_validation_error is not None: if case_validation_error is not None:
return case_validation_error return case_validation_error