Rewrite storage cleanup logic to be much more efficient

This commit is contained in:
Nick Mowen 2022-09-26 18:00:58 -06:00
parent dfb9d9ce70
commit 8a32071790

View File

@ -5,15 +5,13 @@ from pathlib import Path
import shutil import shutil
import threading import threading
from peewee import SqliteDatabase, operator, fn, DoesNotExist from peewee import fn
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.const import RECORD_DIR from frigate.const import RECORD_DIR
from frigate.models import Event, Recordings from frigate.models import Event, Recordings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
bandwidth_equation = Recordings.segment_size / ( bandwidth_equation = Recordings.segment_size / (
Recordings.end_time - Recordings.start_time Recordings.end_time - Recordings.start_time
) )
@ -22,15 +20,14 @@ bandwidth_equation = Recordings.segment_size / (
class StorageMaintainer(threading.Thread): class StorageMaintainer(threading.Thread):
"""Maintain frigates recording storage.""" """Maintain frigates recording storage."""
def __init__(self, config: FrigateConfig, stop_event): def __init__(self, config: FrigateConfig, stop_event) -> None:
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.name = "recording_cleanup" self.name = "recording_cleanup"
self.config = config self.config = config
self.stop_event = stop_event self.stop_event = stop_event
self.camera_storage_stats: dict[str, dict] = {} self.camera_storage_stats: dict[str, dict] = {}
self.avg_segment_sizes: dict[str, dict] = {}
def calculate_camera_bandwidth(self): def calculate_camera_bandwidth(self) -> None:
"""Calculate an average MB/hr for each camera.""" """Calculate an average MB/hr for each camera."""
for camera in self.config.cameras.keys(): for camera in self.config.cameras.keys():
# cameras with < 50 segments should be refreshed to keep size accurate # cameras with < 50 segments should be refreshed to keep size accurate
@ -55,7 +52,7 @@ class StorageMaintainer(threading.Thread):
2, 2,
) )
self.camera_storage_stats[camera]["bandwidth"] = bandwidth self.camera_storage_stats[camera]["bandwidth"] = bandwidth
logger.debug(f"{camera} has a bandwidth of {bandwidth} MB/hr") logger.debug(f"{camera} has a bandwidth of {bandwidth} MB/hr.")
def check_storage_needs_cleanup(self) -> bool: def check_storage_needs_cleanup(self) -> bool:
"""Return if storage needs cleanup.""" """Return if storage needs cleanup."""
@ -66,20 +63,35 @@ class StorageMaintainer(threading.Thread):
) )
remaining_storage = round(shutil.disk_usage(RECORD_DIR).free / 1000000, 1) remaining_storage = round(shutil.disk_usage(RECORD_DIR).free / 1000000, 1)
logger.debug( logger.debug(
f"Storage cleanup check: {hourly_bandwidth} hourly with remaining storage: {remaining_storage}" f"Storage cleanup check: {hourly_bandwidth} hourly with remaining storage: {remaining_storage}."
) )
return remaining_storage < hourly_bandwidth return remaining_storage < hourly_bandwidth
def delete_recording_segments( def reduce_storage_consumption(self) -> None:
self, recordings, retained_events, segment_count: int """Remove oldest hour of recordings."""
) -> set[str]: logger.debug("Starting storage cleanup.")
"""Delete Recording Segments""" deleted_segments_size = 0
# loop over recordings and see if they overlap with any retained events hourly_bandwidth = sum(
event_start = 0 [b["bandwidth"] for b in self.camera_storage_stats.values()]
)
recordings: Recordings = Recordings.select().order_by(
Recordings.start_time.asc()
)
retained_events: Event = (
Event.select()
.where(
Event.retain_indefinitely == True,
Event.has_clip,
)
.order_by(Event.start_time.asc())
.objects()
)
deleted_recordings = set() deleted_recordings = set()
for recording in recordings.objects().iterator(): for recording in recordings.objects().iterator():
# check if 2 hours of recordings have been deleted # check if 1 hour of storage has been reclaimed
if len(deleted_recordings) >= segment_count: if deleted_segments_size > hourly_bandwidth:
break break
keep = False keep = False
@ -109,109 +121,39 @@ class StorageMaintainer(threading.Thread):
# Delete recordings not retained indefinitely # Delete recordings not retained indefinitely
if not keep: if not keep:
deleted_segments_size += recording.segment_size
Path(recording.path).unlink(missing_ok=True) Path(recording.path).unlink(missing_ok=True)
deleted_recordings.add(recording.id) deleted_recordings.add(recording.id)
return deleted_recordings # need to delete retained segments
if deleted_recordings < hourly_bandwidth:
logger.error("Could not clear enough storage, retained recordings must be deleted.")
for recording in recordings.objects().iterator():
if deleted_segments_size > hourly_bandwidth:
break
def reduce_storage_consumption(self) -> None: deleted_segments_size += recording.segment_size
"""Cleanup the last 2 hours of recordings.""" Path(recording.path).unlink(missing_ok=True)
logger.debug("Start all cameras.") deleted_recordings.add(recording.id)
for camera in self.config.cameras.keys():
logger.debug(f"Start camera: {camera}.")
# Get last 24 hours of recordings segments
segment_count = int(
7200 / self.avg_segment_sizes[camera]["segment_duration"]
)
recordings: Recordings = (
Recordings.select()
.where(Recordings.camera == camera)
.order_by(Recordings.start_time.asc())
.limit(segment_count * 12)
)
if len(recordings) == 0: logger.debug(f"Expiring {len(deleted_recordings)} recordings")
continue # delete up to 100,000 at a time
max_deletes = 100000
# cameras that are only recording part time deleted_recordings_list = list(deleted_recordings)
# should not be forced to have 2 hours of for i in range(0, len(deleted_recordings_list), max_deletes):
# recordings disabled Recordings.delete().where(
# NOTE: A camera is considered a part time recorder Recordings.id << deleted_recordings_list[i : i + max_deletes]
# if it has less than 12 hours of recordings saved ).execute()
limited_recorder = len(recordings) < (segment_count * 6)
# Get retained events to check against
retained_events: Event = (
Event.select()
.where(
Event.camera == camera,
Event.retain_indefinitely == True,
Event.has_clip,
)
.order_by(Event.start_time.asc())
.objects()
)
deleted_recordings: set[str] = self.delete_recording_segments(
recordings, retained_events, segment_count
)
# check if 2 hours of segments were deleted from the 24 hours retrieved
# if camera is a limited recorder then do not force removal
if not limited_recorder and len(deleted_recordings) < segment_count:
logger.debug(
f"segment target of {segment_count} > {len(deleted_recordings)}, pulling all non-retained recordings"
)
# get the rest of the recording segments to look through
recordings: Recordings = (
Recordings.select()
.where(Recordings.camera == camera)
.order_by(Recordings.start_time.asc())
)
second_run: set[str] = self.delete_recording_segments(
recordings, retained_events, segment_count
)
deleted_recordings = deleted_recordings.union(second_run)
# check if still 2 hour quota still not meant
if not limited_recorder and len(deleted_recordings) < segment_count:
logger.debug(
f"segment target of {segment_count} > {len(deleted_recordings)}, pulling all recordings"
)
recordings: Recordings = (
Recordings.select()
.where(Recordings.camera == camera)
.order_by(Recordings.start_time.asc())
)
# delete segments including retained events
last_run: set[str] = self.delete_recording_segments(
recordings, [], segment_count
)
deleted_recordings = deleted_recordings.union(last_run)
logger.debug(f"Expiring {len(deleted_recordings)} recordings")
# delete up to 100,000 at a time
max_deletes = 100000
deleted_recordings_list = list(deleted_recordings)
for i in range(0, len(deleted_recordings_list), max_deletes):
Recordings.delete().where(
Recordings.id << deleted_recordings_list[i : i + max_deletes]
).execute()
logger.debug(f"End camera: {camera}.")
logger.debug("End all cameras.")
logger.debug("End storage cleanup.")
def run(self): def run(self):
# Check storage consumption every 5 minutes """Check every 5 minutes if storage needs to be cleaned up."""
while not self.stop_event.wait(300): while not self.stop_event.wait(20):
if not self.camera_storage_stats or False in [ if not self.camera_storage_stats or False in [
r["needs_refresh"] for r in self.camera_storage_stats.values() r["needs_refresh"] for r in self.camera_storage_stats.values()
]: ]:
self.calculate_camera_bandwidth() self.calculate_camera_bandwidth()
logger.debug(f"Default camera bandwidths: {self.camera_storage_stats}") logger.debug(f"Default camera bandwidths: {self.camera_storage_stats}.")
if self.check_storage_needs_cleanup(): if self.check_storage_needs_cleanup():
self.reduce_storage_consumption() self.reduce_storage_consumption()