diff --git a/frigate/api/app.py b/frigate/api/app.py index ef632155a..f5c874695 100644 --- a/frigate/api/app.py +++ b/frigate/api/app.py @@ -33,8 +33,14 @@ from frigate.config.camera.updater import ( CameraConfigUpdateTopic, ) from frigate.ffmpeg_presets import FFMPEG_HWACCEL_VAAPI, _gpu_selector +from frigate.jobs.media_sync import ( + get_current_media_sync_job, + get_media_sync_job_by_id, + start_media_sync_job, +) from frigate.models import Event, Timeline from frigate.stats.prometheus import get_metrics, update_metrics +from frigate.types import JobStatusTypesEnum from frigate.util.builtin import ( clean_camera_user_pass, flatten_config_data, @@ -42,7 +48,6 @@ from frigate.util.builtin import ( update_yaml_file_bulk, ) from frigate.util.config import find_config_file -from frigate.util.media import sync_all_media from frigate.util.services import ( get_nvidia_driver_info, process_logs, @@ -605,10 +610,11 @@ def restart(): @router.post("/media/sync", dependencies=[Depends(require_role(["admin"]))]) def sync_media(body: MediaSyncBody = Body(...)): - """Sync media files with database - remove orphaned files. + """Start async media sync job - remove orphaned files. Syncs specified media types: event snapshots, event thumbnails, review thumbnails, - previews, exports, and/or recordings. + previews, exports, and/or recordings. Job runs in background; use /media/sync/current + or /media/sync/status/{job_id} to check status. Args: body: MediaSyncBody with dry_run flag and media_types list. @@ -616,54 +622,67 @@ def sync_media(body: MediaSyncBody = Body(...)): 'review_thumbnails', 'previews', 'exports', 'recordings' Returns: - JSON response with sync results for each requested media type. + 202 Accepted with job_id, or 409 Conflict if job already running. """ - try: - results = sync_all_media( - dry_run=body.dry_run, media_types=body.media_types, force=body.force - ) + job_id = start_media_sync_job( + dry_run=body.dry_run, media_types=body.media_types, force=body.force + ) - # Check if any operations were aborted or had errors - has_errors = False - for result_name in [ - "event_snapshots", - "event_thumbnails", - "review_thumbnails", - "previews", - "exports", - "recordings", - ]: - result = getattr(results, result_name, None) - if result and (result.aborted or result.error): - has_errors = True - break - - content = { - "success": not has_errors, - "dry_run": body.dry_run, - "media_types": body.media_types, - "results": results.to_dict(), - } - - if has_errors: - content["message"] = ( - "Some sync operations were aborted or had errors; check logs for details." - ) - - return JSONResponse( - content=content, - status_code=200, - ) - except Exception as e: - logger.error(f"Error syncing media files: {e}") + if job_id is None: + # A job is already running + current = get_current_media_sync_job() return JSONResponse( content={ - "success": False, - "message": f"Error syncing media files: {str(e)}", + "error": "A media sync job is already running", + "current_job_id": current.id if current else None, }, - status_code=500, + status_code=409, ) + return JSONResponse( + content={ + "job": { + "job_type": "media_sync", + "status": JobStatusTypesEnum.queued, + "id": job_id, + } + }, + status_code=202, + ) + + +@router.get("/media/sync/current", dependencies=[Depends(require_role(["admin"]))]) +def get_media_sync_current(): + """Get the current running media sync job, if any.""" + job = get_current_media_sync_job() + + if job is None: + return JSONResponse(content={"job": None}, status_code=200) + + return JSONResponse( + content={"job": job.to_dict()}, + status_code=200, + ) + + +@router.get( + "/media/sync/status/{job_id}", dependencies=[Depends(require_role(["admin"]))] +) +def get_media_sync_status(job_id: str): + """Get the status of a specific media sync job.""" + job = get_media_sync_job_by_id(job_id) + + if job is None: + return JSONResponse( + content={"error": "Job not found"}, + status_code=404, + ) + + return JSONResponse( + content={"job": job.to_dict()}, + status_code=200, + ) + @router.get("/labels", dependencies=[Depends(allow_any_authenticated())]) def get_labels(camera: str = ""): diff --git a/frigate/jobs/media_sync.py b/frigate/jobs/media_sync.py new file mode 100644 index 000000000..3775110b4 --- /dev/null +++ b/frigate/jobs/media_sync.py @@ -0,0 +1,135 @@ +"""Media sync job management with background execution.""" + +import logging +import threading +from dataclasses import dataclass, field +from datetime import datetime +from typing import Optional + +from frigate.comms.inter_process import InterProcessRequestor +from frigate.const import UPDATE_JOB_STATE +from frigate.jobs.job import Job +from frigate.jobs.manager import ( + get_current_job, + get_job_by_id, + job_is_running, + set_current_job, +) +from frigate.types import JobStatusTypesEnum +from frigate.util.media import sync_all_media + +logger = logging.getLogger(__name__) + + +@dataclass +class MediaSyncJob(Job): + """In-memory job state for media sync operations.""" + + job_type: str = "media_sync" + dry_run: bool = False + media_types: list[str] = field(default_factory=lambda: ["all"]) + force: bool = False + + +class MediaSyncRunner(threading.Thread): + """Thread-based runner for media sync jobs.""" + + def __init__(self, job: MediaSyncJob) -> None: + super().__init__(daemon=True, name="media_sync") + self.job = job + self.requestor = InterProcessRequestor() + + def run(self) -> None: + """Execute the media sync job and broadcast status updates.""" + try: + # Update job status to running + self.job.status = JobStatusTypesEnum.running + self.job.start_time = datetime.now().timestamp() + self._broadcast_status() + + # Execute sync with provided parameters + logger.info( + f"Starting media sync job {self.job.id}: " + f"media_types={self.job.media_types}, " + f"dry_run={self.job.dry_run}, " + f"force={self.job.force}" + ) + + results = sync_all_media( + dry_run=self.job.dry_run, + media_types=self.job.media_types, + force=self.job.force, + ) + + # Store results and mark as complete + self.job.results = results.to_dict() + self.job.status = JobStatusTypesEnum.success + self.job.end_time = datetime.now().timestamp() + + logger.info(f"Media sync job {self.job.id} completed successfully") + self._broadcast_status() + + except Exception as e: + logger.error(f"Media sync job {self.job.id} failed: {e}", exc_info=True) + self.job.status = JobStatusTypesEnum.failed + self.job.error_message = str(e) + self.job.end_time = datetime.now().timestamp() + self._broadcast_status() + + finally: + if self.requestor: + self.requestor.stop() + + def _broadcast_status(self) -> None: + """Broadcast job status update via IPC to all WebSocket subscribers.""" + try: + self.requestor.send_data( + UPDATE_JOB_STATE, + self.job.to_dict(), + ) + except Exception as e: + logger.warning(f"Failed to broadcast media sync status: {e}") + + +def start_media_sync_job( + dry_run: bool = False, + media_types: Optional[list[str]] = None, + force: bool = False, +) -> Optional[str]: + """Start a new media sync job if none is currently running. + + Returns job ID on success, None if job already running. + """ + # Check if a job is already running + if job_is_running("media_sync"): + current = get_current_job("media_sync") + logger.warning( + f"Media sync job {current.id} is already running. Rejecting new request." + ) + return None + + # Create and start new job + job = MediaSyncJob( + dry_run=dry_run, + media_types=media_types or ["all"], + force=force, + ) + + logger.info(f"Creating new media sync job: {job.id}") + set_current_job(job) + + # Start the background runner + runner = MediaSyncRunner(job) + runner.start() + + return job.id + + +def get_current_media_sync_job() -> Optional[MediaSyncJob]: + """Get the current running/queued media sync job, if any.""" + return get_current_job("media_sync") + + +def get_media_sync_job_by_id(job_id: str) -> Optional[MediaSyncJob]: + """Get media sync job by ID. Currently only tracks the current job.""" + return get_job_by_id("media_sync", job_id)