diff --git a/frigate/storage.py b/frigate/storage.py index feabe06ff..7f0d4784a 100644 --- a/frigate/storage.py +++ b/frigate/storage.py @@ -1,15 +1,18 @@ """Handle storage retention and usage.""" +import datetime import logging import shutil import threading +from collections import defaultdict +from datetime import timedelta from pathlib import Path from peewee import SQL, fn -from frigate.config import FrigateConfig +from frigate.config import FrigateConfig, RetainPolicyEnum from frigate.const import RECORD_DIR -from frigate.models import Event, Recordings +from frigate.models import Event, Recordings, ReviewSegment from frigate.util.builtin import clear_and_unlink logger = logging.getLogger(__name__) @@ -108,8 +111,65 @@ class StorageMaintainer(threading.Thread): ) return remaining_storage < hourly_bandwidth + def _finalize_deleted_recordings(self, deleted_recordings: list) -> None: + """Delete recording DB rows and update has_clip on affected events.""" + logger.debug("Expiring %s recordings", len(deleted_recordings)) + max_deletes = 100000 + + # Update has_clip for events that overlap with deleted recordings + if deleted_recordings: + # Group deleted recordings by camera + camera_recordings: dict[str, dict] = {} + for recording in deleted_recordings: + if recording.camera not in camera_recordings: + camera_recordings[recording.camera] = { + "min_start": recording.start_time, + "max_end": recording.end_time, + } + else: + camera_recordings[recording.camera]["min_start"] = min( + camera_recordings[recording.camera]["min_start"], + recording.start_time, + ) + camera_recordings[recording.camera]["max_end"] = max( + camera_recordings[recording.camera]["max_end"], + recording.end_time, + ) + + # Find all events that overlap with deleted recordings time range per camera + events_to_update = [] + for camera, time_range in camera_recordings.items(): + overlapping_events = Event.select(Event.id).where( + Event.camera == camera, + Event.has_clip == True, + Event.start_time < time_range["max_end"], + Event.end_time > time_range["min_start"], + ) + + for event in overlapping_events: + events_to_update.append(event.id) + + # Update has_clip to False for overlapping events + if events_to_update: + for i in range(0, len(events_to_update), max_deletes): + batch = events_to_update[i : i + max_deletes] + Event.update(has_clip=False).where(Event.id << batch).execute() + logger.debug( + "Updated has_clip to False for %s events", + len(events_to_update), + ) + + deleted_recordings_list = [r.id for r in deleted_recordings] + for i in range(0, len(deleted_recordings_list), max_deletes): + Recordings.delete().where( + Recordings.id << deleted_recordings_list[i : i + max_deletes] + ).execute() + def reduce_storage_consumption(self) -> None: """Remove oldest hour of recordings.""" + if self.config.record.retain_policy == RetainPolicyEnum.continuous_rollover: + return self._reduce_storage_rollover() + logger.debug("Starting storage cleanup.") deleted_segments_size = 0 hourly_bandwidth = sum( @@ -218,57 +278,209 @@ class StorageMaintainer(threading.Thread): else: logger.info(f"Cleaned up {deleted_segments_size} MB of recordings") - logger.debug(f"Expiring {len(deleted_recordings)} recordings") - # delete up to 100,000 at a time - max_deletes = 100000 + self._finalize_deleted_recordings(deleted_recordings) - # Update has_clip for events that overlap with deleted recordings - if deleted_recordings: - # Group deleted recordings by camera - camera_recordings = {} - for recording in deleted_recordings: - if recording.camera not in camera_recordings: - camera_recordings[recording.camera] = { - "min_start": recording.start_time, - "max_end": recording.end_time, - } - else: - camera_recordings[recording.camera]["min_start"] = min( - camera_recordings[recording.camera]["min_start"], - recording.start_time, - ) - camera_recordings[recording.camera]["max_end"] = max( - camera_recordings[recording.camera]["max_end"], - recording.end_time, - ) + def _reduce_storage_rollover(self) -> None: + """Remove recordings using smart prioritized deletion for rollover mode. - # Find all events that overlap with deleted recordings time range per camera - events_to_update = [] - for camera, time_range in camera_recordings.items(): - overlapping_events = Event.select(Event.id).where( - Event.camera == camera, - Event.has_clip == True, - Event.start_time < time_range["max_end"], - Event.end_time > time_range["min_start"], + Deletion priority: + 1. Overwritable: continuous recordings with no event/review overlap (oldest first) + 2. Event retention: recordings overlapping active review segments (oldest first) + 3. Protected: recordings overlapping retain_indefinitely events (emergency only) + """ + logger.debug("Starting smart rollover storage cleanup.") + hourly_bandwidth = sum( + b["bandwidth"] for b in self.camera_storage_stats.values() + ) + now = datetime.datetime.now().timestamp() + + # Compute retention cutoffs for review segments + alert_cutoff = ( + now - timedelta(days=self.config.record.alerts.retain.days).total_seconds() + ) + detection_cutoff = ( + now + - timedelta(days=self.config.record.detections.retain.days).total_seconds() + ) + + # Query 1: All recordings, oldest first + recordings: Recordings = ( + Recordings.select( + Recordings.id, + Recordings.camera, + Recordings.start_time, + Recordings.end_time, + Recordings.segment_size, + Recordings.path, + ) + .order_by(Recordings.start_time.asc()) + .namedtuples() + .iterator() + ) + + # Query 2: Protected events (retain_indefinitely), sorted + # No camera filter — matches existing StorageMaintainer behavior + retained_events: list = list( + Event.select( + Event.start_time, + Event.end_time, + ) + .where( + Event.retain_indefinitely == True, + Event.has_clip, + ) + .order_by(Event.start_time.asc()) + .namedtuples() + ) + + # Query 3: Non-expired review segments, sorted by start_time + # Camera-scoped: only protect recordings from the same camera + active_reviews_raw: list = list( + ReviewSegment.select( + ReviewSegment.camera, + ReviewSegment.start_time, + ReviewSegment.end_time, + ) + .where( + (ReviewSegment.end_time.is_null()) + | ( + (ReviewSegment.severity == "alert") + & (ReviewSegment.end_time >= alert_cutoff) ) - - for event in overlapping_events: - events_to_update.append(event.id) - - # Update has_clip to False for overlapping events - if events_to_update: - for i in range(0, len(events_to_update), max_deletes): - batch = events_to_update[i : i + max_deletes] - Event.update(has_clip=False).where(Event.id << batch).execute() - logger.debug( - f"Updated has_clip to False for {len(events_to_update)} events" + | ( + (ReviewSegment.severity == "detection") + & (ReviewSegment.end_time >= detection_cutoff) ) + ) + .order_by(ReviewSegment.start_time.asc()) + .namedtuples() + ) - deleted_recordings_list = [r.id for r in deleted_recordings] - for i in range(0, len(deleted_recordings_list), max_deletes): - Recordings.delete().where( - Recordings.id << deleted_recordings_list[i : i + max_deletes] - ).execute() + # Group reviews by camera for camera-scoped overlap checking + reviews_by_camera: dict[str, list] = defaultdict(list) + for review in active_reviews_raw: + reviews_by_camera[review.camera].append(review) + + # Classification pass: single iteration through recordings + overwritable: list = [] + event_retention: list = [] + protected: list = [] + + event_start = 0 + review_start_by_camera: dict[str, int] = defaultdict(int) + + for recording in recordings: + is_protected = False + is_event_retention = False + + # Check if recording overlaps with any retain_indefinitely event + for idx in range(event_start, len(retained_events)): + event = retained_events[idx] + + if event.start_time > recording.end_time: + break + + if event.end_time is None or event.end_time >= recording.start_time: + is_protected = True + break + + if event.end_time < recording.start_time: + event_start = idx + + # If not protected, check if recording overlaps with active reviews + if not is_protected: + cam = recording.camera + cam_reviews = reviews_by_camera.get(cam, []) + cam_review_start = review_start_by_camera[cam] + + for idx in range(cam_review_start, len(cam_reviews)): + review = cam_reviews[idx] + + if review.start_time > recording.end_time: + break + + if ( + review.end_time is None + or review.end_time >= recording.start_time + ): + is_event_retention = True + break + + if review.end_time < recording.start_time: + review_start_by_camera[cam] = idx + + # Classify + if is_protected: + protected.append(recording) + elif is_event_retention: + event_retention.append(recording) + else: + overwritable.append(recording) + + logger.debug( + "Rollover classification: %s overwritable, %s event_retention, %s protected", + len(overwritable), + len(event_retention), + len(protected), + ) + + # Multi-phase deletion + deleted_segments_size = 0.0 + deleted_recordings: list = [] + + # Phase 1: Delete overwritable recordings (oldest first) + for recording in overwritable: + if deleted_segments_size > hourly_bandwidth: + break + try: + clear_and_unlink(Path(recording.path), missing_ok=False) + deleted_recordings.append(recording) + deleted_segments_size += recording.segment_size + except FileNotFoundError: + pass + + # Phase 2: Delete event-retention recordings if still needed + if deleted_segments_size < hourly_bandwidth: + logger.warning( + "Overwritable recordings insufficient (%.1f MB of %.1f MB needed). " + "Deleting event-retention recordings.", + deleted_segments_size, + hourly_bandwidth, + ) + for recording in event_retention: + if deleted_segments_size > hourly_bandwidth: + break + try: + clear_and_unlink(Path(recording.path), missing_ok=False) + deleted_recordings.append(recording) + deleted_segments_size += recording.segment_size + except FileNotFoundError: + pass + + # Phase 3: Delete protected recordings as emergency last resort + if deleted_segments_size < hourly_bandwidth: + logger.error( + "Could not clear %.1f MB, currently %.1f MB cleared. " + "Protected retained recordings must be deleted.", + hourly_bandwidth, + deleted_segments_size, + ) + for recording in protected: + if deleted_segments_size > hourly_bandwidth: + break + try: + clear_and_unlink(Path(recording.path), missing_ok=False) + deleted_recordings.append(recording) + deleted_segments_size += recording.segment_size + except FileNotFoundError: + pass + else: + logger.info( + "Cleaned up %.1f MB of recordings (rollover mode)", + deleted_segments_size, + ) + + self._finalize_deleted_recordings(deleted_recordings) def run(self): """Check every 5 minutes if storage needs to be cleaned up.""" diff --git a/frigate/test/test_storage.py b/frigate/test/test_storage.py index b899c28f4..19659cd92 100644 --- a/frigate/test/test_storage.py +++ b/frigate/test/test_storage.py @@ -11,7 +11,7 @@ from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqliteq import SqliteQueueDatabase from frigate.config import FrigateConfig -from frigate.models import Event, Recordings +from frigate.models import Event, Recordings, ReviewSegment from frigate.storage import StorageMaintainer from frigate.test.const import TEST_DB, TEST_DB_CLEANUPS @@ -25,7 +25,7 @@ class TestHttp(unittest.TestCase): router.run() migrate_db.close() self.db = SqliteQueueDatabase(TEST_DB) - models = [Event, Recordings] + models = [Event, Recordings, ReviewSegment] self.db.bind(models) self.test_dir = tempfile.mkdtemp() @@ -285,6 +285,198 @@ class TestHttp(unittest.TestCase): # without errors in rollover mode storage.reduce_storage_consumption() + def _get_rollover_config(self): + """Return a config dict with rollover mode and event retention settings.""" + config = dict(self.minimal_config) + config["record"] = { + "enabled": True, + "retain_policy": "continuous_rollover", + "alerts": {"retain": {"days": 10}}, + "detections": {"retain": {"days": 5}}, + } + return config + + def test_rollover_deletes_overwritable_before_event_retention(self): + """Overwritable recordings are deleted first; event-retention survive.""" + config = FrigateConfig(**self._get_rollover_config()) + storage = StorageMaintainer(config, MagicMock()) + + now = datetime.datetime.now().timestamp() + time_old = now - 7200 + + # Insert overwritable recordings (no review overlap) — old, should be deleted + for i in range(60): + rec_id = f"{200000 + i}.overwritable" + _insert_mock_recording( + rec_id, + os.path.join(self.test_dir, f"{rec_id}.tmp"), + time_old + (i * 10), + time_old + ((i + 1) * 10), + ) + + # Insert event-retention recordings (overlap with active review segment) + time_event = now - 3600 + _insert_mock_review_segment( + "review_001", + time_event, + time_event + 30, + severity="alert", + ) + rec_event_id = "300000.event_retention" + _insert_mock_recording( + rec_event_id, + os.path.join(self.test_dir, f"{rec_event_id}.tmp"), + time_event, + time_event + 10, + ) + + storage.calculate_camera_bandwidth() + storage.reduce_storage_consumption() + + # Event-retention recording should survive + assert Recordings.get(Recordings.id == rec_event_id) + + def test_rollover_falls_through_to_event_retention(self): + """When overwritable is insufficient, event-retention recordings are deleted.""" + config = FrigateConfig(**self._get_rollover_config()) + storage = StorageMaintainer(config, MagicMock()) + + now = datetime.datetime.now().timestamp() + time_old = now - 7200 + + # Insert only 1 small overwritable recording (not enough to meet bandwidth) + rec_ow_id = "400000.overwritable" + _insert_mock_recording( + rec_ow_id, + os.path.join(self.test_dir, f"{rec_ow_id}.tmp"), + time_old, + time_old + 10, + seg_size=1, + ) + + # Insert many event-retention recordings + time_event = now - 3600 + _insert_mock_review_segment( + "review_002", + time_event, + time_event + 600, + severity="detection", + ) + for i in range(60): + rec_id = f"{400100 + i}.event_ret" + _insert_mock_recording( + rec_id, + os.path.join(self.test_dir, f"{rec_id}.tmp"), + time_event + (i * 10), + time_event + ((i + 1) * 10), + ) + + storage.calculate_camera_bandwidth() + storage.reduce_storage_consumption() + + # Overwritable should be deleted + with self.assertRaises(DoesNotExist): + Recordings.get(Recordings.id == rec_ow_id) + + # Some event-retention recordings should also be deleted + remaining = Recordings.select().count() + assert remaining < 61 # started with 61 total + + def test_rollover_protects_retained_events_last(self): + """Protected recordings are only deleted as emergency last resort.""" + config = FrigateConfig(**self._get_rollover_config()) + storage = StorageMaintainer(config, MagicMock()) + + now = datetime.datetime.now().timestamp() + time_old = now - 7200 + + # Insert protected recording (overlaps with retain_indefinitely event) + _insert_mock_event( + "evt_protected", + time_old, + time_old + 30, + retain=True, + ) + rec_protected_id = "500000.protected" + _insert_mock_recording( + rec_protected_id, + os.path.join(self.test_dir, f"{rec_protected_id}.tmp"), + time_old, + time_old + 10, + ) + + # Insert enough overwritable recordings to satisfy bandwidth + for i in range(60): + rec_id = f"{500100 + i}.overwritable" + _insert_mock_recording( + rec_id, + os.path.join(self.test_dir, f"{rec_id}.tmp"), + time_old + 100 + (i * 10), + time_old + 100 + ((i + 1) * 10), + ) + + storage.calculate_camera_bandwidth() + storage.reduce_storage_consumption() + + # Protected recording should survive when overwritable is sufficient + assert Recordings.get(Recordings.id == rec_protected_id) + + def test_rollover_keeps_protected_when_overwritable_suffices(self): + """Both protected and event-retention survive when overwritable frees enough.""" + config = FrigateConfig(**self._get_rollover_config()) + storage = StorageMaintainer(config, MagicMock()) + + now = datetime.datetime.now().timestamp() + time_old = now - 7200 + + # Protected recording + _insert_mock_event( + "evt_keep", + time_old, + time_old + 30, + retain=True, + ) + rec_prot_id = "600000.protected" + _insert_mock_recording( + rec_prot_id, + os.path.join(self.test_dir, f"{rec_prot_id}.tmp"), + time_old, + time_old + 10, + ) + + # Event-retention recording + time_event = now - 3600 + _insert_mock_review_segment( + "review_003", + time_event, + time_event + 30, + severity="alert", + ) + rec_evt_id = "600100.event_ret" + _insert_mock_recording( + rec_evt_id, + os.path.join(self.test_dir, f"{rec_evt_id}.tmp"), + time_event, + time_event + 10, + ) + + # Plenty of overwritable recordings + for i in range(60): + rec_id = f"{600200 + i}.overwritable" + _insert_mock_recording( + rec_id, + os.path.join(self.test_dir, f"{rec_id}.tmp"), + time_old + 100 + (i * 10), + time_old + 100 + ((i + 1) * 10), + ) + + storage.calculate_camera_bandwidth() + storage.reduce_storage_consumption() + + # Both protected and event-retention should survive + assert Recordings.get(Recordings.id == rec_prot_id) + assert Recordings.get(Recordings.id == rec_evt_id) + def _insert_mock_event( id: str, @@ -339,3 +531,22 @@ def _insert_mock_recording( objects=True, segment_size=seg_size, ).execute() + + +def _insert_mock_review_segment( + id: str, + start: float, + end: float, + severity: str = "alert", + camera: str = "front_door", +) -> None: + """Inserts a basic review segment model with a given id.""" + ReviewSegment.insert( + id=id, + camera=camera, + start_time=start, + end_time=end, + severity=severity, + thumb_path=f"/tmp/{id}.jpg", + data={}, + ).execute()