tweak backend to use Job infrastructure for exports

Josh Hawkins 2026-04-11 13:20:40 -05:00
parent 66812a10f2
commit 0d9d1d7652
4 changed files with 465 additions and 67 deletions


@ -1,4 +1,4 @@
from typing import List, Optional
from typing import Any, List, Optional
from pydantic import BaseModel, Field
@ -28,6 +28,10 @@ class StartExportResponse(BaseModel):
export_id: Optional[str] = Field(
default=None, description="The export ID if successfully started"
)
status: Optional[str] = Field(
default=None,
description="Queue status for the export job",
)
class BatchExportResultModel(BaseModel):
@ -39,6 +43,10 @@ class BatchExportResultModel(BaseModel):
description="The export ID when the export was successfully queued",
)
success: bool = Field(description="Whether the export was successfully queued")
status: Optional[str] = Field(
default=None,
description="Queue status for this camera export",
)
error: Optional[str] = Field(
default=None,
description="Validation or queueing error for this camera, if any",
@ -48,11 +56,52 @@ class BatchExportResultModel(BaseModel):
class BatchExportResponse(BaseModel):
"""Response model for starting a multi-camera export batch."""
export_case_id: str = Field(description="Export case ID associated with the batch")
export_case_id: Optional[str] = Field(
default=None,
description="Export case ID associated with the batch",
)
export_ids: List[str] = Field(description="Export IDs successfully queued")
results: List[BatchExportResultModel] = Field(
description="Per-camera batch export results"
)
class ExportJobModel(BaseModel):
"""Model representing a queued or running export job."""
id: str = Field(description="Unique identifier for the export job")
job_type: str = Field(description="Job type")
status: str = Field(description="Current job status")
camera: str = Field(description="Camera associated with this export job")
name: Optional[str] = Field(
default=None,
description="Friendly name for the export",
)
export_case_id: Optional[str] = Field(
default=None,
description="ID of the export case this export belongs to",
)
request_start_time: float = Field(description="Requested export start time")
request_end_time: float = Field(description="Requested export end time")
start_time: Optional[float] = Field(
default=None,
description="Unix timestamp when execution started",
)
end_time: Optional[float] = Field(
default=None,
description="Unix timestamp when execution completed",
)
error_message: Optional[str] = Field(
default=None,
description="Error message for failed jobs",
)
results: Optional[dict[str, Any]] = Field(
default=None,
description="Result metadata for completed jobs",
)
ExportJobsResponse = List[ExportJobModel]
ExportsResponse = List[ExportModel]
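Taken together, these fields mean a single entry from the new job endpoints serializes to something like the payload below. The values are hypothetical and shown only to illustrate the shape; in particular, the id format is a guess at what _generate_export_id produces.

# Hypothetical ExportJobModel payload for a job that is currently running.
example_export_job = {
    "id": "front_door_k3x9p2q7m1a4",   # made-up id
    "job_type": "export",
    "status": "running",
    "camera": "front_door",
    "name": "Overnight clip",
    "export_case_id": "a1b2c3d4e5f6",  # None when not attached to a case
    "request_start_time": 1744390800,
    "request_end_time": 1744394400,
    "start_time": 1744394402.1,        # set when execution starts
    "end_time": None,                  # set when execution finishes
    "error_message": None,             # populated only for failed jobs
    "results": None,                   # populated for successful jobs
}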


@ -4,7 +4,6 @@ import datetime
import logging
import random
import string
import threading
import time
from pathlib import Path
from typing import List, Optional
@ -39,6 +38,8 @@ from frigate.api.defs.response.export_case_response import (
)
from frigate.api.defs.response.export_response import (
BatchExportResponse,
ExportJobModel,
ExportJobsResponse,
ExportModel,
ExportsResponse,
StartExportResponse,
@ -46,11 +47,18 @@ from frigate.api.defs.response.export_response import (
from frigate.api.defs.response.generic_response import GenericResponse
from frigate.api.defs.tags import Tags
from frigate.const import CLIPS_DIR, EXPORT_DIR
from frigate.jobs.export import (
ExportJob,
ExportQueueFullError,
available_export_queue_slots,
get_export_job,
list_active_export_jobs,
start_export_job,
)
from frigate.models import Export, ExportCase, Previews, Recordings
from frigate.record.export import (
DEFAULT_TIME_LAPSE_FFMPEG_ARGS,
PlaybackSourceEnum,
RecordingExporter,
validate_ffmpeg_args,
)
from frigate.util.time import is_current_hour
@ -59,18 +67,6 @@ logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.export])
EXPORT_START_SEMAPHORE = threading.Semaphore(3)
class ManagedRecordingExporter(RecordingExporter):
"""Recording exporter that releases the shared concurrency slot on exit."""
def run(self) -> None:
try:
super().run()
finally:
EXPORT_START_SEMAPHORE.release()
def _generate_id(length: int = 12) -> str:
return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))
@ -219,8 +215,7 @@ def _get_batch_recording_export_errors(
return errors
def _build_exporter(
request: Request,
def _build_export_job(
camera_name: str,
start_time: float,
end_time: float,
@ -231,32 +226,22 @@ def _build_exporter(
ffmpeg_input_args: Optional[str] = None,
ffmpeg_output_args: Optional[str] = None,
cpu_fallback: bool = False,
) -> ManagedRecordingExporter:
return ManagedRecordingExporter(
request.app.frigate_config,
_generate_export_id(camera_name),
camera_name,
friendly_name,
existing_image,
int(start_time),
int(end_time),
playback_source,
export_case_id,
ffmpeg_input_args,
ffmpeg_output_args,
cpu_fallback,
) -> ExportJob:
return ExportJob(
id=_generate_export_id(camera_name),
camera=camera_name,
name=friendly_name,
image_path=existing_image,
export_case_id=export_case_id,
request_start_time=int(start_time),
request_end_time=int(end_time),
playback_source=playback_source.value,
ffmpeg_input_args=ffmpeg_input_args,
ffmpeg_output_args=ffmpeg_output_args,
cpu_fallback=cpu_fallback,
)
def _start_exporter(exporter: ManagedRecordingExporter) -> None:
EXPORT_START_SEMAPHORE.acquire()
try:
exporter.start()
except Exception:
EXPORT_START_SEMAPHORE.release()
raise
def _export_case_to_dict(case: ExportCase) -> dict[str, object]:
case_dict = model_to_dict(case)
@ -448,6 +433,43 @@ async def assign_export_case(
)
@router.get(
"/jobs/export",
response_model=ExportJobsResponse,
dependencies=[Depends(allow_any_authenticated())],
summary="Get active export jobs",
description="Gets queued and running export jobs.",
)
def get_active_export_jobs(
request: Request,
allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter),
):
jobs = list_active_export_jobs(request.app.frigate_config)
return JSONResponse(
content=[job.to_dict() for job in jobs if job.camera in allowed_cameras]
)
@router.get(
"/jobs/export/{export_id}",
response_model=ExportJobModel,
dependencies=[Depends(allow_any_authenticated())],
summary="Get export job status",
description="Gets queued, running, or completed status for a specific export job.",
)
async def get_export_job_status(export_id: str, request: Request):
job = get_export_job(request.app.frigate_config, export_id)
if job is None:
return JSONResponse(
content={"success": False, "message": "Job not found"},
status_code=404,
)
await require_camera_access(job.camera, request=request)
return JSONResponse(content=job.to_dict())
@router.post(
"/exports/batch",
response_model=BatchExportResponse,
@ -463,14 +485,6 @@ def export_recordings_batch(request: Request, body: BatchExportBody):
if case_validation_error is not None:
return case_validation_error
export_case_id = body.export_case_id
if export_case_id is None:
export_case = _create_export_case_record(
body.new_case_name or body.name or "New Case",
body.new_case_description,
)
export_case_id = export_case.id
export_ids: list[str] = []
results: list[dict[str, Optional[str] | bool]] = []
camera_errors = _get_batch_recording_export_errors(
@ -480,6 +494,35 @@ def export_recordings_batch(request: Request, body: BatchExportBody):
body.end_time,
)
cameras_to_queue = [
camera_name
for camera_name in dict.fromkeys(body.camera_ids)
if camera_errors.get(camera_name) is None
]
# Preflight admission: reject the whole batch if we can't fit every
# queueable camera. Prevents creating a case we'd just roll back and
# avoids partial batches where the tail all fails with "queue full".
if cameras_to_queue and available_export_queue_slots(
request.app.frigate_config
) < len(cameras_to_queue):
return JSONResponse(
content={
"success": False,
"message": "Export queue is full. Try again once current exports finish.",
},
status_code=503,
)
export_case = None
export_case_id = body.export_case_id
if export_case_id is None and cameras_to_queue:
export_case = _create_export_case_record(
body.new_case_name or body.name or "New Case",
body.new_case_description,
)
export_case_id = export_case.id
for camera_name in dict.fromkeys(body.camera_ids):
camera_error = camera_errors.get(camera_name)
if camera_error is not None:
@ -488,13 +531,13 @@ def export_recordings_batch(request: Request, body: BatchExportBody):
"camera": camera_name,
"export_id": None,
"success": False,
"status": None,
"error": camera_error,
}
)
continue
exporter = _build_exporter(
request,
export_job = _build_export_job(
camera_name,
body.start_time,
body.end_time,
@ -503,24 +546,43 @@ def export_recordings_batch(request: Request, body: BatchExportBody):
PlaybackSourceEnum.recordings,
export_case_id,
)
_start_exporter(exporter)
try:
start_export_job(request.app.frigate_config, export_job)
except Exception as err:
logger.exception("Failed to queue export job %s", export_job.id)
results.append(
{
"camera": camera_name,
"export_id": None,
"success": False,
"status": None,
"error": str(err),
}
)
continue
export_ids.append(exporter.export_id)
export_ids.append(export_job.id)
results.append(
{
"camera": camera_name,
"export_id": exporter.export_id,
"export_id": export_job.id,
"success": True,
"status": "queued",
"error": None,
}
)
if export_case is not None and not export_ids:
export_case.delete_instance()
export_case_id = None
return JSONResponse(
content={
"export_case_id": export_case_id,
"export_ids": export_ids,
"results": results,
}
},
status_code=202,
)
@ -568,8 +630,7 @@ def export_recording(
status_code=400,
)
exporter = _build_exporter(
request,
export_job = _build_export_job(
camera_name,
start_time,
end_time,
@ -578,17 +639,28 @@ def export_recording(
playback_source,
export_case_id,
)
_start_exporter(exporter)
try:
start_export_job(request.app.frigate_config, export_job)
except ExportQueueFullError:
logger.warning("Export queue is full; rejecting %s", export_job.id)
return JSONResponse(
content={
"success": False,
"message": "Export queue is full. Try again once current exports finish.",
},
status_code=503,
)
return JSONResponse(
content=(
{
"success": True,
"message": "Starting export of recording.",
"export_id": exporter.export_id,
"message": "Export queued.",
"export_id": export_job.id,
"status": "queued",
}
),
status_code=200,
status_code=202,
)
@ -765,8 +837,7 @@ def export_recording_custom(
if ffmpeg_output_args is None:
ffmpeg_output_args = DEFAULT_TIME_LAPSE_FFMPEG_ARGS
exporter = _build_exporter(
request,
export_job = _build_export_job(
camera_name,
start_time,
end_time,
@ -778,17 +849,28 @@ def export_recording_custom(
ffmpeg_output_args,
cpu_fallback,
)
_start_exporter(exporter)
try:
start_export_job(request.app.frigate_config, export_job)
except ExportQueueFullError:
logger.warning("Export queue is full; rejecting %s", export_job.id)
return JSONResponse(
content={
"success": False,
"message": "Export queue is full. Try again once current exports finish.",
},
status_code=503,
)
return JSONResponse(
content=(
{
"success": True,
"message": "Starting export of recording.",
"export_id": exporter.export_id,
"message": "Export queued.",
"export_id": export_job.id,
"status": "queued",
}
),
status_code=200,
status_code=202,
)
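End to end, the batch endpoint and the new job endpoints give clients a queue-then-poll flow. A rough client-side sketch, assuming the requests library, a placeholder base URL, and that authentication is already handled; the request body follows BatchExportBody as used above, and the job status strings are assumed to serialize as "queued"/"running"/"success"/"failed".

import time

import requests  # any HTTP client works; requests is assumed here

BASE_URL = "http://frigate.local:5000/api"  # placeholder address

# Queue a two-camera batch. A 503 means the preflight admission check
# rejected the whole batch because the export queue could not fit it.
resp = requests.post(
    f"{BASE_URL}/exports/batch",
    json={
        "camera_ids": ["front_door", "back_yard"],
        "start_time": 1744390800,
        "end_time": 1744394400,
        "name": "Morning incident",
    },
)
if resp.status_code == 503:
    raise SystemExit("export queue is full; retry once current exports finish")
resp.raise_for_status()
batch = resp.json()

# Poll each queued export until it leaves the queued/running states.
for export_id in batch["export_ids"]:
    while True:
        job = requests.get(f"{BASE_URL}/jobs/export/{export_id}").json()
        if job["status"] not in ("queued", "running"):
            print(export_id, job["status"], job.get("error_message"))
            break
        time.sleep(5)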


@ -92,6 +92,12 @@ class RecordExportConfig(FrigateBaseModel):
title="Export hwaccel args",
description="Hardware acceleration args to use for export/transcode operations.",
)
max_concurrent: int = Field(
default=3,
ge=1,
title="Maximum concurrent exports",
description="Maximum number of export jobs to process at the same time.",
)
class RecordConfig(FrigateBaseModel):

frigate/jobs/export.py (new file, 261 lines added)

@ -0,0 +1,261 @@
"""Export job management with queued background execution."""
import logging
import threading
import time
from dataclasses import dataclass
from queue import Full, Queue
from typing import Any, Optional
from peewee import DoesNotExist
from frigate.config import FrigateConfig
from frigate.jobs.job import Job
from frigate.models import Export
from frigate.record.export import PlaybackSourceEnum, RecordingExporter
from frigate.types import JobStatusTypesEnum
logger = logging.getLogger(__name__)
# Maximum number of jobs that can sit in the queue waiting to run.
# Prevents a runaway client from unbounded memory growth.
MAX_QUEUED_EXPORT_JOBS = 100
class ExportQueueFullError(RuntimeError):
"""Raised when the export queue is at capacity."""
@dataclass
class ExportJob(Job):
"""Job state for export operations."""
job_type: str = "export"
camera: str = ""
name: Optional[str] = None
image_path: Optional[str] = None
export_case_id: Optional[str] = None
request_start_time: float = 0.0
request_end_time: float = 0.0
playback_source: str = PlaybackSourceEnum.recordings.value
ffmpeg_input_args: Optional[str] = None
ffmpeg_output_args: Optional[str] = None
cpu_fallback: bool = False
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for API responses.
Only exposes fields that are part of the public ExportJobModel schema.
Internal execution details (image_path, ffmpeg args, cpu_fallback) are
intentionally omitted so they don't leak through the API.
"""
return {
"id": self.id,
"job_type": self.job_type,
"status": self.status,
"camera": self.camera,
"name": self.name,
"export_case_id": self.export_case_id,
"request_start_time": self.request_start_time,
"request_end_time": self.request_end_time,
"start_time": self.start_time,
"end_time": self.end_time,
"error_message": self.error_message,
"results": self.results,
}
class ExportQueueWorker(threading.Thread):
"""Worker that executes queued exports."""
def __init__(self, manager: "ExportJobManager", worker_index: int) -> None:
super().__init__(
daemon=True,
name=f"export_queue_worker_{worker_index}",
)
self.manager = manager
def run(self) -> None:
while True:
job = self.manager.queue.get()
try:
self.manager.run_job(job)
except Exception:
logger.exception(
"Export queue worker failed while processing %s", job.id
)
finally:
self.manager.queue.task_done()
class ExportJobManager:
"""Concurrency-limited manager for queued export jobs."""
def __init__(
self,
config: FrigateConfig,
max_concurrent: int,
max_queued: int = MAX_QUEUED_EXPORT_JOBS,
) -> None:
self.config = config
self.max_concurrent = max(1, max_concurrent)
self.queue: Queue[ExportJob] = Queue(maxsize=max(1, max_queued))
self.jobs: dict[str, ExportJob] = {}
self.lock = threading.Lock()
self.workers: list[ExportQueueWorker] = []
self.started = False
def ensure_started(self) -> None:
"""Ensure worker threads are started exactly once."""
with self.lock:
if self.started:
self._restart_dead_workers_locked()
return
for index in range(self.max_concurrent):
worker = ExportQueueWorker(self, index)
worker.start()
self.workers.append(worker)
self.started = True
def _restart_dead_workers_locked(self) -> None:
for index, worker in enumerate(self.workers):
if worker.is_alive():
continue
logger.error(
"Export queue worker %s died unexpectedly, restarting", worker.name
)
replacement = ExportQueueWorker(self, index)
replacement.start()
self.workers[index] = replacement
def enqueue(self, job: ExportJob) -> str:
"""Queue a job for background execution.
Raises ExportQueueFullError if the queue is at capacity.
"""
self.ensure_started()
try:
self.queue.put_nowait(job)
except Full as err:
raise ExportQueueFullError(
"Export queue is full; try again once current exports finish"
) from err
with self.lock:
self.jobs[job.id] = job
return job.id
def get_job(self, job_id: str) -> Optional[ExportJob]:
"""Get a job by ID."""
with self.lock:
return self.jobs.get(job_id)
def list_active_jobs(self) -> list[ExportJob]:
"""List queued and running jobs."""
with self.lock:
return [
job
for job in self.jobs.values()
if job.status in (JobStatusTypesEnum.queued, JobStatusTypesEnum.running)
]
def available_slots(self) -> int:
"""Approximate number of additional jobs that could be queued right now.
Uses Queue.qsize() which is best-effort; callers should treat the
result as advisory since another thread could enqueue between
checking and enqueueing.
"""
return max(0, self.queue.maxsize - self.queue.qsize())
def run_job(self, job: ExportJob) -> None:
"""Execute a queued export job."""
job.status = JobStatusTypesEnum.running
job.start_time = time.time()
exporter = RecordingExporter(
self.config,
job.id,
job.camera,
job.name,
job.image_path,
int(job.request_start_time),
int(job.request_end_time),
PlaybackSourceEnum(job.playback_source),
job.export_case_id,
job.ffmpeg_input_args,
job.ffmpeg_output_args,
job.cpu_fallback,
)
try:
exporter.run()
export = Export.get_or_none(Export.id == job.id)
if export is None:
job.status = JobStatusTypesEnum.failed
job.error_message = "Export failed"
elif export.in_progress:
job.status = JobStatusTypesEnum.failed
job.error_message = "Export did not complete"
else:
job.status = JobStatusTypesEnum.success
job.results = {
"export_id": export.id,
"export_case_id": export.export_case_id,
"video_path": export.video_path,
"thumb_path": export.thumb_path,
}
except DoesNotExist:
job.status = JobStatusTypesEnum.failed
job.error_message = "Export not found"
except Exception as err:
logger.exception("Export job %s failed: %s", job.id, err)
job.status = JobStatusTypesEnum.failed
job.error_message = str(err)
finally:
job.end_time = time.time()
_job_manager: Optional[ExportJobManager] = None
_job_manager_lock = threading.Lock()
def _get_max_concurrent(config: FrigateConfig) -> int:
return int(config.record.export.max_concurrent)
def get_export_job_manager(config: FrigateConfig) -> ExportJobManager:
"""Get or create the singleton export job manager."""
global _job_manager
with _job_manager_lock:
if _job_manager is None:
_job_manager = ExportJobManager(config, _get_max_concurrent(config))
_job_manager.ensure_started()
return _job_manager
def start_export_job(config: FrigateConfig, job: ExportJob) -> str:
"""Queue an export job and return its ID."""
return get_export_job_manager(config).enqueue(job)
def get_export_job(config: FrigateConfig, job_id: str) -> Optional[ExportJob]:
"""Get a queued or completed export job by ID."""
return get_export_job_manager(config).get_job(job_id)
def list_active_export_jobs(config: FrigateConfig) -> list[ExportJob]:
"""List queued and running export jobs."""
return get_export_job_manager(config).list_active_jobs()
def available_export_queue_slots(config: FrigateConfig) -> int:
"""Approximate number of additional export jobs that could be queued now."""
return get_export_job_manager(config).available_slots()
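Because the capacity check lives entirely in ExportJobManager.enqueue, the admission behavior can be exercised without worker threads or a real FrigateConfig. A minimal sketch, not part of the commit, that stubs out worker startup and passes config=None on the assumption that only enqueue, get_job, and available_slots are touched.

from frigate.jobs.export import ExportJob, ExportJobManager, ExportQueueFullError


class NoWorkerManager(ExportJobManager):
    """Manager variant that never starts workers, so queued jobs just sit."""

    def ensure_started(self) -> None:
        pass


# Tiny queue so the capacity limit is easy to hit.
manager = NoWorkerManager(config=None, max_concurrent=1, max_queued=2)

manager.enqueue(ExportJob(id="export_a", camera="front_door"))
manager.enqueue(ExportJob(id="export_b", camera="back_yard"))
print(manager.available_slots())  # 0, but advisory only (see the docstring)

try:
    manager.enqueue(ExportJob(id="export_c", camera="driveway"))
except ExportQueueFullError:
    print("third job rejected, matching the batch preflight in the API layer")

print(manager.get_job("export_a").camera)  # jobs stay queryable by id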