fix deletion of in-progress exports in a case

This commit is contained in:
Josh Hawkins 2026-04-11 22:08:35 -05:00
parent d366bb3818
commit 05913891e1
3 changed files with 171 additions and 4 deletions

View File

@ -55,6 +55,7 @@ from frigate.jobs.export import (
ExportJob, ExportJob,
ExportQueueFullError, ExportQueueFullError,
available_export_queue_slots, available_export_queue_slots,
cancel_queued_export_jobs_for_case,
get_export_job, get_export_job,
list_active_export_jobs, list_active_export_jobs,
start_export_job, start_export_job,
@ -394,7 +395,7 @@ def update_export_case(case_id: str, body: ExportCaseUpdateBody):
summary="Delete export case", summary="Delete export case",
description="""Deletes an export case.\n Exports that reference this case will have their export_case set to null.\n """, description="""Deletes an export case.\n Exports that reference this case will have their export_case set to null.\n """,
) )
def delete_export_case(case_id: str): def delete_export_case(case_id: str, request: Request, delete_exports: bool = False):
try: try:
case = ExportCase.get(ExportCase.id == case_id) case = ExportCase.get(ExportCase.id == case_id)
except DoesNotExist: except DoesNotExist:
@ -403,8 +404,18 @@ def delete_export_case(case_id: str):
status_code=404, status_code=404,
) )
if delete_exports:
cancel_queued_export_jobs_for_case(request.app.frigate_config, case_id)
exports = list(Export.select().where(Export.export_case == case_id))
for export in exports:
Path(export.video_path).unlink(missing_ok=True)
if export.thumb_path:
Path(export.thumb_path).unlink(missing_ok=True)
export.delete_instance()
else:
# Unassign exports from this case but keep the exports themselves # Unassign exports from this case but keep the exports themselves
Export.update(export_case=None).where(Export.export_case == case).execute() Export.update(export_case=None).where(Export.export_case == case_id).execute()
case.delete_instance() case.delete_instance()

View File

@ -167,6 +167,41 @@ class ExportJobManager:
if job.status in (JobStatusTypesEnum.queued, JobStatusTypesEnum.running) if job.status in (JobStatusTypesEnum.queued, JobStatusTypesEnum.running)
] ]
def cancel_queued_jobs_for_case(self, case_id: str) -> list[ExportJob]:
"""Cancel queued export jobs assigned to a deleted case."""
cancelled_jobs: list[ExportJob] = []
with self.lock:
with self.queue.mutex:
retained_jobs: list[ExportJob] = []
while self.queue.queue:
job = self.queue.queue.popleft()
if (
job.export_case_id == case_id
and job.status == JobStatusTypesEnum.queued
):
job.status = JobStatusTypesEnum.cancelled
job.end_time = time.time()
cancelled_jobs.append(job)
continue
retained_jobs.append(job)
self.queue.queue.extend(retained_jobs)
if cancelled_jobs:
self.queue.unfinished_tasks = max(
0,
self.queue.unfinished_tasks - len(cancelled_jobs),
)
if self.queue.unfinished_tasks == 0:
self.queue.all_tasks_done.notify_all()
self.queue.not_full.notify_all()
return cancelled_jobs
def available_slots(self) -> int: def available_slots(self) -> int:
"""Approximate number of additional jobs that could be queued right now. """Approximate number of additional jobs that could be queued right now.
@ -340,6 +375,13 @@ def list_active_export_jobs(config: FrigateConfig) -> list[ExportJob]:
return get_export_job_manager(config).list_active_jobs() return get_export_job_manager(config).list_active_jobs()
def cancel_queued_export_jobs_for_case(
config: FrigateConfig, case_id: str
) -> list[ExportJob]:
"""Cancel queued export jobs that still point at a deleted case."""
return get_export_job_manager(config).cancel_queued_jobs_for_case(case_id)
def available_export_queue_slots(config: FrigateConfig) -> int: def available_export_queue_slots(config: FrigateConfig) -> int:
"""Approximate number of additional export jobs that could be queued now.""" """Approximate number of additional export jobs that could be queued now."""
return get_export_job_manager(config).available_slots() return get_export_job_manager(config).available_slots()

