def reap_stale_exports() -> None:
    """Clean up Export rows left with in_progress=True by an earlier session.

    Intended to run once at startup, before any export worker exists, so
    every row still flagged in_progress is treated as a leftover (crash,
    kill mid-export, or an early return that never cleared the flag).

    Each leftover row is handled in one of two ways:

    - the video file exists and is non-empty: the row is kept, in_progress
      is set to False and its export_case link is cleared
    - otherwise: the thumbnail and video files (when paths are set) are
      unlinked and the row is deleted

    Nothing is raised to the caller. A failure of the initial query is
    logged and ends the sweep; a failure on one row is logged and counted,
    and the sweep moves on to the next row.
    """
    try:
        leftover_rows = list(
            Export.select().where(Export.in_progress == True)  # noqa: E712
        )
    except Exception:
        logger.exception("Failed to query stale in-progress exports")
        return

    if not leftover_rows:
        logger.debug("No stale in-progress exports found on startup")
        return

    tally = {"recovered": 0, "deleted": 0, "errored": 0}

    for row in leftover_rows:
        try:
            video_file = row.video_path

            # A missing path, a missing file and a zero-byte file all count
            # as "nothing usable"; only OSError from the size probe is
            # absorbed here, anything else reaches the per-row handler.
            try:
                file_is_usable = bool(video_file) and os.path.getsize(video_file) > 0
            except OSError:
                file_is_usable = False

            if file_is_usable:
                # Drop the case assignment on recovery so the user triages
                # the recovered export again instead of finding it silently
                # back inside a case they curated.
                recover_query = Export.update(
                    {Export.in_progress: False, Export.export_case: None}
                ).where(Export.id == row.id)
                recover_query.execute()
                tally["recovered"] += 1
                logger.info(
                    "Recovered stale in-progress export %s (file intact on disk)",
                    row.id,
                )
            else:
                # Remove on-disk leftovers first, then the row itself.
                if row.thumb_path:
                    Path(row.thumb_path).unlink(missing_ok=True)
                if video_file:
                    Path(video_file).unlink(missing_ok=True)
                Export.delete().where(Export.id == row.id).execute()
                tally["deleted"] += 1
                logger.info(
                    "Deleted stale in-progress export %s (no usable file on disk)",
                    row.id,
                )
        except Exception:
            tally["errored"] += 1
            logger.exception("Failed to reap stale export %s", row.id)

    logger.info(
        "Stale export cleanup complete: %d recovered, %d deleted, %d errored",
        tally["recovered"],
        tally["deleted"],
        tally["errored"],
    )
def test_reap_stale_exports_recovers_rows_with_file(self):
    """A stale row with a non-empty video is kept, un-flagged and un-cased."""
    with tempfile.TemporaryDirectory() as workdir:
        video_file = os.path.join(workdir, "intact.mp4")
        thumb_file = os.path.join(workdir, "intact.webp")

        # Both files must exist and be non-empty before the sweep runs.
        fixtures = (
            (video_file, b"not actually an mp4 but non-empty"),
            (thumb_file, b"thumb"),
        )
        for target, payload in fixtures:
            with open(target, "wb") as sink:
                sink.write(payload)

        curated_case = ExportCase.create(
            id="case_for_stale",
            name="Curated case",
            description="",
            created_at=10,
            updated_at=10,
        )

        row_fields = {
            "id": "stale_with_file",
            "camera": "front_door",
            "name": "Recoverable export",
            "date": 200,
            "video_path": video_file,
            "thumb_path": thumb_file,
            "in_progress": True,
            "export_case": curated_case,
        }
        Export.create(**row_fields)

        reap_stale_exports()

        row_after = Export.get(Export.id == "stale_with_file")
        assert row_after.in_progress is False
        # The case link is cleared so the user re-triages the recovered row
        assert row_after.export_case is None
        # The case record itself survives
        assert ExportCase.get_or_none(ExportCase.id == "case_for_stale") is not None
        # Files belonging to a recovered row stay on disk
        assert os.path.exists(video_file)
        assert os.path.exists(thumb_file)
def test_reap_stale_exports_deletes_rows_with_empty_file(self):
    """A stale row whose video is zero bytes is deleted along with the file."""
    with tempfile.TemporaryDirectory() as workdir:
        zero_byte_video = os.path.join(workdir, "empty.mp4")
        # Leave a zero-byte file behind, like truncated ffmpeg output
        with open(zero_byte_video, "w"):
            pass

        row_fields = {
            "id": "stale_empty_file",
            "camera": "front_door",
            "name": "Zero byte export",
            "date": 300,
            "video_path": zero_byte_video,
            "thumb_path": "",
            "in_progress": True,
        }
        Export.create(**row_fields)

        reap_stale_exports()

        leftover_row = Export.get_or_none(Export.id == "stale_empty_file")
        assert leftover_row is None
        assert not os.path.exists(zero_byte_video)