From 8a32071790640c8ed4bbf2e10d6e89b25688e8cc Mon Sep 17 00:00:00 2001 From: Nick Mowen Date: Mon, 26 Sep 2022 18:00:58 -0600 Subject: [PATCH] Rewrite storage cleanup logic to be much more efficient --- frigate/storage.py | 156 ++++++++++++++------------------------------- 1 file changed, 49 insertions(+), 107 deletions(-) diff --git a/frigate/storage.py b/frigate/storage.py index 8bde88837..ac5c69e7d 100644 --- a/frigate/storage.py +++ b/frigate/storage.py @@ -5,15 +5,13 @@ from pathlib import Path import shutil import threading -from peewee import SqliteDatabase, operator, fn, DoesNotExist +from peewee import fn from frigate.config import FrigateConfig from frigate.const import RECORD_DIR from frigate.models import Event, Recordings logger = logging.getLogger(__name__) - - bandwidth_equation = Recordings.segment_size / ( Recordings.end_time - Recordings.start_time ) @@ -22,15 +20,14 @@ bandwidth_equation = Recordings.segment_size / ( class StorageMaintainer(threading.Thread): """Maintain frigates recording storage.""" - def __init__(self, config: FrigateConfig, stop_event): + def __init__(self, config: FrigateConfig, stop_event) -> None: threading.Thread.__init__(self) self.name = "recording_cleanup" self.config = config self.stop_event = stop_event self.camera_storage_stats: dict[str, dict] = {} - self.avg_segment_sizes: dict[str, dict] = {} - def calculate_camera_bandwidth(self): + def calculate_camera_bandwidth(self) -> None: """Calculate an average MB/hr for each camera.""" for camera in self.config.cameras.keys(): # cameras with < 50 segments should be refreshed to keep size accurate @@ -55,7 +52,7 @@ class StorageMaintainer(threading.Thread): 2, ) self.camera_storage_stats[camera]["bandwidth"] = bandwidth - logger.debug(f"{camera} has a bandwidth of {bandwidth} MB/hr") + logger.debug(f"{camera} has a bandwidth of {bandwidth} MB/hr.") def check_storage_needs_cleanup(self) -> bool: """Return if storage needs cleanup.""" @@ -66,20 +63,35 @@ class StorageMaintainer(threading.Thread): ) remaining_storage = round(shutil.disk_usage(RECORD_DIR).free / 1000000, 1) logger.debug( - f"Storage cleanup check: {hourly_bandwidth} hourly with remaining storage: {remaining_storage}" + f"Storage cleanup check: {hourly_bandwidth} hourly with remaining storage: {remaining_storage}." ) return remaining_storage < hourly_bandwidth - def delete_recording_segments( - self, recordings, retained_events, segment_count: int - ) -> set[str]: - """Delete Recording Segments""" - # loop over recordings and see if they overlap with any retained events - event_start = 0 + def reduce_storage_consumption(self) -> None: + """Remove oldest hour of recordings.""" + logger.debug("Starting storage cleanup.") + deleted_segments_size = 0 + hourly_bandwidth = sum( + [b["bandwidth"] for b in self.camera_storage_stats.values()] + ) + + recordings: Recordings = Recordings.select().order_by( + Recordings.start_time.asc() + ) + retained_events: Event = ( + Event.select() + .where( + Event.retain_indefinitely == True, + Event.has_clip, + ) + .order_by(Event.start_time.asc()) + .objects() + ) + deleted_recordings = set() for recording in recordings.objects().iterator(): - # check if 2 hours of recordings have been deleted - if len(deleted_recordings) >= segment_count: + # check if 1 hour of storage has been reclaimed + if deleted_segments_size > hourly_bandwidth: break keep = False @@ -109,109 +121,39 @@ class StorageMaintainer(threading.Thread): # Delete recordings not retained indefinitely if not keep: + deleted_segments_size += recording.segment_size Path(recording.path).unlink(missing_ok=True) deleted_recordings.add(recording.id) - return deleted_recordings + # need to delete retained segments + if deleted_recordings < hourly_bandwidth: + logger.error("Could not clear enough storage, retained recordings must be deleted.") + for recording in recordings.objects().iterator(): + if deleted_segments_size > hourly_bandwidth: + break - def reduce_storage_consumption(self) -> None: - """Cleanup the last 2 hours of recordings.""" - logger.debug("Start all cameras.") - for camera in self.config.cameras.keys(): - logger.debug(f"Start camera: {camera}.") - # Get last 24 hours of recordings segments - segment_count = int( - 7200 / self.avg_segment_sizes[camera]["segment_duration"] - ) - recordings: Recordings = ( - Recordings.select() - .where(Recordings.camera == camera) - .order_by(Recordings.start_time.asc()) - .limit(segment_count * 12) - ) + deleted_segments_size += recording.segment_size + Path(recording.path).unlink(missing_ok=True) + deleted_recordings.add(recording.id) - if len(recordings) == 0: - continue - - # cameras that are only recording part time - # should not be forced to have 2 hours of - # recordings disabled - # NOTE: A camera is considered a part time recorder - # if it has less than 12 hours of recordings saved - limited_recorder = len(recordings) < (segment_count * 6) - - # Get retained events to check against - retained_events: Event = ( - Event.select() - .where( - Event.camera == camera, - Event.retain_indefinitely == True, - Event.has_clip, - ) - .order_by(Event.start_time.asc()) - .objects() - ) - - deleted_recordings: set[str] = self.delete_recording_segments( - recordings, retained_events, segment_count - ) - - # check if 2 hours of segments were deleted from the 24 hours retrieved - # if camera is a limited recorder then do not force removal - if not limited_recorder and len(deleted_recordings) < segment_count: - logger.debug( - f"segment target of {segment_count} > {len(deleted_recordings)}, pulling all non-retained recordings" - ) - # get the rest of the recording segments to look through - recordings: Recordings = ( - Recordings.select() - .where(Recordings.camera == camera) - .order_by(Recordings.start_time.asc()) - ) - second_run: set[str] = self.delete_recording_segments( - recordings, retained_events, segment_count - ) - deleted_recordings = deleted_recordings.union(second_run) - - # check if still 2 hour quota still not meant - if not limited_recorder and len(deleted_recordings) < segment_count: - logger.debug( - f"segment target of {segment_count} > {len(deleted_recordings)}, pulling all recordings" - ) - recordings: Recordings = ( - Recordings.select() - .where(Recordings.camera == camera) - .order_by(Recordings.start_time.asc()) - ) - # delete segments including retained events - last_run: set[str] = self.delete_recording_segments( - recordings, [], segment_count - ) - deleted_recordings = deleted_recordings.union(last_run) - - logger.debug(f"Expiring {len(deleted_recordings)} recordings") - # delete up to 100,000 at a time - max_deletes = 100000 - deleted_recordings_list = list(deleted_recordings) - for i in range(0, len(deleted_recordings_list), max_deletes): - Recordings.delete().where( - Recordings.id << deleted_recordings_list[i : i + max_deletes] - ).execute() - - logger.debug(f"End camera: {camera}.") - - logger.debug("End all cameras.") - logger.debug("End storage cleanup.") + logger.debug(f"Expiring {len(deleted_recordings)} recordings") + # delete up to 100,000 at a time + max_deletes = 100000 + deleted_recordings_list = list(deleted_recordings) + for i in range(0, len(deleted_recordings_list), max_deletes): + Recordings.delete().where( + Recordings.id << deleted_recordings_list[i : i + max_deletes] + ).execute() def run(self): - # Check storage consumption every 5 minutes - while not self.stop_event.wait(300): + """Check every 5 minutes if storage needs to be cleaned up.""" + while not self.stop_event.wait(20): if not self.camera_storage_stats or False in [ r["needs_refresh"] for r in self.camera_storage_stats.values() ]: self.calculate_camera_bandwidth() - logger.debug(f"Default camera bandwidths: {self.camera_storage_stats}") + logger.debug(f"Default camera bandwidths: {self.camera_storage_stats}.") if self.check_storage_needs_cleanup(): self.reduce_storage_consumption()