diff --git a/frigate/api/defs/response/export_response.py b/frigate/api/defs/response/export_response.py index 963b646a5..8de28c182 100644 --- a/frigate/api/defs/response/export_response.py +++ b/frigate/api/defs/response/export_response.py @@ -1,4 +1,4 @@ -from typing import List, Optional +from typing import Any, List, Optional from pydantic import BaseModel, Field @@ -28,6 +28,10 @@ class StartExportResponse(BaseModel): export_id: Optional[str] = Field( default=None, description="The export ID if successfully started" ) + status: Optional[str] = Field( + default=None, + description="Queue status for the export job", + ) class BatchExportResultModel(BaseModel): @@ -39,6 +43,10 @@ class BatchExportResultModel(BaseModel): description="The export ID when the export was successfully queued", ) success: bool = Field(description="Whether the export was successfully queued") + status: Optional[str] = Field( + default=None, + description="Queue status for this camera export", + ) error: Optional[str] = Field( default=None, description="Validation or queueing error for this camera, if any", @@ -48,11 +56,52 @@ class BatchExportResultModel(BaseModel): class BatchExportResponse(BaseModel): """Response model for starting a multi-camera export batch.""" - export_case_id: str = Field(description="Export case ID associated with the batch") + export_case_id: Optional[str] = Field( + default=None, + description="Export case ID associated with the batch", + ) export_ids: List[str] = Field(description="Export IDs successfully queued") results: List[BatchExportResultModel] = Field( description="Per-camera batch export results" ) +class ExportJobModel(BaseModel): + """Model representing a queued or running export job.""" + + id: str = Field(description="Unique identifier for the export job") + job_type: str = Field(description="Job type") + status: str = Field(description="Current job status") + camera: str = Field(description="Camera associated with this export job") + name: 
Optional[str] = Field( + default=None, + description="Friendly name for the export", + ) + export_case_id: Optional[str] = Field( + default=None, + description="ID of the export case this export belongs to", + ) + request_start_time: float = Field(description="Requested export start time") + request_end_time: float = Field(description="Requested export end time") + start_time: Optional[float] = Field( + default=None, + description="Unix timestamp when execution started", + ) + end_time: Optional[float] = Field( + default=None, + description="Unix timestamp when execution completed", + ) + error_message: Optional[str] = Field( + default=None, + description="Error message for failed jobs", + ) + results: Optional[dict[str, Any]] = Field( + default=None, + description="Result metadata for completed jobs", + ) + + +ExportJobsResponse = List[ExportJobModel] + + ExportsResponse = List[ExportModel] diff --git a/frigate/api/export.py b/frigate/api/export.py index cafe35693..58b3a018f 100644 --- a/frigate/api/export.py +++ b/frigate/api/export.py @@ -4,7 +4,6 @@ import datetime import logging import random import string -import threading import time from pathlib import Path from typing import List, Optional @@ -39,6 +38,8 @@ from frigate.api.defs.response.export_case_response import ( ) from frigate.api.defs.response.export_response import ( BatchExportResponse, + ExportJobModel, + ExportJobsResponse, ExportModel, ExportsResponse, StartExportResponse, @@ -46,11 +47,18 @@ from frigate.api.defs.response.export_response import ( from frigate.api.defs.response.generic_response import GenericResponse from frigate.api.defs.tags import Tags from frigate.const import CLIPS_DIR, EXPORT_DIR +from frigate.jobs.export import ( + ExportJob, + ExportQueueFullError, + available_export_queue_slots, + get_export_job, + list_active_export_jobs, + start_export_job, +) from frigate.models import Export, ExportCase, Previews, Recordings from frigate.record.export import ( 
DEFAULT_TIME_LAPSE_FFMPEG_ARGS, PlaybackSourceEnum, - RecordingExporter, validate_ffmpeg_args, ) from frigate.util.time import is_current_hour @@ -59,18 +67,6 @@ logger = logging.getLogger(__name__) router = APIRouter(tags=[Tags.export]) -EXPORT_START_SEMAPHORE = threading.Semaphore(3) - - -class ManagedRecordingExporter(RecordingExporter): - """Recording exporter that releases the shared concurrency slot on exit.""" - - def run(self) -> None: - try: - super().run() - finally: - EXPORT_START_SEMAPHORE.release() - def _generate_id(length: int = 12) -> str: return "".join(random.choices(string.ascii_lowercase + string.digits, k=length)) @@ -219,8 +215,7 @@ def _get_batch_recording_export_errors( return errors -def _build_exporter( - request: Request, +def _build_export_job( camera_name: str, start_time: float, end_time: float, @@ -231,32 +226,22 @@ def _build_exporter( ffmpeg_input_args: Optional[str] = None, ffmpeg_output_args: Optional[str] = None, cpu_fallback: bool = False, -) -> ManagedRecordingExporter: - return ManagedRecordingExporter( - request.app.frigate_config, - _generate_export_id(camera_name), - camera_name, - friendly_name, - existing_image, - int(start_time), - int(end_time), - playback_source, - export_case_id, - ffmpeg_input_args, - ffmpeg_output_args, - cpu_fallback, +) -> ExportJob: + return ExportJob( + id=_generate_export_id(camera_name), + camera=camera_name, + name=friendly_name, + image_path=existing_image, + export_case_id=export_case_id, + request_start_time=int(start_time), + request_end_time=int(end_time), + playback_source=playback_source.value, + ffmpeg_input_args=ffmpeg_input_args, + ffmpeg_output_args=ffmpeg_output_args, + cpu_fallback=cpu_fallback, ) -def _start_exporter(exporter: ManagedRecordingExporter) -> None: - EXPORT_START_SEMAPHORE.acquire() - try: - exporter.start() - except Exception: - EXPORT_START_SEMAPHORE.release() - raise - - def _export_case_to_dict(case: ExportCase) -> dict[str, object]: case_dict = 
model_to_dict(case) @@ -448,6 +433,43 @@ async def assign_export_case( ) +@router.get( + "/jobs/export", + response_model=ExportJobsResponse, + dependencies=[Depends(allow_any_authenticated())], + summary="Get active export jobs", + description="Gets queued and running export jobs.", +) +def get_active_export_jobs( + request: Request, + allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter), +): + jobs = list_active_export_jobs(request.app.frigate_config) + return JSONResponse( + content=[job.to_dict() for job in jobs if job.camera in allowed_cameras] + ) + + +@router.get( + "/jobs/export/{export_id}", + response_model=ExportJobModel, + dependencies=[Depends(allow_any_authenticated())], + summary="Get export job status", + description="Gets queued, running, or completed status for a specific export job.", +) +async def get_export_job_status(export_id: str, request: Request): + job = get_export_job(request.app.frigate_config, export_id) + if job is None: + return JSONResponse( + content={"success": False, "message": "Job not found"}, + status_code=404, + ) + + await require_camera_access(job.camera, request=request) + + return JSONResponse(content=job.to_dict()) + + @router.post( "/exports/batch", response_model=BatchExportResponse, @@ -463,14 +485,6 @@ def export_recordings_batch(request: Request, body: BatchExportBody): if case_validation_error is not None: return case_validation_error - export_case_id = body.export_case_id - if export_case_id is None: - export_case = _create_export_case_record( - body.new_case_name or body.name or "New Case", - body.new_case_description, - ) - export_case_id = export_case.id - export_ids: list[str] = [] results: list[dict[str, Optional[str] | bool]] = [] camera_errors = _get_batch_recording_export_errors( @@ -480,6 +494,35 @@ def export_recordings_batch(request: Request, body: BatchExportBody): body.end_time, ) + cameras_to_queue = [ + camera_name + for camera_name in dict.fromkeys(body.camera_ids) + if 
camera_errors.get(camera_name) is None + ] + + # Preflight admission: reject the whole batch if we can't fit every + # queueable camera. Prevents creating a case we'd just roll back and + # avoids partial batches where the tail all fails with "queue full". + if cameras_to_queue and available_export_queue_slots( + request.app.frigate_config + ) < len(cameras_to_queue): + return JSONResponse( + content={ + "success": False, + "message": "Export queue is full. Try again once current exports finish.", + }, + status_code=503, + ) + + export_case = None + export_case_id = body.export_case_id + if export_case_id is None and cameras_to_queue: + export_case = _create_export_case_record( + body.new_case_name or body.name or "New Case", + body.new_case_description, + ) + export_case_id = export_case.id + for camera_name in dict.fromkeys(body.camera_ids): camera_error = camera_errors.get(camera_name) if camera_error is not None: @@ -488,13 +531,13 @@ def export_recordings_batch(request: Request, body: BatchExportBody): "camera": camera_name, "export_id": None, "success": False, + "status": None, "error": camera_error, } ) continue - exporter = _build_exporter( - request, + export_job = _build_export_job( camera_name, body.start_time, body.end_time, @@ -503,24 +546,43 @@ def export_recordings_batch(request: Request, body: BatchExportBody): PlaybackSourceEnum.recordings, export_case_id, ) - _start_exporter(exporter) + try: + start_export_job(request.app.frigate_config, export_job) + except Exception as err: + logger.exception("Failed to queue export job %s", export_job.id) + results.append( + { + "camera": camera_name, + "export_id": None, + "success": False, + "status": None, + "error": str(err), + } + ) + continue - export_ids.append(exporter.export_id) + export_ids.append(export_job.id) results.append( { "camera": camera_name, - "export_id": exporter.export_id, + "export_id": export_job.id, "success": True, + "status": "queued", "error": None, } ) + if export_case is not None 
and not export_ids: + export_case.delete_instance() + export_case_id = None + return JSONResponse( content={ "export_case_id": export_case_id, "export_ids": export_ids, "results": results, - } + }, + status_code=202, ) @@ -568,8 +630,7 @@ def export_recording( status_code=400, ) - exporter = _build_exporter( - request, + export_job = _build_export_job( camera_name, start_time, end_time, @@ -578,17 +639,28 @@ def export_recording( playback_source, export_case_id, ) - _start_exporter(exporter) + try: + start_export_job(request.app.frigate_config, export_job) + except ExportQueueFullError: + logger.warning("Export queue is full; rejecting %s", export_job.id) + return JSONResponse( + content={ + "success": False, + "message": "Export queue is full. Try again once current exports finish.", + }, + status_code=503, + ) return JSONResponse( content=( { "success": True, - "message": "Starting export of recording.", - "export_id": exporter.export_id, + "message": "Export queued.", + "export_id": export_job.id, + "status": "queued", } ), - status_code=200, + status_code=202, ) @@ -765,8 +837,7 @@ def export_recording_custom( if ffmpeg_output_args is None: ffmpeg_output_args = DEFAULT_TIME_LAPSE_FFMPEG_ARGS - exporter = _build_exporter( - request, + export_job = _build_export_job( camera_name, start_time, end_time, @@ -778,17 +849,28 @@ def export_recording_custom( ffmpeg_output_args, cpu_fallback, ) - _start_exporter(exporter) + try: + start_export_job(request.app.frigate_config, export_job) + except ExportQueueFullError: + logger.warning("Export queue is full; rejecting %s", export_job.id) + return JSONResponse( + content={ + "success": False, + "message": "Export queue is full. 
Try again once current exports finish.", + }, + status_code=503, + ) return JSONResponse( content=( { "success": True, - "message": "Starting export of recording.", - "export_id": exporter.export_id, + "message": "Export queued.", + "export_id": export_job.id, + "status": "queued", } ), - status_code=200, + status_code=202, ) diff --git a/frigate/config/camera/record.py b/frigate/config/camera/record.py index 7eae7500d..1f7afc6ce 100644 --- a/frigate/config/camera/record.py +++ b/frigate/config/camera/record.py @@ -92,6 +92,12 @@ class RecordExportConfig(FrigateBaseModel): title="Export hwaccel args", description="Hardware acceleration args to use for export/transcode operations.", ) + max_concurrent: int = Field( + default=3, + ge=1, + title="Maximum concurrent exports", + description="Maximum number of export jobs to process at the same time.", + ) class RecordConfig(FrigateBaseModel): diff --git a/frigate/jobs/export.py b/frigate/jobs/export.py new file mode 100644 index 000000000..950f46dd2 --- /dev/null +++ b/frigate/jobs/export.py @@ -0,0 +1,261 @@ +"""Export job management with queued background execution.""" + +import logging +import threading +import time +from dataclasses import dataclass +from queue import Full, Queue +from typing import Any, Optional + +from peewee import DoesNotExist + +from frigate.config import FrigateConfig +from frigate.jobs.job import Job +from frigate.models import Export +from frigate.record.export import PlaybackSourceEnum, RecordingExporter +from frigate.types import JobStatusTypesEnum + +logger = logging.getLogger(__name__) + +# Maximum number of jobs that can sit in the queue waiting to run. +# Prevents a runaway client from unbounded memory growth. 
MAX_QUEUED_EXPORT_JOBS = 100

# Maximum number of finished (succeeded or failed) jobs kept in memory so that
# their status can still be polled. Without a cap the job registry would grow
# for the lifetime of the process; the oldest finished jobs are evicted first.
MAX_FINISHED_EXPORT_JOBS = 100

# Statuses that mark a job as still waiting in the queue or executing.
_ACTIVE_JOB_STATUSES = (JobStatusTypesEnum.queued, JobStatusTypesEnum.running)


class ExportQueueFullError(RuntimeError):
    """Raised when the export queue is at capacity."""


@dataclass
class ExportJob(Job):
    """Job state for export operations.

    Carries both the public job fields (camera, name, requested time range)
    and the internal execution parameters handed to ``RecordingExporter``.
    """

    job_type: str = "export"
    camera: str = ""
    name: Optional[str] = None
    image_path: Optional[str] = None
    export_case_id: Optional[str] = None
    request_start_time: float = 0.0
    request_end_time: float = 0.0
    playback_source: str = PlaybackSourceEnum.recordings.value
    ffmpeg_input_args: Optional[str] = None
    ffmpeg_output_args: Optional[str] = None
    cpu_fallback: bool = False

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for API responses.

        Only exposes fields that are part of the public ExportJobModel schema.
        Internal execution details (image_path, ffmpeg args, cpu_fallback) are
        intentionally omitted so they don't leak through the API.
        """
        return {
            "id": self.id,
            "job_type": self.job_type,
            "status": self.status,
            "camera": self.camera,
            "name": self.name,
            "export_case_id": self.export_case_id,
            "request_start_time": self.request_start_time,
            "request_end_time": self.request_end_time,
            "start_time": self.start_time,
            "end_time": self.end_time,
            "error_message": self.error_message,
            "results": self.results,
        }


class ExportQueueWorker(threading.Thread):
    """Worker that executes queued exports."""

    def __init__(self, manager: "ExportJobManager", worker_index: int) -> None:
        super().__init__(
            daemon=True,
            name=f"export_queue_worker_{worker_index}",
        )
        self.manager = manager

    def run(self) -> None:
        """Pull jobs off the manager's queue forever and execute them."""
        while True:
            job = self.manager.queue.get()

            try:
                self.manager.run_job(job)
            except Exception:
                # Keep the worker alive; a single bad job must not stop the
                # thread from serving the rest of the queue.
                logger.exception(
                    "Export queue worker failed while processing %s", job.id
                )
            finally:
                self.manager.queue.task_done()


class ExportJobManager:
    """Concurrency-limited manager for queued export jobs."""

    def __init__(
        self,
        config: FrigateConfig,
        max_concurrent: int,
        max_queued: int = MAX_QUEUED_EXPORT_JOBS,
        max_finished: int = MAX_FINISHED_EXPORT_JOBS,
    ) -> None:
        """Create the manager without starting any worker threads.

        Args:
            config: Frigate configuration passed to each exporter.
            max_concurrent: Number of worker threads (clamped to at least 1).
            max_queued: Capacity of the pending-job queue (clamped to at
                least 1).
            max_finished: How many finished jobs to keep for status lookups
                (clamped to at least 0).
        """
        self.config = config
        self.max_concurrent = max(1, max_concurrent)
        self.max_finished = max(0, max_finished)
        self.queue: Queue[ExportJob] = Queue(maxsize=max(1, max_queued))
        self.jobs: dict[str, ExportJob] = {}
        self.lock = threading.Lock()
        self.workers: list[ExportQueueWorker] = []
        self.started = False

    def ensure_started(self) -> None:
        """Ensure worker threads are started exactly once.

        On later calls, any worker that is no longer alive is replaced.
        """
        with self.lock:
            if self.started:
                self._restart_dead_workers_locked()
                return

            for index in range(self.max_concurrent):
                worker = ExportQueueWorker(self, index)
                worker.start()
                self.workers.append(worker)

            self.started = True

    def _restart_dead_workers_locked(self) -> None:
        """Replace dead workers in place. Caller must hold ``self.lock``."""
        for index, worker in enumerate(self.workers):
            if worker.is_alive():
                continue

            logger.error(
                "Export queue worker %s died unexpectedly, restarting", worker.name
            )
            replacement = ExportQueueWorker(self, index)
            replacement.start()
            self.workers[index] = replacement

    def _prune_finished_locked(self) -> None:
        """Evict the oldest finished jobs beyond ``max_finished``.

        Caller must hold ``self.lock``. Queued and running jobs are never
        evicted. ``self.jobs`` preserves insertion order, so the first
        finished entries are the ones that were enqueued earliest.
        """
        finished_ids = [
            job_id
            for job_id, job in self.jobs.items()
            if job.status not in _ACTIVE_JOB_STATUSES
        ]
        excess = len(finished_ids) - self.max_finished
        if excess <= 0:
            return

        for job_id in finished_ids[:excess]:
            del self.jobs[job_id]

    def enqueue(self, job: ExportJob) -> str:
        """Queue a job for background execution.

        Returns the job ID.

        Raises ExportQueueFullError if the queue is at capacity.
        """
        # ensure_started() takes self.lock itself, so call it before locking.
        self.ensure_started()

        # Queue insertion and registration happen under one lock so that
        # get_job()/list_active_jobs() can never observe a job that has been
        # accepted into the queue but is not yet in the registry.
        # put_nowait() does not block, so holding the lock here is safe.
        with self.lock:
            try:
                self.queue.put_nowait(job)
            except Full as err:
                raise ExportQueueFullError(
                    "Export queue is full; try again once current exports finish"
                ) from err

            self._prune_finished_locked()
            self.jobs[job.id] = job

        return job.id

    def get_job(self, job_id: str) -> Optional[ExportJob]:
        """Get a job by ID, or None if it is unknown or has been evicted."""
        with self.lock:
            return self.jobs.get(job_id)

    def list_active_jobs(self) -> list[ExportJob]:
        """List queued and running jobs."""
        with self.lock:
            return [
                job for job in self.jobs.values() if job.status in _ACTIVE_JOB_STATUSES
            ]

    def available_slots(self) -> int:
        """Approximate number of additional jobs that could be queued right now.

        Uses Queue.qsize() which is best-effort; callers should treat the
        result as advisory since another thread could enqueue between
        checking and enqueueing.
        """
        return max(0, self.queue.maxsize - self.queue.qsize())

    def run_job(self, job: ExportJob) -> None:
        """Execute a queued export job and record its outcome on ``job``.

        The job always ends in a terminal status (success or failed) with
        ``end_time`` set, including when building the exporter itself fails.
        """
        job.status = JobStatusTypesEnum.running
        job.start_time = time.time()

        try:
            # Built inside the try block: an invalid playback_source or a
            # constructor error must mark the job failed instead of leaving
            # it stuck in "running" forever.
            exporter = RecordingExporter(
                self.config,
                job.id,
                job.camera,
                job.name,
                job.image_path,
                int(job.request_start_time),
                int(job.request_end_time),
                PlaybackSourceEnum(job.playback_source),
                job.export_case_id,
                job.ffmpeg_input_args,
                job.ffmpeg_output_args,
                job.cpu_fallback,
            )
            # run() is invoked directly so the export executes on the calling
            # worker thread.
            exporter.run()

            # The outcome is derived from the Export row with this job's ID.
            export = Export.get_or_none(Export.id == job.id)
            if export is None:
                job.status = JobStatusTypesEnum.failed
                job.error_message = "Export failed"
            elif export.in_progress:
                job.status = JobStatusTypesEnum.failed
                job.error_message = "Export did not complete"
            else:
                job.status = JobStatusTypesEnum.success
                job.results = {
                    "export_id": export.id,
                    "export_case_id": export.export_case_id,
                    "video_path": export.video_path,
                    "thumb_path": export.thumb_path,
                }
        except DoesNotExist:
            job.status = JobStatusTypesEnum.failed
            job.error_message = "Export not found"
        except Exception as err:
            logger.exception("Export job %s failed: %s", job.id, err)
            job.status = JobStatusTypesEnum.failed
            job.error_message = str(err)
        finally:
            job.end_time = time.time()


_job_manager: Optional[ExportJobManager] = None
_job_manager_lock = threading.Lock()


def _get_max_concurrent(config: FrigateConfig) -> int:
    """Read the configured number of concurrent export workers."""
    return int(config.record.export.max_concurrent)


def get_export_job_manager(config: FrigateConfig) -> ExportJobManager:
    """Get or create the singleton export job manager.

    The manager is built from ``config`` on first use only; later calls
    return the existing instance regardless of the config passed in.
    """
    global _job_manager

    with _job_manager_lock:
        if _job_manager is None:
            _job_manager = ExportJobManager(config, _get_max_concurrent(config))
        _job_manager.ensure_started()
        return _job_manager


def start_export_job(config: FrigateConfig, job: ExportJob) -> str:
    """Queue an export job and return its ID.

    Raises ExportQueueFullError if the queue is at capacity.
    """
    return get_export_job_manager(config).enqueue(job)


def get_export_job(config: FrigateConfig, job_id: str) -> Optional[ExportJob]:
    """Get a queued or completed export job by ID."""
    return get_export_job_manager(config).get_job(job_id)


def list_active_export_jobs(config: FrigateConfig) -> list[ExportJob]:
    """List queued and running export jobs."""
    return get_export_job_manager(config).list_active_jobs()


def available_export_queue_slots(config: FrigateConfig) -> int:
    """Approximate number of additional export jobs that could be queued now."""
    return get_export_job_manager(config).available_slots()