From a6c405a2a528494230f852359ed2242c346f23c2 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Sat, 11 Apr 2026 18:20:59 -0500 Subject: [PATCH] refactor batch endpoint for multiple review items --- frigate/api/defs/request/batch_export_body.py | 42 +++- frigate/api/defs/response/export_response.py | 16 +- frigate/api/export.py | 235 +++++++++++++----- 3 files changed, 212 insertions(+), 81 deletions(-) diff --git a/frigate/api/defs/request/batch_export_body.py b/frigate/api/defs/request/batch_export_body.py index 7fd739bb3..5f4f6b311 100644 --- a/frigate/api/defs/request/batch_export_body.py +++ b/frigate/api/defs/request/batch_export_body.py @@ -2,22 +2,47 @@ from typing import List, Optional from pydantic import BaseModel, Field, model_validator +MAX_BATCH_EXPORT_ITEMS = 50 -class BatchExportBody(BaseModel): + +class BatchExportItem(BaseModel): + camera: str = Field(title="Camera name") start_time: float = Field(title="Start time") end_time: float = Field(title="End time") - camera_ids: List[str] = Field(title="Camera IDs", min_length=1) - name: Optional[str] = Field( + image_path: Optional[str] = Field( default=None, - title="Friendly name template", + title="Existing thumbnail path", + description="Optional existing image to use as the export thumbnail", + ) + friendly_name: Optional[str] = Field( + default=None, + title="Friendly name", max_length=256, - description="Base export name. Each export is saved as ' - '", + description="Optional friendly name for this specific export item", + ) + client_item_id: Optional[str] = Field( + default=None, + title="Client item ID", + max_length=128, + description="Optional opaque client identifier echoed back in results", + ) + + +class BatchExportBody(BaseModel): + items: List[BatchExportItem] = Field( + title="Items", + min_length=1, + max_length=MAX_BATCH_EXPORT_ITEMS, + description="List of export items. Each item has its own camera and time range.", ) export_case_id: Optional[str] = Field( default=None, title="Export case ID", max_length=30, - description="Existing export case ID to assign all exports to", + description=( + "Existing export case ID to assign all exports to. Attaching to an " + "existing case is temporarily admin-only until case-level ACLs exist." + ), ) new_case_name: Optional[str] = Field( default=None, @@ -33,8 +58,9 @@ class BatchExportBody(BaseModel): @model_validator(mode="after") def validate_case_target(self) -> "BatchExportBody": - if self.end_time <= self.start_time: - raise ValueError("end_time must be after start_time") + for item in self.items: + if item.end_time <= item.start_time: + raise ValueError("end_time must be after start_time") if self.export_case_id is None and self.new_case_name is None: raise ValueError("Either export_case_id or new_case_name must be provided") diff --git a/frigate/api/defs/response/export_response.py b/frigate/api/defs/response/export_response.py index 8de28c182..b796ba9ac 100644 --- a/frigate/api/defs/response/export_response.py +++ b/frigate/api/defs/response/export_response.py @@ -35,7 +35,7 @@ class StartExportResponse(BaseModel): class BatchExportResultModel(BaseModel): - """Per-camera result for a batch export request.""" + """Per-item result for a batch export request.""" camera: str = Field(description="Camera name for this export attempt") export_id: Optional[str] = Field( @@ -49,12 +49,20 @@ class BatchExportResultModel(BaseModel): ) error: Optional[str] = Field( default=None, - description="Validation or queueing error for this camera, if any", + description="Validation or queueing error for this item, if any", + ) + item_index: Optional[int] = Field( + default=None, + description="Zero-based index of this result within the request items list", + ) + client_item_id: Optional[str] = Field( + default=None, + description="Opaque client-supplied item identifier echoed from the request", ) class BatchExportResponse(BaseModel): - """Response model for starting a multi-camera export batch.""" + """Response model for starting an export batch.""" export_case_id: Optional[str] = Field( default=None, @@ -62,7 +70,7 @@ class BatchExportResponse(BaseModel): ) export_ids: List[str] = Field(description="Export IDs successfully queued") results: List[BatchExportResultModel] = Field( - description="Per-camera batch export results" + description="Per-item batch export results" ) diff --git a/frigate/api/export.py b/frigate/api/export.py index 58b3a018f..fc26e6913 100644 --- a/frigate/api/export.py +++ b/frigate/api/export.py @@ -18,10 +18,14 @@ from playhouse.shortcuts import model_to_dict from frigate.api.auth import ( allow_any_authenticated, get_allowed_cameras_for_filter, + get_current_user, require_camera_access, require_role, ) -from frigate.api.defs.request.batch_export_body import BatchExportBody +from frigate.api.defs.request.batch_export_body import ( + BatchExportBody, + BatchExportItem, +) from frigate.api.defs.request.export_case_body import ( ExportCaseAssignBody, ExportCaseCreateBody, @@ -172,45 +176,63 @@ def _validate_export_source( return None -def _get_batch_recording_export_errors( +def _get_item_recording_export_errors( request: Request, - camera_names: list[str], - start_time: float, - end_time: float, -) -> dict[str, str]: - unique_camera_names = list(dict.fromkeys(camera_names)) + items: list[BatchExportItem], +) -> dict[int, str]: + """Return {item_index: error message} for items with invalid state. + + Checks camera configuration and recording presence per item. Groups by + camera and issues one query per unique camera covering that camera's + full requested range, then checks each item's range against the returned + rows in Python. This avoids O(N) DB round-trips on large batches. + """ configured_cameras = request.app.frigate_config.cameras - errors: dict[str, str] = {} + errors: dict[int, str] = {} - valid_camera_names = [ - camera_name - for camera_name in unique_camera_names - if configured_cameras.get(camera_name) - ] + # Validate camera configuration first + item_ranges_by_camera: dict[str, list[tuple[int, float, float]]] = {} + for index, item in enumerate(items): + if not configured_cameras.get(item.camera): + errors[index] = f"{item.camera} is not a valid camera." + continue + item_ranges_by_camera.setdefault(item.camera, []).append( + (index, item.start_time, item.end_time) + ) - for camera_name in unique_camera_names: - if camera_name not in valid_camera_names: - errors[camera_name] = f"{camera_name} is not a valid camera." - - if not valid_camera_names: + if not item_ranges_by_camera: return errors - recordings = ( - Recordings.select(Recordings.camera) - .distinct() - .where( - Recordings.camera << valid_camera_names, - Recordings.start_time.between(start_time, end_time) - | Recordings.end_time.between(start_time, end_time) - | ((start_time > Recordings.start_time) & (end_time < Recordings.end_time)), - ) - .iterator() - ) - cameras_with_recordings = {recording.camera for recording in recordings} + # For each camera, fetch recordings that cover the union of ranges + for camera_name, indexed_ranges in item_ranges_by_camera.items(): + min_start = min(r[1] for r in indexed_ranges) + max_end = max(r[2] for r in indexed_ranges) - for camera_name in valid_camera_names: - if camera_name not in cameras_with_recordings: - errors[camera_name] = "No recordings found for time range" + recording_ranges = list( + Recordings.select(Recordings.start_time, Recordings.end_time) + .where( + Recordings.camera == camera_name, + Recordings.start_time.between(min_start, max_end) + | Recordings.end_time.between(min_start, max_end) + | ( + (min_start > Recordings.start_time) + & (max_end < Recordings.end_time) + ), + ) + .iterator() + ) + + for index, start_time, end_time in indexed_ranges: + has_recording = any( + ( + start_time <= rec.start_time <= end_time + or start_time <= rec.end_time <= end_time + or (start_time > rec.start_time and end_time < rec.end_time) + ) + for rec in recording_ranges + ) + if not has_recording: + errors[index] = "No recordings found for time range" return errors @@ -473,39 +495,90 @@ async def get_export_job_status(export_id: str, request: Request): @router.post( "/exports/batch", response_model=BatchExportResponse, - dependencies=[Depends(require_role(["admin"]))], - summary="Start multi-camera recording export", + dependencies=[Depends(allow_any_authenticated())], + summary="Start recording export batch", description=( - "Starts recording exports for multiple cameras for the same time range and " - "assigns them to a single export case." + "Starts recording exports for a batch of items, each with its own camera " + "and time range, and assigns them to a single export case. Attaching to " + "an existing case is temporarily admin-only until case-level ACLs exist." ), ) -def export_recordings_batch(request: Request, body: BatchExportBody): +def export_recordings_batch( + request: Request, + body: BatchExportBody, + allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter), + current_user: dict = Depends(get_current_user), +): + if isinstance(current_user, JSONResponse): + return current_user + + # Stopgap: attaching to an existing case remains admin-only until + # case-level ACLs exist. Non-admins can still create a fresh case + # as a side effect of queueing items they already have camera access to. + if body.export_case_id is not None and current_user["role"] != "admin": + return JSONResponse( + content={ + "success": False, + "message": "Only admins can attach exports to an existing case.", + }, + status_code=403, + ) + case_validation_error = _validate_export_case(body.export_case_id) if case_validation_error is not None: return case_validation_error - export_ids: list[str] = [] - results: list[dict[str, Optional[str] | bool]] = [] - camera_errors = _get_batch_recording_export_errors( - request, - body.camera_ids, - body.start_time, - body.end_time, - ) + # Fail-closed camera access: any item referencing an inaccessible + # camera rejects the whole request. The UI's review list is already + # filtered by camera access, so reaching this branch implies a stale + # session or a crafted request — reject loudly rather than silently + # dropping items. + allowed_camera_set = set(allowed_cameras) + for item in body.items: + if item.camera not in allowed_camera_set: + return JSONResponse( + content={ + "success": False, + "message": f"Cannot export from {item.camera}: access denied", + }, + status_code=403, + ) - cameras_to_queue = [ - camera_name - for camera_name in dict.fromkeys(body.camera_ids) - if camera_errors.get(camera_name) is None + # Sanitize each item's image_path up front. A bad path in any item + # kills the whole request, consistent with single-export behavior. + sanitized_images: list[Optional[str]] = [] + for item in body.items: + existing_image, image_validation_error = _sanitize_existing_image( + item.image_path + ) + if image_validation_error is not None: + return image_validation_error + sanitized_images.append(existing_image) + + item_errors = _get_item_recording_export_errors(request, body.items) + + queueable_indexes = [ + index for index in range(len(body.items)) if index not in item_errors ] + if not queueable_indexes: + return JSONResponse( + content={ + "success": False, + "message": ( + "No exports could be queued: no recordings found for the " + "requested ranges." + ), + }, + status_code=400, + ) + # Preflight admission: reject the whole batch if we can't fit every - # queueable camera. Prevents creating a case we'd just roll back and - # avoids partial batches where the tail all fails with "queue full". - if cameras_to_queue and available_export_queue_slots( - request.app.frigate_config - ) < len(cameras_to_queue): + # queueable item. Prevents partial batches where the tail fails with + # "queue full" after we've already created a case. + if available_export_queue_slots(request.app.frigate_config) < len( + queueable_indexes + ): return JSONResponse( content={ "success": False, @@ -516,33 +589,36 @@ def export_recordings_batch(request: Request, body: BatchExportBody): export_case = None export_case_id = body.export_case_id - if export_case_id is None and cameras_to_queue: + if export_case_id is None: export_case = _create_export_case_record( - body.new_case_name or body.name or "New Case", + body.new_case_name or "New Case", body.new_case_description, ) export_case_id = export_case.id - for camera_name in dict.fromkeys(body.camera_ids): - camera_error = camera_errors.get(camera_name) - if camera_error is not None: + export_ids: list[str] = [] + results: list[dict[str, Optional[str] | bool | int]] = [] + for index, item in enumerate(body.items): + if index in item_errors: results.append( { - "camera": camera_name, + "camera": item.camera, "export_id": None, "success": False, "status": None, - "error": camera_error, + "error": item_errors[index], + "item_index": index, + "client_item_id": item.client_item_id, } ) continue export_job = _build_export_job( - camera_name, - body.start_time, - body.end_time, - f"{body.name} - {camera_name}" if body.name else None, - None, + item.camera, + item.start_time, + item.end_time, + item.friendly_name, + sanitized_images[index], PlaybackSourceEnum.recordings, export_case_id, ) @@ -552,11 +628,13 @@ def export_recordings_batch(request: Request, body: BatchExportBody): logger.exception("Failed to queue export job %s", export_job.id) results.append( { - "camera": camera_name, + "camera": item.camera, "export_id": None, "success": False, "status": None, "error": str(err), + "item_index": index, + "client_item_id": item.client_item_id, } ) continue @@ -564,11 +642,13 @@ def export_recordings_batch(request: Request, body: BatchExportBody): export_ids.append(export_job.id) results.append( { - "camera": camera_name, + "camera": item.camera, "export_id": export_job.id, "success": True, "status": "queued", "error": None, + "item_index": index, + "client_item_id": item.client_item_id, } ) @@ -602,7 +682,11 @@ def export_recording( start_time: float, end_time: float, body: ExportRecordingsBody, + current_user: dict = Depends(get_current_user), ): + if isinstance(current_user, JSONResponse): + return current_user + camera_validation_error = _validate_camera_name(request, camera_name) if camera_validation_error is not None: return camera_validation_error @@ -614,6 +698,19 @@ def export_recording( return image_validation_error export_case_id = body.export_case_id + + # Attaching to an existing case requires admin. Single-export for + # cameras the user can access is otherwise non-admin; we only gate + # the case-attachment side effect. + if export_case_id is not None and current_user["role"] != "admin": + return JSONResponse( + content={ + "success": False, + "message": "Only admins can attach exports to an existing case.", + }, + status_code=403, + ) + case_validation_error = _validate_export_case(export_case_id) if case_validation_error is not None: return case_validation_error