From aa860fda81d4291d33fd40cb7afeacacc55d35e7 Mon Sep 17 00:00:00 2001 From: ibs0d <53568938+ibs0d@users.noreply.github.com> Date: Sat, 7 Mar 2026 10:30:46 +1100 Subject: [PATCH] Fix multi-root cleanup/sync behavior and add tests --- frigate/api/app.py | 7 ++- frigate/api/record.py | 13 +++-- frigate/app.py | 2 + frigate/config/camera/camera.py | 16 +++++- frigate/config/config.py | 8 ++- frigate/jobs/media_sync.py | 4 ++ frigate/record/cleanup.py | 12 ++++- frigate/record/maintainer.py | 3 +- frigate/stats/util.py | 4 +- frigate/storage.py | 54 ++++++++++++++------- frigate/test/test_config.py | 38 +++++++++++++++ frigate/test/test_media.py | 86 +++++++++++++++++++++++++++++++++ frigate/test/test_storage.py | 71 ++++++++++++++++++++++++++- frigate/util/media.py | 44 +++++++++-------- 14 files changed, 306 insertions(+), 56 deletions(-) create mode 100644 frigate/test/test_media.py diff --git a/frigate/api/app.py b/frigate/api/app.py index a28f174de..c2b730ec6 100644 --- a/frigate/api/app.py +++ b/frigate/api/app.py @@ -788,7 +788,7 @@ def restart(): description="""Start an asynchronous media sync job to find and (optionally) remove orphaned media files. Returns 202 with job details when queued, or 409 if a job is already running.""", ) -def sync_media(body: MediaSyncBody = Body(...)): +def sync_media(request: Request, body: MediaSyncBody = Body(...)): """Start async media sync job - remove orphaned files. Syncs specified media types: event snapshots, event thumbnails, review thumbnails, @@ -804,7 +804,10 @@ def sync_media(body: MediaSyncBody = Body(...)): 202 Accepted with job_id, or 409 Conflict if job already running. """ job_id = start_media_sync_job( - dry_run=body.dry_run, media_types=body.media_types, force=body.force + dry_run=body.dry_run, + media_types=body.media_types, + force=body.force, + recordings_roots=request.app.frigate_config.get_recordings_paths(), ) if job_id is None: diff --git a/frigate/api/record.py b/frigate/api/record.py index 6eeb9fbe6..579a7a1f5 100644 --- a/frigate/api/record.py +++ b/frigate/api/record.py @@ -25,7 +25,6 @@ from frigate.api.defs.query.recordings_query_parameters import ( ) from frigate.api.defs.response.generic_response import GenericResponse from frigate.api.defs.tags import Tags -from frigate.const import RECORD_DIR from frigate.models import Event, Recordings from frigate.util.time import get_dst_transitions @@ -36,15 +35,15 @@ router = APIRouter(tags=[Tags.recordings]) @router.get("/recordings/storage", dependencies=[Depends(allow_any_authenticated())]) def get_recordings_storage_usage(request: Request): - recording_stats = request.app.stats_emitter.get_latest_stats()["service"][ - "storage" - ][RECORD_DIR] + storage_stats = request.app.stats_emitter.get_latest_stats()["service"]["storage"] - if not recording_stats: + recording_paths = request.app.frigate_config.get_recordings_paths() + recording_stats = [storage_stats.get(path, {}) for path in recording_paths] + total_mb = sum(stat.get("total", 0) for stat in recording_stats) + + if total_mb == 0: return JSONResponse({}) - total_mb = recording_stats["total"] - camera_usages: dict[str, dict] = ( request.app.storage_maintainer.calculate_camera_usages() ) diff --git a/frigate/app.py b/frigate/app.py index 0add3e3b8..821f34e61 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -131,6 +131,8 @@ class FrigateApp: EXPORT_DIR, ] + dirs.extend(self.config.get_recordings_paths()) + if self.config.face_recognition.enabled: dirs.append(FACE_DIR) diff --git a/frigate/config/camera/camera.py b/frigate/config/camera/camera.py index 21397065b..2eb3d138b 100644 --- a/frigate/config/camera/camera.py +++ b/frigate/config/camera/camera.py @@ -2,7 +2,7 @@ import os from enum import Enum from typing import Optional -from pydantic import Field, PrivateAttr, model_validator +from pydantic import Field, PrivateAttr, field_validator, model_validator from frigate.const import CACHE_DIR, CACHE_SEGMENT_FORMAT, REGEX_CAMERA_NAME from frigate.ffmpeg_presets import ( @@ -72,6 +72,20 @@ class CameraConfig(FrigateBaseModel): enabled: bool = Field(default=True, title="Enabled", description="Enabled") + path: str = Field( + default="/media/frigate/recordings", + title="Recordings path", + description="Absolute base path for this camera's recordings. Recordings are stored in camera/date subdirectories under this path.", + ) + + @field_validator("path") + @classmethod + def validate_path(cls, value: str) -> str: + if not os.path.isabs(value): + raise ValueError("Camera path must be an absolute path.") + + return value.rstrip("/") or "/" + # Options with global fallback audio: AudioConfig = Field( default_factory=AudioConfig, diff --git a/frigate/config/config.py b/frigate/config/config.py index 7e2d0eddc..7a2a5d012 100644 --- a/frigate/config/config.py +++ b/frigate/config/config.py @@ -19,7 +19,7 @@ from pydantic import ( from ruamel.yaml import YAML from typing_extensions import Self -from frigate.const import REGEX_JSON +from frigate.const import RECORD_DIR, REGEX_JSON from frigate.detectors import DetectorConfig, ModelConfig from frigate.detectors.detector_config import BaseDetectorConfig from frigate.plus import PlusApi @@ -942,6 +942,12 @@ class FrigateConfig(FrigateBaseModel): raise ValueError("Zones cannot share names with cameras") return v + def get_camera_recordings_path(self, camera: str) -> str: + return self.cameras[camera].path + + def get_recordings_paths(self) -> list[str]: + return sorted({camera.path for camera in self.cameras.values()}) + @classmethod def load(cls, **kwargs): """Loads the Frigate config file, runs migrations, and creates the config object.""" diff --git a/frigate/jobs/media_sync.py b/frigate/jobs/media_sync.py index 7c15435fd..46b64c4fe 100644 --- a/frigate/jobs/media_sync.py +++ b/frigate/jobs/media_sync.py @@ -29,6 +29,7 @@ class MediaSyncJob(Job): dry_run: bool = False media_types: list[str] = field(default_factory=lambda: ["all"]) force: bool = False + recordings_roots: list[str] = field(default_factory=list) class MediaSyncRunner(threading.Thread): @@ -59,6 +60,7 @@ class MediaSyncRunner(threading.Thread): dry_run=self.job.dry_run, media_types=self.job.media_types, force=self.job.force, + recordings_roots=self.job.recordings_roots, ) # Store results and mark as complete @@ -95,6 +97,7 @@ def start_media_sync_job( dry_run: bool = False, media_types: Optional[list[str]] = None, force: bool = False, + recordings_roots: Optional[list[str]] = None, ) -> Optional[str]: """Start a new media sync job if none is currently running. @@ -113,6 +116,7 @@ def start_media_sync_job( dry_run=dry_run, media_types=media_types or ["all"], force=force, + recordings_roots=recordings_roots or [], ) logger.debug(f"Creating new media sync job: {job.id}") diff --git a/frigate/record/cleanup.py b/frigate/record/cleanup.py index 15a0ba7e8..8a0755223 100644 --- a/frigate/record/cleanup.py +++ b/frigate/record/cleanup.py @@ -11,7 +11,7 @@ from pathlib import Path from playhouse.sqlite_ext import SqliteExtDatabase from frigate.config import CameraConfig, FrigateConfig, RetainModeEnum -from frigate.const import CACHE_DIR, CLIPS_DIR, MAX_WAL_SIZE, RECORD_DIR +from frigate.const import CACHE_DIR, CLIPS_DIR, MAX_WAL_SIZE from frigate.models import Previews, Recordings, ReviewSegment, UserReviewStatus from frigate.util.builtin import clear_and_unlink from frigate.util.media import remove_empty_directories @@ -379,5 +379,13 @@ class RecordingCleanup(threading.Thread): if counter == 0: self.clean_tmp_clips() maybe_empty_dirs = self.expire_recordings() - remove_empty_directories(Path(RECORD_DIR), maybe_empty_dirs) + for recordings_root in self.config.get_recordings_paths(): + remove_empty_directories( + Path(recordings_root), + [ + path + for path in maybe_empty_dirs + if Path(recordings_root) in path.parents + ], + ) self.truncate_wal() diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index 68040476a..814611df0 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -34,7 +34,6 @@ from frigate.const import ( INSERT_MANY_RECORDINGS, MAX_SEGMENT_DURATION, MAX_SEGMENTS_IN_CACHE, - RECORD_DIR, ) from frigate.models import Recordings, ReviewSegment from frigate.review.types import SeverityEnum @@ -585,7 +584,7 @@ class RecordingMaintainer(threading.Thread): # directory will be in utc due to start_time being in utc directory = os.path.join( - RECORD_DIR, + self.config.get_camera_recordings_path(camera), start_time.strftime("%Y-%m-%d/%H"), camera, ) diff --git a/frigate/stats/util.py b/frigate/stats/util.py index 40337268e..a7ae048c6 100644 --- a/frigate/stats/util.py +++ b/frigate/stats/util.py @@ -12,7 +12,7 @@ import requests from requests.exceptions import RequestException from frigate.config import FrigateConfig -from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR +from frigate.const import CACHE_DIR, CLIPS_DIR from frigate.data_processing.types import DataProcessorMetrics from frigate.object_detection.base import ObjectDetectProcess from frigate.types import StatsTrackingTypes @@ -483,7 +483,7 @@ def stats_snapshot( "last_updated": int(time.time()), } - for path in [RECORD_DIR, CLIPS_DIR, CACHE_DIR]: + for path in [*config.get_recordings_paths(), CLIPS_DIR, CACHE_DIR]: try: storage_stats = shutil.disk_usage(path) except (FileNotFoundError, OSError): diff --git a/frigate/storage.py b/frigate/storage.py index 93463c542..35f2f8811 100644 --- a/frigate/storage.py +++ b/frigate/storage.py @@ -8,7 +8,7 @@ from pathlib import Path from peewee import SQL, fn from frigate.config import FrigateConfig -from frigate.const import RECORD_DIR, REPLAY_CAMERA_PREFIX +from frigate.const import REPLAY_CAMERA_PREFIX from frigate.models import Event, Recordings from frigate.util.builtin import clear_and_unlink @@ -103,26 +103,41 @@ class StorageMaintainer(threading.Thread): return usages - def check_storage_needs_cleanup(self) -> bool: - """Return if storage needs cleanup.""" + def _get_path_bandwidths(self) -> dict[str, float]: + bandwidth_per_path: dict[str, float] = {} + + for camera, stats in self.camera_storage_stats.items(): + path = self.config.get_camera_recordings_path(camera) + bandwidth_per_path[path] = bandwidth_per_path.get(path, 0) + stats.get( + "bandwidth", 0 + ) + + return bandwidth_per_path + + def check_storage_needs_cleanup(self) -> str | None: + """Return recordings root path that needs cleanup, if any.""" # currently runs cleanup if less than 1 hour of space is left # disk_usage should not spin up disks - hourly_bandwidth = sum( - [b["bandwidth"] for b in self.camera_storage_stats.values()] - ) - remaining_storage = round(shutil.disk_usage(RECORD_DIR).free / pow(2, 20), 1) - logger.debug( - f"Storage cleanup check: {hourly_bandwidth} hourly with remaining storage: {remaining_storage}." - ) - return remaining_storage < hourly_bandwidth + for path, hourly_bandwidth in self._get_path_bandwidths().items(): + try: + remaining_storage = round(shutil.disk_usage(path).free / pow(2, 20), 1) + except (FileNotFoundError, OSError): + continue - def reduce_storage_consumption(self) -> None: + logger.debug( + f"Storage cleanup check: {hourly_bandwidth} hourly with remaining storage: {remaining_storage} for path {path}." + ) + + if remaining_storage < hourly_bandwidth: + return path + + return None + + def reduce_storage_consumption(self, recordings_root: str) -> None: """Remove oldest hour of recordings.""" logger.debug("Starting storage cleanup.") deleted_segments_size = 0 - hourly_bandwidth = sum( - [b["bandwidth"] for b in self.camera_storage_stats.values()] - ) + hourly_bandwidth = self._get_path_bandwidths().get(recordings_root, 0) recordings: Recordings = ( Recordings.select( @@ -133,6 +148,7 @@ class StorageMaintainer(threading.Thread): Recordings.segment_size, Recordings.path, ) + .where(Recordings.path.startswith(f"{recordings_root}/")) .order_by(Recordings.start_time.asc()) .namedtuples() .iterator() @@ -207,6 +223,7 @@ class StorageMaintainer(threading.Thread): Recordings.path, Recordings.segment_size, ) + .where(Recordings.path.startswith(f"{recordings_root}/")) .order_by(Recordings.start_time.asc()) .namedtuples() .iterator() @@ -288,10 +305,11 @@ class StorageMaintainer(threading.Thread): self.calculate_camera_bandwidth() logger.debug(f"Default camera bandwidths: {self.camera_storage_stats}.") - if self.check_storage_needs_cleanup(): + cleanup_root = self.check_storage_needs_cleanup() + if cleanup_root: logger.info( - "Less than 1 hour of recording space left, running storage maintenance..." + f"Less than 1 hour of recording space left for {cleanup_root}, running storage maintenance..." ) - self.reduce_storage_consumption() + self.reduce_storage_consumption(cleanup_root) logger.info("Exiting storage maintainer...") diff --git a/frigate/test/test_config.py b/frigate/test/test_config.py index e903c2ac3..24f991d4e 100644 --- a/frigate/test/test_config.py +++ b/frigate/test/test_config.py @@ -68,6 +68,44 @@ class TestConfig(unittest.TestCase): assert frigate_config.detectors["cpu"].type == DetectorTypeEnum.cpu assert frigate_config.detectors["cpu"].model.width == 320 + def test_default_camera_recordings_path(self): + frigate_config = FrigateConfig(**self.minimal) + assert ( + frigate_config.cameras["back"].path == "/media/frigate/recordings" + ) + + def test_camera_recordings_path_must_be_absolute(self): + config = deep_merge( + self.minimal, + {"cameras": {"back": {"path": "recordings"}}}, + override=True, + ) + + self.assertRaises(ValidationError, lambda: FrigateConfig(**config)) + + def test_get_recordings_paths_uses_only_configured_paths(self): + config = deep_merge( + self.minimal, + { + "cameras": { + "back": {"path": "/video2"}, + "side": { + "path": "/video3", + "ffmpeg": { + "inputs": [ + {"path": "rtsp://10.0.0.2:554/video", "roles": ["detect"]} + ] + }, + "detect": {"height": 1080, "width": 1920, "fps": 5}, + }, + } + }, + override=True, + ) + + frigate_config = FrigateConfig(**config) + assert frigate_config.get_recordings_paths() == ["/video2", "/video3"] + @patch("frigate.detectors.detector_config.load_labels") def test_detector_custom_model_path(self, mock_labels): mock_labels.return_value = {} diff --git a/frigate/test/test_media.py b/frigate/test/test_media.py new file mode 100644 index 000000000..f490239cf --- /dev/null +++ b/frigate/test/test_media.py @@ -0,0 +1,86 @@ +import os +import tempfile +import unittest + +from peewee_migrate import Router +from playhouse.sqlite_ext import SqliteExtDatabase +from playhouse.sqliteq import SqliteQueueDatabase + +from frigate.models import Recordings, RecordingsToDelete +from frigate.test.const import TEST_DB, TEST_DB_CLEANUPS +from frigate.util.media import sync_recordings + + +class TestMediaSync(unittest.TestCase): + def setUp(self): + migrate_db = SqliteExtDatabase(TEST_DB) + router = Router(migrate_db) + router.run() + migrate_db.close() + + self.db = SqliteQueueDatabase(TEST_DB) + models = [Recordings, RecordingsToDelete] + self.db.bind(models) + + self.root_a = tempfile.mkdtemp() + self.root_b = tempfile.mkdtemp() + + def tearDown(self): + if not self.db.is_closed(): + self.db.close() + + try: + for file in TEST_DB_CLEANUPS: + os.remove(file) + except OSError: + pass + + def test_sync_recordings_scans_all_configured_roots(self): + recording_path = os.path.join(self.root_a, "recording_a.mp4") + orphan_path = os.path.join(self.root_b, "orphan_b.mp4") + + with open(recording_path, "w"): + pass + with open(orphan_path, "w"): + pass + + Recordings.insert( + id="rec_a", + camera="front_door", + path=recording_path, + start_time=1, + end_time=2, + duration=1, + motion=True, + objects=True, + segment_size=1, + ).execute() + + result = sync_recordings( + dry_run=True, + force=True, + recordings_roots=[self.root_a, self.root_b], + ) + + assert result.files_checked == 2 + assert result.orphans_found == 1 + assert orphan_path in result.orphan_paths + + def test_sync_recordings_does_not_scan_unconfigured_roots(self): + orphan_path = os.path.join(self.root_b, "orphan_b.mp4") + + with open(orphan_path, "w"): + pass + + result = sync_recordings( + dry_run=True, + force=True, + recordings_roots=[self.root_a], + ) + + assert result.files_checked == 0 + assert result.orphans_found == 0 + + +if __name__ == "__main__": + unittest.main() diff --git a/frigate/test/test_storage.py b/frigate/test/test_storage.py index 4ae5715ca..1a6c9d070 100644 --- a/frigate/test/test_storage.py +++ b/frigate/test/test_storage.py @@ -200,7 +200,7 @@ class TestHttp(unittest.TestCase): ) storage.calculate_camera_bandwidth() - storage.reduce_storage_consumption() + storage.reduce_storage_consumption(config.get_camera_recordings_path("front_door")) with self.assertRaises(DoesNotExist): assert Recordings.get(Recordings.id == rec_k_id) assert Recordings.get(Recordings.id == rec_k2_id) @@ -255,12 +255,79 @@ class TestHttp(unittest.TestCase): ) storage.calculate_camera_bandwidth() - storage.reduce_storage_consumption() + storage.reduce_storage_consumption(config.get_camera_recordings_path("front_door")) assert Recordings.get(Recordings.id == rec_k_id) assert Recordings.get(Recordings.id == rec_k2_id) assert Recordings.get(Recordings.id == rec_k3_id) + + def test_storage_cleanup_only_for_overflowed_root(self): + """Ensure cleanup removes recordings only from the targeted recordings root.""" + root_a = tempfile.mkdtemp() + root_b = tempfile.mkdtemp() + + config = FrigateConfig( + **{ + "mqtt": {"host": "mqtt"}, + "cameras": { + "front_door": { + "path": root_a, + "ffmpeg": { + "inputs": [ + {"path": "rtsp://10.0.0.1:554/video", "roles": ["detect"]} + ] + }, + "detect": {"height": 1080, "width": 1920, "fps": 5}, + }, + "back_door": { + "path": root_b, + "ffmpeg": { + "inputs": [ + {"path": "rtsp://10.0.0.2:554/video", "roles": ["detect"]} + ] + }, + "detect": {"height": 1080, "width": 1920, "fps": 5}, + }, + }, + } + ) + storage = StorageMaintainer(config, MagicMock()) + + t0 = datetime.datetime.now().timestamp() + delete_a = "a.delete" + keep_b = "b.keep" + _insert_mock_recording( + delete_a, + os.path.join(root_a, f"{delete_a}.tmp"), + t0 - 100, + t0 - 90, + camera="front_door", + seg_size=8, + seg_dur=10, + ) + _insert_mock_recording( + keep_b, + os.path.join(root_b, f"{keep_b}.tmp"), + t0 - 80, + t0 - 70, + camera="back_door", + seg_size=8, + seg_dur=10, + ) + + storage.camera_storage_stats = { + "front_door": {"bandwidth": 4, "needs_refresh": False}, + "back_door": {"bandwidth": 0, "needs_refresh": False}, + } + + storage.reduce_storage_consumption(root_a) + + with self.assertRaises(DoesNotExist): + Recordings.get(Recordings.id == delete_a) + + assert Recordings.get(Recordings.id == keep_b) + def _insert_mock_event( id: str, start: int, diff --git a/frigate/util/media.py b/frigate/util/media.py index c7de85c9f..84575fa9d 100644 --- a/frigate/util/media.py +++ b/frigate/util/media.py @@ -84,7 +84,10 @@ def remove_empty_directories(root: Path, paths: Iterable[Path]) -> None: def sync_recordings( - limited: bool = False, dry_run: bool = False, force: bool = False + limited: bool = False, + dry_run: bool = False, + force: bool = False, + recordings_roots: list[str] | None = None, ) -> SyncResult: """Sync recordings between the database and disk using the SyncResult format.""" @@ -110,6 +113,7 @@ def sync_recordings( page_size = 1000 num_pages = (recordings_count + page_size - 1) // page_size recordings_to_delete: list[dict] = [] + configured_roots = set(recordings_roots or [RECORD_DIR]) for page in range(num_pages): for recording in recordings_query.paginate(page, page_size): @@ -175,22 +179,19 @@ def sync_recordings( return result # Only try to cleanup files if db cleanup was successful or dry_run - if limited: - # get recording files from last 36 hours - hour_check = f"{RECORD_DIR}/{check_point.strftime('%Y-%m-%d/%H')}" - files_on_disk = { - os.path.join(root, file) - for root, _, files in os.walk(RECORD_DIR) - for file in files - if root > hour_check - } - else: - # get all recordings files on disk and put them in a set - files_on_disk = { - os.path.join(root, file) - for root, _, files in os.walk(RECORD_DIR) - for file in files - } + # get recording files on disk and put them in a set + files_on_disk = set() + for recordings_root in configured_roots: + for root, _, files in os.walk(recordings_root): + for file in files: + file_path = os.path.join(root, file) + + if limited: + file_mtime = os.path.getmtime(file_path) + if file_mtime < check_point.timestamp(): + continue + + files_on_disk.add(file_path) result.files_checked = len(files_on_disk) @@ -759,7 +760,10 @@ class MediaSyncResults: def sync_all_media( - dry_run: bool = False, media_types: list[str] = ["all"], force: bool = False + dry_run: bool = False, + media_types: list[str] = ["all"], + force: bool = False, + recordings_roots: list[str] | None = None, ) -> MediaSyncResults: """Sync specified media types with the database. @@ -797,7 +801,9 @@ def sync_all_media( results.exports = sync_exports(dry_run=dry_run, force=force) if sync_all or "recordings" in media_types: - results.recordings = sync_recordings(dry_run=dry_run, force=force) + results.recordings = sync_recordings( + dry_run=dry_run, force=force, recordings_roots=recordings_roots + ) logger.info( f"Media sync complete: checked {results.total_files_checked} files, "