implement media sync job and endpoints

This commit is contained in:
Josh Hawkins 2026-01-05 13:09:48 -06:00
parent c6e079e3e0
commit a3a5086dbc
2 changed files with 198 additions and 44 deletions

View File

@ -33,8 +33,14 @@ from frigate.config.camera.updater import (
CameraConfigUpdateTopic, CameraConfigUpdateTopic,
) )
from frigate.ffmpeg_presets import FFMPEG_HWACCEL_VAAPI, _gpu_selector from frigate.ffmpeg_presets import FFMPEG_HWACCEL_VAAPI, _gpu_selector
from frigate.jobs.media_sync import (
get_current_media_sync_job,
get_media_sync_job_by_id,
start_media_sync_job,
)
from frigate.models import Event, Timeline from frigate.models import Event, Timeline
from frigate.stats.prometheus import get_metrics, update_metrics from frigate.stats.prometheus import get_metrics, update_metrics
from frigate.types import JobStatusTypesEnum
from frigate.util.builtin import ( from frigate.util.builtin import (
clean_camera_user_pass, clean_camera_user_pass,
flatten_config_data, flatten_config_data,
@ -42,7 +48,6 @@ from frigate.util.builtin import (
update_yaml_file_bulk, update_yaml_file_bulk,
) )
from frigate.util.config import find_config_file from frigate.util.config import find_config_file
from frigate.util.media import sync_all_media
from frigate.util.services import ( from frigate.util.services import (
get_nvidia_driver_info, get_nvidia_driver_info,
process_logs, process_logs,
@ -605,10 +610,11 @@ def restart():
@router.post("/media/sync", dependencies=[Depends(require_role(["admin"]))]) @router.post("/media/sync", dependencies=[Depends(require_role(["admin"]))])
def sync_media(body: MediaSyncBody = Body(...)): def sync_media(body: MediaSyncBody = Body(...)):
"""Sync media files with database - remove orphaned files. """Start async media sync job - remove orphaned files.
Syncs specified media types: event snapshots, event thumbnails, review thumbnails, Syncs specified media types: event snapshots, event thumbnails, review thumbnails,
previews, exports, and/or recordings. previews, exports, and/or recordings. Job runs in background; use /media/sync/current
or /media/sync/status/{job_id} to check status.
Args: Args:
body: MediaSyncBody with dry_run flag and media_types list. body: MediaSyncBody with dry_run flag and media_types list.
@ -616,52 +622,65 @@ def sync_media(body: MediaSyncBody = Body(...)):
'review_thumbnails', 'previews', 'exports', 'recordings' 'review_thumbnails', 'previews', 'exports', 'recordings'
Returns: Returns:
JSON response with sync results for each requested media type. 202 Accepted with job_id, or 409 Conflict if job already running.
""" """
try: job_id = start_media_sync_job(
results = sync_all_media(
dry_run=body.dry_run, media_types=body.media_types, force=body.force dry_run=body.dry_run, media_types=body.media_types, force=body.force
) )
# Check if any operations were aborted or had errors if job_id is None:
has_errors = False # A job is already running
for result_name in [ current = get_current_media_sync_job()
"event_snapshots", return JSONResponse(
"event_thumbnails",
"review_thumbnails",
"previews",
"exports",
"recordings",
]:
result = getattr(results, result_name, None)
if result and (result.aborted or result.error):
has_errors = True
break
content={ content={
"success": not has_errors, "error": "A media sync job is already running",
"dry_run": body.dry_run, "current_job_id": current.id if current else None,
"media_types": body.media_types, },
"results": results.to_dict(), status_code=409,
}
if has_errors:
content["message"] = (
"Some sync operations were aborted or had errors; check logs for details."
) )
return JSONResponse( return JSONResponse(
content=content, content={
"job": {
"job_type": "media_sync",
"status": JobStatusTypesEnum.queued,
"id": job_id,
}
},
status_code=202,
)
@router.get("/media/sync/current", dependencies=[Depends(require_role(["admin"]))])
def get_media_sync_current():
"""Get the current running media sync job, if any."""
job = get_current_media_sync_job()
if job is None:
return JSONResponse(content={"job": None}, status_code=200)
return JSONResponse(
content={"job": job.to_dict()},
status_code=200, status_code=200,
) )
except Exception as e:
logger.error(f"Error syncing media files: {e}")
@router.get(
"/media/sync/status/{job_id}", dependencies=[Depends(require_role(["admin"]))]
)
def get_media_sync_status(job_id: str):
"""Get the status of a specific media sync job."""
job = get_media_sync_job_by_id(job_id)
if job is None:
return JSONResponse( return JSONResponse(
content={ content={"error": "Job not found"},
"success": False, status_code=404,
"message": f"Error syncing media files: {str(e)}", )
},
status_code=500, return JSONResponse(
content={"job": job.to_dict()},
status_code=200,
) )

135
frigate/jobs/media_sync.py Normal file
View File

@ -0,0 +1,135 @@
"""Media sync job management with background execution."""
import logging
import threading
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from frigate.comms.inter_process import InterProcessRequestor
from frigate.const import UPDATE_JOB_STATE
from frigate.jobs.job import Job
from frigate.jobs.manager import (
get_current_job,
get_job_by_id,
job_is_running,
set_current_job,
)
from frigate.types import JobStatusTypesEnum
from frigate.util.media import sync_all_media
logger = logging.getLogger(__name__)
@dataclass
class MediaSyncJob(Job):
"""In-memory job state for media sync operations."""
job_type: str = "media_sync"
dry_run: bool = False
media_types: list[str] = field(default_factory=lambda: ["all"])
force: bool = False
class MediaSyncRunner(threading.Thread):
"""Thread-based runner for media sync jobs."""
def __init__(self, job: MediaSyncJob) -> None:
super().__init__(daemon=True, name="media_sync")
self.job = job
self.requestor = InterProcessRequestor()
def run(self) -> None:
"""Execute the media sync job and broadcast status updates."""
try:
# Update job status to running
self.job.status = JobStatusTypesEnum.running
self.job.start_time = datetime.now().timestamp()
self._broadcast_status()
# Execute sync with provided parameters
logger.info(
f"Starting media sync job {self.job.id}: "
f"media_types={self.job.media_types}, "
f"dry_run={self.job.dry_run}, "
f"force={self.job.force}"
)
results = sync_all_media(
dry_run=self.job.dry_run,
media_types=self.job.media_types,
force=self.job.force,
)
# Store results and mark as complete
self.job.results = results.to_dict()
self.job.status = JobStatusTypesEnum.success
self.job.end_time = datetime.now().timestamp()
logger.info(f"Media sync job {self.job.id} completed successfully")
self._broadcast_status()
except Exception as e:
logger.error(f"Media sync job {self.job.id} failed: {e}", exc_info=True)
self.job.status = JobStatusTypesEnum.failed
self.job.error_message = str(e)
self.job.end_time = datetime.now().timestamp()
self._broadcast_status()
finally:
if self.requestor:
self.requestor.stop()
def _broadcast_status(self) -> None:
"""Broadcast job status update via IPC to all WebSocket subscribers."""
try:
self.requestor.send_data(
UPDATE_JOB_STATE,
self.job.to_dict(),
)
except Exception as e:
logger.warning(f"Failed to broadcast media sync status: {e}")
def start_media_sync_job(
dry_run: bool = False,
media_types: Optional[list[str]] = None,
force: bool = False,
) -> Optional[str]:
"""Start a new media sync job if none is currently running.
Returns job ID on success, None if job already running.
"""
# Check if a job is already running
if job_is_running("media_sync"):
current = get_current_job("media_sync")
logger.warning(
f"Media sync job {current.id} is already running. Rejecting new request."
)
return None
# Create and start new job
job = MediaSyncJob(
dry_run=dry_run,
media_types=media_types or ["all"],
force=force,
)
logger.info(f"Creating new media sync job: {job.id}")
set_current_job(job)
# Start the background runner
runner = MediaSyncRunner(job)
runner.start()
return job.id
def get_current_media_sync_job() -> Optional[MediaSyncJob]:
"""Get the current running/queued media sync job, if any."""
return get_current_job("media_sync")
def get_media_sync_job_by_id(job_id: str) -> Optional[MediaSyncJob]:
"""Get media sync job by ID. Currently only tracks the current job."""
return get_job_by_id("media_sync", job_id)