From 6afba3a6c7ce02fe2c06b7a657b68c7d3e15d5be Mon Sep 17 00:00:00 2001
From: Sergey Krashevich
Date: Tue, 23 May 2023 17:54:44 +0300
Subject: [PATCH] Add temporary table for deletion and use pagination to
 process recordings in chunks for deletion of recordings with missing files

---
 frigate/record/cleanup.py | 64 +++++++++++++++++++++++++++------------
 1 file changed, 45 insertions(+), 19 deletions(-)

diff --git a/frigate/record/cleanup.py b/frigate/record/cleanup.py
index 605979ee4..074494a7f 100644
--- a/frigate/record/cleanup.py
+++ b/frigate/record/cleanup.py
@@ -7,7 +7,7 @@ import os
 import threading
 from pathlib import Path
 
-from peewee import DoesNotExist
+from peewee import chunked, DoesNotExist, DatabaseError, Model, CharField
 from multiprocessing.synchronize import Event as MpEvent
 
 from frigate.config import RetainModeEnum, FrigateConfig
@@ -18,6 +18,10 @@ from frigate.record.util import remove_empty_directories
 logger = logging.getLogger(__name__)
 
 
+class RecordingsToDelete(Model):
+    id = CharField(null=False, primary_key=False, max_length=30)
+
+
 class RecordingCleanup(threading.Thread):
     """Cleanup existing recordings based on retention config."""
 
@@ -215,34 +219,56 @@ class RecordingCleanup(threading.Thread):
         logger.debug("Start sync recordings.")
 
         # get all recordings in the db
-        recordings: Recordings = Recordings.select()
+        recordings = Recordings.select(Recordings.id, Recordings.path)
 
-        # get all recordings files on disk
-        files_on_disk = []
-        for root, _, files in os.walk(RECORD_DIR):
-            for file in files:
-                files_on_disk.append(os.path.join(root, file))
+        # get all recordings files on disk and put them in a set
+        files_on_disk = {
+            os.path.join(root, file)
+            for root, _, files in os.walk(RECORD_DIR)
+            for file in files
+        }
 
-        recordings_to_delete = []
-        for recording in recordings.objects().iterator():
-            if not recording.path in files_on_disk:
-                recordings_to_delete.append(recording.id)
+        # Use pagination to process records in chunks
+        page_size = 1000
+        num_pages = (recordings.count() + page_size - 1) // page_size
+        recordings_to_delete = set()
+
+        for page in range(num_pages):
+            for recording in recordings.paginate(page, page_size):
+                if recording.path not in files_on_disk:
+                    recordings_to_delete.add(recording.id)
+
+        # convert back to list of dictionaries for insertion
+        recordings_to_delete = [
+            {"id": recording_id} for recording_id in recordings_to_delete
+        ]
 
         logger.debug(
             f"Deleting {len(recordings_to_delete)} recordings with missing files"
         )
-        # delete up to 100,000 at a time
-        max_deletes = 100000
-        for i in range(0, len(recordings_to_delete), max_deletes):
-            Recordings.delete().where(
-                Recordings.id << recordings_to_delete[i : i + max_deletes]
-            ).execute()
+
+        # create a temporary table for deletion
+        RecordingsToDelete.create_table(temporary=True)
+
+        # insert ids to the temporary table
+        max_inserts = 1000
+        for batch in chunked(recordings_to_delete, max_inserts):
+            RecordingsToDelete.insert_many(batch).execute()
+
+        try:
+            # delete records in the main table that exist in the temporary table
+            query = Recordings.delete().where(
+                Recordings.id.in_(RecordingsToDelete.select(RecordingsToDelete.id))
+            )
+            query.execute()
+        except DatabaseError as e:
+            logger.error(f"Database error during delete: {e}")
 
         logger.debug("End sync recordings.")
 
     def run(self) -> None:
-        # on startup sync recordings with disk (disabled due to too much CPU usage)
-        # self.sync_recordings()
+        # on startup sync recordings with disk
+        self.sync_recordings()
 
         # Expire tmp clips every minute, recordings and clean directories every hour.
         for counter in itertools.cycle(range(self.config.record.expire_interval)):
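
For reference, the pattern this patch introduces (stage the orphaned ids in a
temporary table, then delete the matching rows from the main table with a
single IN-subquery) can be exercised in isolation. The sketch below is not
Frigate code: the in-memory database, the seeded rows, and the files_on_disk
set are invented for illustration, and the temporary model is given a primary
key purely to keep the standalone definition simple.

    # Minimal, self-contained sketch of the temporary-table delete pattern
    # (illustrative only; not the Frigate module itself).
    from peewee import CharField, Model, SqliteDatabase, chunked

    db = SqliteDatabase(":memory:")


    class Recordings(Model):
        id = CharField(primary_key=True, max_length=30)
        path = CharField()

        class Meta:
            database = db


    class RecordingsToDelete(Model):
        # primary_key=True is a simplification for this standalone example
        id = CharField(primary_key=True, max_length=30)

        class Meta:
            database = db


    db.connect()
    db.create_tables([Recordings])

    # seed two rows; pretend only "a.mp4" is still present on disk
    Recordings.insert_many(
        [{"id": "1", "path": "a.mp4"}, {"id": "2", "path": "b.mp4"}]
    ).execute()
    files_on_disk = {"a.mp4"}

    # collect ids whose files are missing, as rows ready for insert_many
    missing = [
        {"id": rec.id}
        for rec in Recordings.select(Recordings.id, Recordings.path)
        if rec.path not in files_on_disk
    ]

    # stage the ids in a temporary table in batches, then delete via a subquery
    RecordingsToDelete.create_table(temporary=True)
    for batch in chunked(missing, 1000):
        RecordingsToDelete.insert_many(batch).execute()

    Recordings.delete().where(
        Recordings.id.in_(RecordingsToDelete.select(RecordingsToDelete.id))
    ).execute()

    print(Recordings.select().count())  # -> 1, only the recording still on disk

The subquery against the temporary table keeps each DELETE statement small no
matter how many ids are orphaned, whereas the code being replaced interpolated
up to 100,000 ids directly into the WHERE clause.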