View File

@ -2,7 +2,12 @@ import os
import tempfile import tempfile
from unittest.mock import patch from unittest.mock import patch
from frigate.jobs.export import ExportJob, reap_stale_exports from frigate.jobs.export import (
ExportJob,
get_export_job_manager,
reap_stale_exports,
start_export_job,
)
from frigate.models import Export, ExportCase, Previews, Recordings from frigate.models import Export, ExportCase, Previews, Recordings
from frigate.test.http_api.base_http_test import AuthTestClient, BaseTestHttp from frigate.test.http_api.base_http_test import AuthTestClient, BaseTestHttp
@ -91,6 +96,115 @@ class TestHttpExport(BaseTestHttp):
assert refreshed.description == "Updated" assert refreshed.description == "Updated"
assert refreshed.updated_at.timestamp() == 2222.0 assert refreshed.updated_at.timestamp() == 2222.0
def test_delete_export_case_delete_exports_cancels_queued_jobs(self):
case = ExportCase.create(
id="case_delete_me",
name="Delete me",
description="",
created_at=10,
updated_at=10,
)
other_case = ExportCase.create(
id="case_keep_me",
name="Keep me",
description="",
created_at=20,
updated_at=20,
)
with tempfile.TemporaryDirectory() as tmpdir:
video_path = os.path.join(tmpdir, "case_export.mp4")
thumb_path = os.path.join(tmpdir, "case_export.webp")
other_video_path = os.path.join(tmpdir, "other_export.mp4")
other_thumb_path = os.path.join(tmpdir, "other_export.webp")
with open(video_path, "wb") as handle:
handle.write(b"case")
with open(thumb_path, "wb") as handle:
handle.write(b"thumb")
with open(other_video_path, "wb") as handle:
handle.write(b"other")
with open(other_thumb_path, "wb") as handle:
handle.write(b"thumb")
Export.create(
id="export_in_case",
camera="front_door",
name="Case export",
date=100,
video_path=video_path,
thumb_path=thumb_path,
in_progress=False,
export_case=case,
)
Export.create(
id="export_other_case",
camera="front_door",
name="Other export",
date=110,
video_path=other_video_path,
thumb_path=other_thumb_path,
in_progress=False,
export_case=other_case,
)
with (
patch("frigate.jobs.export._job_manager", None),
patch(
"frigate.jobs.export.ExportJobManager.ensure_started",
autospec=True,
return_value=None,
),
):
start_export_job(
self.app.frigate_config,
ExportJob(
id="queued_case_job",
camera="front_door",
export_case_id=case.id,
request_start_time=100,
request_end_time=120,
),
)
start_export_job(
self.app.frigate_config,
ExportJob(
id="queued_other_job",
camera="front_door",
export_case_id=other_case.id,
request_start_time=130,
request_end_time=150,
),
)
manager = get_export_job_manager(self.app.frigate_config)
assert {job.id for job in manager.list_active_jobs()} == {
"queued_case_job",
"queued_other_job",
}
with AuthTestClient(self.app) as client:
response = client.delete(f"/cases/{case.id}?delete_exports=true")
assert response.status_code == 200
assert ExportCase.get_or_none(ExportCase.id == case.id) is None
assert ExportCase.get_or_none(ExportCase.id == other_case.id) is not None
assert Export.get_or_none(Export.id == "export_in_case") is None
assert Export.get_or_none(Export.id == "export_other_case") is not None
assert not os.path.exists(video_path)
assert not os.path.exists(thumb_path)
cancelled_job = manager.get_job("queued_case_job")
assert cancelled_job is not None
assert cancelled_job.status == "cancelled"
remaining_job = manager.get_job("queued_other_job")
assert remaining_job is not None
assert remaining_job.status == "queued"
assert [job.id for job in manager.list_active_jobs()] == [
"queued_other_job"
]
def test_batch_export_creates_case_and_reports_partial_success(self): def test_batch_export_creates_case_and_reports_partial_success(self):
self._insert_recording("rec-front", "front_door", 100, 200) self._insert_recording("rec-front", "front_door", 100, 200)