mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-05-09 15:05:26 +03:00
add stale export reaper on startup
This commit is contained in:
parent
0c70818a32
commit
482652ef87
@ -52,6 +52,7 @@ from frigate.embeddings import EmbeddingProcess, EmbeddingsContext
|
|||||||
from frigate.events.audio import AudioProcessor
|
from frigate.events.audio import AudioProcessor
|
||||||
from frigate.events.cleanup import EventCleanup
|
from frigate.events.cleanup import EventCleanup
|
||||||
from frigate.events.maintainer import EventProcessor
|
from frigate.events.maintainer import EventProcessor
|
||||||
|
from frigate.jobs.export import reap_stale_exports
|
||||||
from frigate.jobs.motion_search import stop_all_motion_search_jobs
|
from frigate.jobs.motion_search import stop_all_motion_search_jobs
|
||||||
from frigate.log import _stop_logging
|
from frigate.log import _stop_logging
|
||||||
from frigate.models import (
|
from frigate.models import (
|
||||||
@ -611,6 +612,11 @@ class FrigateApp:
|
|||||||
# Clean up any stale replay camera artifacts (filesystem + DB)
|
# Clean up any stale replay camera artifacts (filesystem + DB)
|
||||||
cleanup_replay_cameras()
|
cleanup_replay_cameras()
|
||||||
|
|
||||||
|
# Reap any Export rows still marked in_progress from a previous
|
||||||
|
# session (crash, kill, broken migration). Runs synchronously before
|
||||||
|
# uvicorn binds so no API request can observe a stale row.
|
||||||
|
reap_stale_exports()
|
||||||
|
|
||||||
self.init_inter_process_communicator()
|
self.init_inter_process_communicator()
|
||||||
self.start_detectors()
|
self.start_detectors()
|
||||||
self.init_dispatcher()
|
self.init_dispatcher()
|
||||||
|
|||||||
@ -1,9 +1,11 @@
|
|||||||
"""Export job management with queued background execution."""
|
"""Export job management with queued background execution."""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
from pathlib import Path
|
||||||
from queue import Full, Queue
|
from queue import Full, Queue
|
||||||
from typing import Any, Optional
|
from typing import Any, Optional
|
||||||
|
|
||||||
@ -230,6 +232,88 @@ def _get_max_concurrent(config: FrigateConfig) -> int:
|
|||||||
return int(config.record.export.max_concurrent)
|
return int(config.record.export.max_concurrent)
|
||||||
|
|
||||||
|
|
||||||
|
def reap_stale_exports() -> None:
|
||||||
|
"""Sweep Export rows stuck with in_progress=True from previous sessions.
|
||||||
|
|
||||||
|
On Frigate startup no export job is alive yet, so any in_progress=True
|
||||||
|
row must be a leftover from a previous session that crashed, was killed
|
||||||
|
mid-export, or returned early from RecordingExporter.run() without
|
||||||
|
flipping the flag. For each stale row we either:
|
||||||
|
|
||||||
|
- delete the row (and any thumb) if the video file is missing or empty,
|
||||||
|
since there is nothing worth recovering
|
||||||
|
- flip in_progress to False if the video file exists on disk and is
|
||||||
|
non-empty, treating it as a completed export the user can manage
|
||||||
|
through the normal UI
|
||||||
|
|
||||||
|
Must only be called when the export job manager is certain to have no
|
||||||
|
active jobs — i.e., at Frigate startup, before any worker runs.
|
||||||
|
|
||||||
|
All exceptions are caught and logged; the caller does not need to wrap
|
||||||
|
this in a try/except. A failure on a single row will not stop the rest
|
||||||
|
of the sweep, and a failure in the top-level query will log and return.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
stale_exports = list(Export.select().where(Export.in_progress == True)) # noqa: E712
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Failed to query stale in-progress exports")
|
||||||
|
return
|
||||||
|
|
||||||
|
if not stale_exports:
|
||||||
|
logger.debug("No stale in-progress exports found on startup")
|
||||||
|
return
|
||||||
|
|
||||||
|
flipped = 0
|
||||||
|
deleted = 0
|
||||||
|
errored = 0
|
||||||
|
|
||||||
|
for export in stale_exports:
|
||||||
|
try:
|
||||||
|
video_path = export.video_path
|
||||||
|
has_usable_file = False
|
||||||
|
|
||||||
|
if video_path:
|
||||||
|
try:
|
||||||
|
has_usable_file = os.path.getsize(video_path) > 0
|
||||||
|
except OSError:
|
||||||
|
has_usable_file = False
|
||||||
|
|
||||||
|
if has_usable_file:
|
||||||
|
# Unassign from any case on recovery: the user should
|
||||||
|
# re-triage a recovered export rather than have it silently
|
||||||
|
# reappear inside a case they curated.
|
||||||
|
Export.update(
|
||||||
|
{Export.in_progress: False, Export.export_case: None}
|
||||||
|
).where(Export.id == export.id).execute()
|
||||||
|
flipped += 1
|
||||||
|
logger.info(
|
||||||
|
"Recovered stale in-progress export %s (file intact on disk)",
|
||||||
|
export.id,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if export.thumb_path:
|
||||||
|
Path(export.thumb_path).unlink(missing_ok=True)
|
||||||
|
if video_path:
|
||||||
|
Path(video_path).unlink(missing_ok=True)
|
||||||
|
Export.delete().where(Export.id == export.id).execute()
|
||||||
|
deleted += 1
|
||||||
|
logger.info(
|
||||||
|
"Deleted stale in-progress export %s (no usable file on disk)",
|
||||||
|
export.id,
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
errored += 1
|
||||||
|
logger.exception("Failed to reap stale export %s", export.id)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"Stale export cleanup complete: %d recovered, %d deleted, %d errored",
|
||||||
|
flipped,
|
||||||
|
deleted,
|
||||||
|
errored,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_export_job_manager(config: FrigateConfig) -> ExportJobManager:
|
def get_export_job_manager(config: FrigateConfig) -> ExportJobManager:
|
||||||
"""Get or create the singleton export job manager."""
|
"""Get or create the singleton export job manager."""
|
||||||
global _job_manager
|
global _job_manager
|
||||||
|
|||||||
@ -1,6 +1,8 @@
|
|||||||
|
import os
|
||||||
|
import tempfile
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
from frigate.jobs.export import ExportJob
|
from frigate.jobs.export import ExportJob, reap_stale_exports
|
||||||
from frigate.models import Export, ExportCase, Previews, Recordings
|
from frigate.models import Export, ExportCase, Previews, Recordings
|
||||||
from frigate.test.http_api.base_http_test import AuthTestClient, BaseTestHttp
|
from frigate.test.http_api.base_http_test import AuthTestClient, BaseTestHttp
|
||||||
|
|
||||||
@ -227,6 +229,144 @@ class TestHttpExport(BaseTestHttp):
|
|||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
assert response.json() == [queued_job.to_dict()]
|
assert response.json() == [queued_job.to_dict()]
|
||||||
|
|
||||||
|
def test_reap_stale_exports_deletes_rows_with_no_file(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
stale_video = os.path.join(tmpdir, "stale.mp4")
|
||||||
|
stale_thumb = os.path.join(tmpdir, "stale.webp")
|
||||||
|
# stale_video is intentionally NOT created
|
||||||
|
with open(stale_thumb, "w") as handle:
|
||||||
|
handle.write("thumb")
|
||||||
|
|
||||||
|
Export.create(
|
||||||
|
id="stale_no_file",
|
||||||
|
camera="front_door",
|
||||||
|
name="Stuck export",
|
||||||
|
date=100,
|
||||||
|
video_path=stale_video,
|
||||||
|
thumb_path=stale_thumb,
|
||||||
|
in_progress=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
reap_stale_exports()
|
||||||
|
|
||||||
|
assert Export.get_or_none(Export.id == "stale_no_file") is None
|
||||||
|
assert not os.path.exists(stale_thumb)
|
||||||
|
|
||||||
|
def test_reap_stale_exports_recovers_rows_with_file(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
intact_video = os.path.join(tmpdir, "intact.mp4")
|
||||||
|
intact_thumb = os.path.join(tmpdir, "intact.webp")
|
||||||
|
with open(intact_video, "wb") as handle:
|
||||||
|
handle.write(b"not actually an mp4 but non-empty")
|
||||||
|
with open(intact_thumb, "wb") as handle:
|
||||||
|
handle.write(b"thumb")
|
||||||
|
|
||||||
|
case = ExportCase.create(
|
||||||
|
id="case_for_stale",
|
||||||
|
name="Curated case",
|
||||||
|
description="",
|
||||||
|
created_at=10,
|
||||||
|
updated_at=10,
|
||||||
|
)
|
||||||
|
|
||||||
|
Export.create(
|
||||||
|
id="stale_with_file",
|
||||||
|
camera="front_door",
|
||||||
|
name="Recoverable export",
|
||||||
|
date=200,
|
||||||
|
video_path=intact_video,
|
||||||
|
thumb_path=intact_thumb,
|
||||||
|
in_progress=True,
|
||||||
|
export_case=case,
|
||||||
|
)
|
||||||
|
|
||||||
|
reap_stale_exports()
|
||||||
|
|
||||||
|
recovered = Export.get(Export.id == "stale_with_file")
|
||||||
|
assert recovered.in_progress is False
|
||||||
|
# Case link must be cleared so the user re-triages the recovered row
|
||||||
|
assert recovered.export_case is None
|
||||||
|
# The case itself is untouched
|
||||||
|
assert ExportCase.get_or_none(ExportCase.id == "case_for_stale") is not None
|
||||||
|
# Recovered files must NOT be unlinked
|
||||||
|
assert os.path.exists(intact_video)
|
||||||
|
assert os.path.exists(intact_thumb)
|
||||||
|
|
||||||
|
def test_reap_stale_exports_delete_path_severs_case_link(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
missing_video = os.path.join(tmpdir, "missing.mp4")
|
||||||
|
# file intentionally not created
|
||||||
|
|
||||||
|
case = ExportCase.create(
|
||||||
|
id="case_losing_member",
|
||||||
|
name="Case losing a member",
|
||||||
|
description="",
|
||||||
|
created_at=20,
|
||||||
|
updated_at=20,
|
||||||
|
)
|
||||||
|
|
||||||
|
Export.create(
|
||||||
|
id="stale_in_case_no_file",
|
||||||
|
camera="front_door",
|
||||||
|
name="Stuck and in a case",
|
||||||
|
date=250,
|
||||||
|
video_path=missing_video,
|
||||||
|
thumb_path="",
|
||||||
|
in_progress=True,
|
||||||
|
export_case=case,
|
||||||
|
)
|
||||||
|
|
||||||
|
reap_stale_exports()
|
||||||
|
|
||||||
|
# The export row is gone entirely
|
||||||
|
assert Export.get_or_none(Export.id == "stale_in_case_no_file") is None
|
||||||
|
# The case stays but has no exports pointing at it
|
||||||
|
remaining_case = ExportCase.get(ExportCase.id == "case_losing_member")
|
||||||
|
assert list(remaining_case.exports) == []
|
||||||
|
|
||||||
|
def test_reap_stale_exports_deletes_rows_with_empty_file(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
empty_video = os.path.join(tmpdir, "empty.mp4")
|
||||||
|
# Create a zero-byte file — partial ffmpeg output
|
||||||
|
open(empty_video, "w").close()
|
||||||
|
|
||||||
|
Export.create(
|
||||||
|
id="stale_empty_file",
|
||||||
|
camera="front_door",
|
||||||
|
name="Zero byte export",
|
||||||
|
date=300,
|
||||||
|
video_path=empty_video,
|
||||||
|
thumb_path="",
|
||||||
|
in_progress=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
reap_stale_exports()
|
||||||
|
|
||||||
|
assert Export.get_or_none(Export.id == "stale_empty_file") is None
|
||||||
|
assert not os.path.exists(empty_video)
|
||||||
|
|
||||||
|
def test_reap_stale_exports_skips_completed_rows(self):
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
done_video = os.path.join(tmpdir, "done.mp4")
|
||||||
|
with open(done_video, "wb") as handle:
|
||||||
|
handle.write(b"done")
|
||||||
|
|
||||||
|
Export.create(
|
||||||
|
id="already_done",
|
||||||
|
camera="front_door",
|
||||||
|
name="Completed export",
|
||||||
|
date=400,
|
||||||
|
video_path=done_video,
|
||||||
|
thumb_path="",
|
||||||
|
in_progress=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
reap_stale_exports()
|
||||||
|
|
||||||
|
row = Export.get(Export.id == "already_done")
|
||||||
|
assert row.in_progress is False
|
||||||
|
assert os.path.exists(done_video)
|
||||||
|
|
||||||
def test_batch_export_requires_case_target(self):
|
def test_batch_export_requires_case_target(self):
|
||||||
with AuthTestClient(self.app) as client:
|
with AuthTestClient(self.app) as client:
|
||||||
response = client.post(
|
response = client.post(
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user