mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-03-10 10:33:11 +03:00
feat: add smart prioritized deletion for rollover storage mode
StorageMaintainer now uses three-tier prioritized deletion when in continuous_rollover mode instead of naive oldest-first: 1. Overwritable: continuous recordings with no event/review overlap 2. Event retention: recordings tied to active review segments 3. Protected: recordings tied to retain_indefinitely events (last resort) This ensures event-associated footage is preserved longer than plain continuous filler when disk space runs low. The default time-based policy path is completely unchanged. - Extract _finalize_deleted_recordings() helper to share cleanup logic - Add camera-scoped review segment overlap checking - Add 4 new tests covering prioritized deletion scenarios
This commit is contained in:
parent
5ed2e15172
commit
01f32b1b46
@ -1,15 +1,18 @@
|
||||
"""Handle storage retention and usage."""
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
import shutil
|
||||
import threading
|
||||
from collections import defaultdict
|
||||
from datetime import timedelta
|
||||
from pathlib import Path
|
||||
|
||||
from peewee import SQL, fn
|
||||
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.config import FrigateConfig, RetainPolicyEnum
|
||||
from frigate.const import RECORD_DIR
|
||||
from frigate.models import Event, Recordings
|
||||
from frigate.models import Event, Recordings, ReviewSegment
|
||||
from frigate.util.builtin import clear_and_unlink
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -108,8 +111,65 @@ class StorageMaintainer(threading.Thread):
|
||||
)
|
||||
return remaining_storage < hourly_bandwidth
|
||||
|
||||
def _finalize_deleted_recordings(self, deleted_recordings: list) -> None:
|
||||
"""Delete recording DB rows and update has_clip on affected events."""
|
||||
logger.debug("Expiring %s recordings", len(deleted_recordings))
|
||||
max_deletes = 100000
|
||||
|
||||
# Update has_clip for events that overlap with deleted recordings
|
||||
if deleted_recordings:
|
||||
# Group deleted recordings by camera
|
||||
camera_recordings: dict[str, dict] = {}
|
||||
for recording in deleted_recordings:
|
||||
if recording.camera not in camera_recordings:
|
||||
camera_recordings[recording.camera] = {
|
||||
"min_start": recording.start_time,
|
||||
"max_end": recording.end_time,
|
||||
}
|
||||
else:
|
||||
camera_recordings[recording.camera]["min_start"] = min(
|
||||
camera_recordings[recording.camera]["min_start"],
|
||||
recording.start_time,
|
||||
)
|
||||
camera_recordings[recording.camera]["max_end"] = max(
|
||||
camera_recordings[recording.camera]["max_end"],
|
||||
recording.end_time,
|
||||
)
|
||||
|
||||
# Find all events that overlap with deleted recordings time range per camera
|
||||
events_to_update = []
|
||||
for camera, time_range in camera_recordings.items():
|
||||
overlapping_events = Event.select(Event.id).where(
|
||||
Event.camera == camera,
|
||||
Event.has_clip == True,
|
||||
Event.start_time < time_range["max_end"],
|
||||
Event.end_time > time_range["min_start"],
|
||||
)
|
||||
|
||||
for event in overlapping_events:
|
||||
events_to_update.append(event.id)
|
||||
|
||||
# Update has_clip to False for overlapping events
|
||||
if events_to_update:
|
||||
for i in range(0, len(events_to_update), max_deletes):
|
||||
batch = events_to_update[i : i + max_deletes]
|
||||
Event.update(has_clip=False).where(Event.id << batch).execute()
|
||||
logger.debug(
|
||||
"Updated has_clip to False for %s events",
|
||||
len(events_to_update),
|
||||
)
|
||||
|
||||
deleted_recordings_list = [r.id for r in deleted_recordings]
|
||||
for i in range(0, len(deleted_recordings_list), max_deletes):
|
||||
Recordings.delete().where(
|
||||
Recordings.id << deleted_recordings_list[i : i + max_deletes]
|
||||
).execute()
|
||||
|
||||
def reduce_storage_consumption(self) -> None:
|
||||
"""Remove oldest hour of recordings."""
|
||||
if self.config.record.retain_policy == RetainPolicyEnum.continuous_rollover:
|
||||
return self._reduce_storage_rollover()
|
||||
|
||||
logger.debug("Starting storage cleanup.")
|
||||
deleted_segments_size = 0
|
||||
hourly_bandwidth = sum(
|
||||
@ -218,57 +278,209 @@ class StorageMaintainer(threading.Thread):
|
||||
else:
|
||||
logger.info(f"Cleaned up {deleted_segments_size} MB of recordings")
|
||||
|
||||
logger.debug(f"Expiring {len(deleted_recordings)} recordings")
|
||||
# delete up to 100,000 at a time
|
||||
max_deletes = 100000
|
||||
self._finalize_deleted_recordings(deleted_recordings)
|
||||
|
||||
# Update has_clip for events that overlap with deleted recordings
|
||||
if deleted_recordings:
|
||||
# Group deleted recordings by camera
|
||||
camera_recordings = {}
|
||||
for recording in deleted_recordings:
|
||||
if recording.camera not in camera_recordings:
|
||||
camera_recordings[recording.camera] = {
|
||||
"min_start": recording.start_time,
|
||||
"max_end": recording.end_time,
|
||||
}
|
||||
else:
|
||||
camera_recordings[recording.camera]["min_start"] = min(
|
||||
camera_recordings[recording.camera]["min_start"],
|
||||
recording.start_time,
|
||||
)
|
||||
camera_recordings[recording.camera]["max_end"] = max(
|
||||
camera_recordings[recording.camera]["max_end"],
|
||||
recording.end_time,
|
||||
)
|
||||
def _reduce_storage_rollover(self) -> None:
|
||||
"""Remove recordings using smart prioritized deletion for rollover mode.
|
||||
|
||||
# Find all events that overlap with deleted recordings time range per camera
|
||||
events_to_update = []
|
||||
for camera, time_range in camera_recordings.items():
|
||||
overlapping_events = Event.select(Event.id).where(
|
||||
Event.camera == camera,
|
||||
Event.has_clip == True,
|
||||
Event.start_time < time_range["max_end"],
|
||||
Event.end_time > time_range["min_start"],
|
||||
Deletion priority:
|
||||
1. Overwritable: continuous recordings with no event/review overlap (oldest first)
|
||||
2. Event retention: recordings overlapping active review segments (oldest first)
|
||||
3. Protected: recordings overlapping retain_indefinitely events (emergency only)
|
||||
"""
|
||||
logger.debug("Starting smart rollover storage cleanup.")
|
||||
hourly_bandwidth = sum(
|
||||
b["bandwidth"] for b in self.camera_storage_stats.values()
|
||||
)
|
||||
now = datetime.datetime.now().timestamp()
|
||||
|
||||
# Compute retention cutoffs for review segments
|
||||
alert_cutoff = (
|
||||
now - timedelta(days=self.config.record.alerts.retain.days).total_seconds()
|
||||
)
|
||||
detection_cutoff = (
|
||||
now
|
||||
- timedelta(days=self.config.record.detections.retain.days).total_seconds()
|
||||
)
|
||||
|
||||
# Query 1: All recordings, oldest first
|
||||
recordings: Recordings = (
|
||||
Recordings.select(
|
||||
Recordings.id,
|
||||
Recordings.camera,
|
||||
Recordings.start_time,
|
||||
Recordings.end_time,
|
||||
Recordings.segment_size,
|
||||
Recordings.path,
|
||||
)
|
||||
.order_by(Recordings.start_time.asc())
|
||||
.namedtuples()
|
||||
.iterator()
|
||||
)
|
||||
|
||||
# Query 2: Protected events (retain_indefinitely), sorted
|
||||
# No camera filter — matches existing StorageMaintainer behavior
|
||||
retained_events: list = list(
|
||||
Event.select(
|
||||
Event.start_time,
|
||||
Event.end_time,
|
||||
)
|
||||
.where(
|
||||
Event.retain_indefinitely == True,
|
||||
Event.has_clip,
|
||||
)
|
||||
.order_by(Event.start_time.asc())
|
||||
.namedtuples()
|
||||
)
|
||||
|
||||
# Query 3: Non-expired review segments, sorted by start_time
|
||||
# Camera-scoped: only protect recordings from the same camera
|
||||
active_reviews_raw: list = list(
|
||||
ReviewSegment.select(
|
||||
ReviewSegment.camera,
|
||||
ReviewSegment.start_time,
|
||||
ReviewSegment.end_time,
|
||||
)
|
||||
.where(
|
||||
(ReviewSegment.end_time.is_null())
|
||||
| (
|
||||
(ReviewSegment.severity == "alert")
|
||||
& (ReviewSegment.end_time >= alert_cutoff)
|
||||
)
|
||||
|
||||
for event in overlapping_events:
|
||||
events_to_update.append(event.id)
|
||||
|
||||
# Update has_clip to False for overlapping events
|
||||
if events_to_update:
|
||||
for i in range(0, len(events_to_update), max_deletes):
|
||||
batch = events_to_update[i : i + max_deletes]
|
||||
Event.update(has_clip=False).where(Event.id << batch).execute()
|
||||
logger.debug(
|
||||
f"Updated has_clip to False for {len(events_to_update)} events"
|
||||
| (
|
||||
(ReviewSegment.severity == "detection")
|
||||
& (ReviewSegment.end_time >= detection_cutoff)
|
||||
)
|
||||
)
|
||||
.order_by(ReviewSegment.start_time.asc())
|
||||
.namedtuples()
|
||||
)
|
||||
|
||||
deleted_recordings_list = [r.id for r in deleted_recordings]
|
||||
for i in range(0, len(deleted_recordings_list), max_deletes):
|
||||
Recordings.delete().where(
|
||||
Recordings.id << deleted_recordings_list[i : i + max_deletes]
|
||||
).execute()
|
||||
# Group reviews by camera for camera-scoped overlap checking
|
||||
reviews_by_camera: dict[str, list] = defaultdict(list)
|
||||
for review in active_reviews_raw:
|
||||
reviews_by_camera[review.camera].append(review)
|
||||
|
||||
# Classification pass: single iteration through recordings
|
||||
overwritable: list = []
|
||||
event_retention: list = []
|
||||
protected: list = []
|
||||
|
||||
event_start = 0
|
||||
review_start_by_camera: dict[str, int] = defaultdict(int)
|
||||
|
||||
for recording in recordings:
|
||||
is_protected = False
|
||||
is_event_retention = False
|
||||
|
||||
# Check if recording overlaps with any retain_indefinitely event
|
||||
for idx in range(event_start, len(retained_events)):
|
||||
event = retained_events[idx]
|
||||
|
||||
if event.start_time > recording.end_time:
|
||||
break
|
||||
|
||||
if event.end_time is None or event.end_time >= recording.start_time:
|
||||
is_protected = True
|
||||
break
|
||||
|
||||
if event.end_time < recording.start_time:
|
||||
event_start = idx
|
||||
|
||||
# If not protected, check if recording overlaps with active reviews
|
||||
if not is_protected:
|
||||
cam = recording.camera
|
||||
cam_reviews = reviews_by_camera.get(cam, [])
|
||||
cam_review_start = review_start_by_camera[cam]
|
||||
|
||||
for idx in range(cam_review_start, len(cam_reviews)):
|
||||
review = cam_reviews[idx]
|
||||
|
||||
if review.start_time > recording.end_time:
|
||||
break
|
||||
|
||||
if (
|
||||
review.end_time is None
|
||||
or review.end_time >= recording.start_time
|
||||
):
|
||||
is_event_retention = True
|
||||
break
|
||||
|
||||
if review.end_time < recording.start_time:
|
||||
review_start_by_camera[cam] = idx
|
||||
|
||||
# Classify
|
||||
if is_protected:
|
||||
protected.append(recording)
|
||||
elif is_event_retention:
|
||||
event_retention.append(recording)
|
||||
else:
|
||||
overwritable.append(recording)
|
||||
|
||||
logger.debug(
|
||||
"Rollover classification: %s overwritable, %s event_retention, %s protected",
|
||||
len(overwritable),
|
||||
len(event_retention),
|
||||
len(protected),
|
||||
)
|
||||
|
||||
# Multi-phase deletion
|
||||
deleted_segments_size = 0.0
|
||||
deleted_recordings: list = []
|
||||
|
||||
# Phase 1: Delete overwritable recordings (oldest first)
|
||||
for recording in overwritable:
|
||||
if deleted_segments_size > hourly_bandwidth:
|
||||
break
|
||||
try:
|
||||
clear_and_unlink(Path(recording.path), missing_ok=False)
|
||||
deleted_recordings.append(recording)
|
||||
deleted_segments_size += recording.segment_size
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
# Phase 2: Delete event-retention recordings if still needed
|
||||
if deleted_segments_size < hourly_bandwidth:
|
||||
logger.warning(
|
||||
"Overwritable recordings insufficient (%.1f MB of %.1f MB needed). "
|
||||
"Deleting event-retention recordings.",
|
||||
deleted_segments_size,
|
||||
hourly_bandwidth,
|
||||
)
|
||||
for recording in event_retention:
|
||||
if deleted_segments_size > hourly_bandwidth:
|
||||
break
|
||||
try:
|
||||
clear_and_unlink(Path(recording.path), missing_ok=False)
|
||||
deleted_recordings.append(recording)
|
||||
deleted_segments_size += recording.segment_size
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
# Phase 3: Delete protected recordings as emergency last resort
|
||||
if deleted_segments_size < hourly_bandwidth:
|
||||
logger.error(
|
||||
"Could not clear %.1f MB, currently %.1f MB cleared. "
|
||||
"Protected retained recordings must be deleted.",
|
||||
hourly_bandwidth,
|
||||
deleted_segments_size,
|
||||
)
|
||||
for recording in protected:
|
||||
if deleted_segments_size > hourly_bandwidth:
|
||||
break
|
||||
try:
|
||||
clear_and_unlink(Path(recording.path), missing_ok=False)
|
||||
deleted_recordings.append(recording)
|
||||
deleted_segments_size += recording.segment_size
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
else:
|
||||
logger.info(
|
||||
"Cleaned up %.1f MB of recordings (rollover mode)",
|
||||
deleted_segments_size,
|
||||
)
|
||||
|
||||
self._finalize_deleted_recordings(deleted_recordings)
|
||||
|
||||
def run(self):
|
||||
"""Check every 5 minutes if storage needs to be cleaned up."""
|
||||
|
||||
@ -11,7 +11,7 @@ from playhouse.sqlite_ext import SqliteExtDatabase
|
||||
from playhouse.sqliteq import SqliteQueueDatabase
|
||||
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.models import Event, Recordings
|
||||
from frigate.models import Event, Recordings, ReviewSegment
|
||||
from frigate.storage import StorageMaintainer
|
||||
from frigate.test.const import TEST_DB, TEST_DB_CLEANUPS
|
||||
|
||||
@ -25,7 +25,7 @@ class TestHttp(unittest.TestCase):
|
||||
router.run()
|
||||
migrate_db.close()
|
||||
self.db = SqliteQueueDatabase(TEST_DB)
|
||||
models = [Event, Recordings]
|
||||
models = [Event, Recordings, ReviewSegment]
|
||||
self.db.bind(models)
|
||||
self.test_dir = tempfile.mkdtemp()
|
||||
|
||||
@ -285,6 +285,198 @@ class TestHttp(unittest.TestCase):
|
||||
# without errors in rollover mode
|
||||
storage.reduce_storage_consumption()
|
||||
|
||||
def _get_rollover_config(self):
|
||||
"""Return a config dict with rollover mode and event retention settings."""
|
||||
config = dict(self.minimal_config)
|
||||
config["record"] = {
|
||||
"enabled": True,
|
||||
"retain_policy": "continuous_rollover",
|
||||
"alerts": {"retain": {"days": 10}},
|
||||
"detections": {"retain": {"days": 5}},
|
||||
}
|
||||
return config
|
||||
|
||||
def test_rollover_deletes_overwritable_before_event_retention(self):
|
||||
"""Overwritable recordings are deleted first; event-retention survive."""
|
||||
config = FrigateConfig(**self._get_rollover_config())
|
||||
storage = StorageMaintainer(config, MagicMock())
|
||||
|
||||
now = datetime.datetime.now().timestamp()
|
||||
time_old = now - 7200
|
||||
|
||||
# Insert overwritable recordings (no review overlap) — old, should be deleted
|
||||
for i in range(60):
|
||||
rec_id = f"{200000 + i}.overwritable"
|
||||
_insert_mock_recording(
|
||||
rec_id,
|
||||
os.path.join(self.test_dir, f"{rec_id}.tmp"),
|
||||
time_old + (i * 10),
|
||||
time_old + ((i + 1) * 10),
|
||||
)
|
||||
|
||||
# Insert event-retention recordings (overlap with active review segment)
|
||||
time_event = now - 3600
|
||||
_insert_mock_review_segment(
|
||||
"review_001",
|
||||
time_event,
|
||||
time_event + 30,
|
||||
severity="alert",
|
||||
)
|
||||
rec_event_id = "300000.event_retention"
|
||||
_insert_mock_recording(
|
||||
rec_event_id,
|
||||
os.path.join(self.test_dir, f"{rec_event_id}.tmp"),
|
||||
time_event,
|
||||
time_event + 10,
|
||||
)
|
||||
|
||||
storage.calculate_camera_bandwidth()
|
||||
storage.reduce_storage_consumption()
|
||||
|
||||
# Event-retention recording should survive
|
||||
assert Recordings.get(Recordings.id == rec_event_id)
|
||||
|
||||
def test_rollover_falls_through_to_event_retention(self):
|
||||
"""When overwritable is insufficient, event-retention recordings are deleted."""
|
||||
config = FrigateConfig(**self._get_rollover_config())
|
||||
storage = StorageMaintainer(config, MagicMock())
|
||||
|
||||
now = datetime.datetime.now().timestamp()
|
||||
time_old = now - 7200
|
||||
|
||||
# Insert only 1 small overwritable recording (not enough to meet bandwidth)
|
||||
rec_ow_id = "400000.overwritable"
|
||||
_insert_mock_recording(
|
||||
rec_ow_id,
|
||||
os.path.join(self.test_dir, f"{rec_ow_id}.tmp"),
|
||||
time_old,
|
||||
time_old + 10,
|
||||
seg_size=1,
|
||||
)
|
||||
|
||||
# Insert many event-retention recordings
|
||||
time_event = now - 3600
|
||||
_insert_mock_review_segment(
|
||||
"review_002",
|
||||
time_event,
|
||||
time_event + 600,
|
||||
severity="detection",
|
||||
)
|
||||
for i in range(60):
|
||||
rec_id = f"{400100 + i}.event_ret"
|
||||
_insert_mock_recording(
|
||||
rec_id,
|
||||
os.path.join(self.test_dir, f"{rec_id}.tmp"),
|
||||
time_event + (i * 10),
|
||||
time_event + ((i + 1) * 10),
|
||||
)
|
||||
|
||||
storage.calculate_camera_bandwidth()
|
||||
storage.reduce_storage_consumption()
|
||||
|
||||
# Overwritable should be deleted
|
||||
with self.assertRaises(DoesNotExist):
|
||||
Recordings.get(Recordings.id == rec_ow_id)
|
||||
|
||||
# Some event-retention recordings should also be deleted
|
||||
remaining = Recordings.select().count()
|
||||
assert remaining < 61 # started with 61 total
|
||||
|
||||
def test_rollover_protects_retained_events_last(self):
|
||||
"""Protected recordings are only deleted as emergency last resort."""
|
||||
config = FrigateConfig(**self._get_rollover_config())
|
||||
storage = StorageMaintainer(config, MagicMock())
|
||||
|
||||
now = datetime.datetime.now().timestamp()
|
||||
time_old = now - 7200
|
||||
|
||||
# Insert protected recording (overlaps with retain_indefinitely event)
|
||||
_insert_mock_event(
|
||||
"evt_protected",
|
||||
time_old,
|
||||
time_old + 30,
|
||||
retain=True,
|
||||
)
|
||||
rec_protected_id = "500000.protected"
|
||||
_insert_mock_recording(
|
||||
rec_protected_id,
|
||||
os.path.join(self.test_dir, f"{rec_protected_id}.tmp"),
|
||||
time_old,
|
||||
time_old + 10,
|
||||
)
|
||||
|
||||
# Insert enough overwritable recordings to satisfy bandwidth
|
||||
for i in range(60):
|
||||
rec_id = f"{500100 + i}.overwritable"
|
||||
_insert_mock_recording(
|
||||
rec_id,
|
||||
os.path.join(self.test_dir, f"{rec_id}.tmp"),
|
||||
time_old + 100 + (i * 10),
|
||||
time_old + 100 + ((i + 1) * 10),
|
||||
)
|
||||
|
||||
storage.calculate_camera_bandwidth()
|
||||
storage.reduce_storage_consumption()
|
||||
|
||||
# Protected recording should survive when overwritable is sufficient
|
||||
assert Recordings.get(Recordings.id == rec_protected_id)
|
||||
|
||||
def test_rollover_keeps_protected_when_overwritable_suffices(self):
|
||||
"""Both protected and event-retention survive when overwritable frees enough."""
|
||||
config = FrigateConfig(**self._get_rollover_config())
|
||||
storage = StorageMaintainer(config, MagicMock())
|
||||
|
||||
now = datetime.datetime.now().timestamp()
|
||||
time_old = now - 7200
|
||||
|
||||
# Protected recording
|
||||
_insert_mock_event(
|
||||
"evt_keep",
|
||||
time_old,
|
||||
time_old + 30,
|
||||
retain=True,
|
||||
)
|
||||
rec_prot_id = "600000.protected"
|
||||
_insert_mock_recording(
|
||||
rec_prot_id,
|
||||
os.path.join(self.test_dir, f"{rec_prot_id}.tmp"),
|
||||
time_old,
|
||||
time_old + 10,
|
||||
)
|
||||
|
||||
# Event-retention recording
|
||||
time_event = now - 3600
|
||||
_insert_mock_review_segment(
|
||||
"review_003",
|
||||
time_event,
|
||||
time_event + 30,
|
||||
severity="alert",
|
||||
)
|
||||
rec_evt_id = "600100.event_ret"
|
||||
_insert_mock_recording(
|
||||
rec_evt_id,
|
||||
os.path.join(self.test_dir, f"{rec_evt_id}.tmp"),
|
||||
time_event,
|
||||
time_event + 10,
|
||||
)
|
||||
|
||||
# Plenty of overwritable recordings
|
||||
for i in range(60):
|
||||
rec_id = f"{600200 + i}.overwritable"
|
||||
_insert_mock_recording(
|
||||
rec_id,
|
||||
os.path.join(self.test_dir, f"{rec_id}.tmp"),
|
||||
time_old + 100 + (i * 10),
|
||||
time_old + 100 + ((i + 1) * 10),
|
||||
)
|
||||
|
||||
storage.calculate_camera_bandwidth()
|
||||
storage.reduce_storage_consumption()
|
||||
|
||||
# Both protected and event-retention should survive
|
||||
assert Recordings.get(Recordings.id == rec_prot_id)
|
||||
assert Recordings.get(Recordings.id == rec_evt_id)
|
||||
|
||||
|
||||
def _insert_mock_event(
|
||||
id: str,
|
||||
@ -339,3 +531,22 @@ def _insert_mock_recording(
|
||||
objects=True,
|
||||
segment_size=seg_size,
|
||||
).execute()
|
||||
|
||||
|
||||
def _insert_mock_review_segment(
|
||||
id: str,
|
||||
start: float,
|
||||
end: float,
|
||||
severity: str = "alert",
|
||||
camera: str = "front_door",
|
||||
) -> None:
|
||||
"""Inserts a basic review segment model with a given id."""
|
||||
ReviewSegment.insert(
|
||||
id=id,
|
||||
camera=camera,
|
||||
start_time=start,
|
||||
end_time=end,
|
||||
severity=severity,
|
||||
thumb_path=f"/tmp/{id}.jpg",
|
||||
data={},
|
||||
).execute()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user