Use pagination and a temporary deletion table when removing recordings with missing files

Sergey Krashevich 2023-05-23 17:54:44 +03:00
parent 846a180a7b
commit 6afba3a6c7


@@ -7,7 +7,7 @@ import os
 import threading
 from pathlib import Path
 
-from peewee import DoesNotExist
+from peewee import chunked, DoesNotExist, DatabaseError, Model, CharField
 from multiprocessing.synchronize import Event as MpEvent
 
 from frigate.config import RetainModeEnum, FrigateConfig
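
Among the new imports, chunked() is a peewee helper that splits an iterable into lists of at most n items; it drives the batched INSERTs further down. A quick standalone illustration (example data, not Frigate code):

    from peewee import chunked

    rows = [{"id": str(i)} for i in range(2500)]  # illustrative rows
    for batch in chunked(rows, 1000):
        print(len(batch))  # prints 1000, 1000, 500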
@@ -18,6 +18,10 @@ from frigate.record.util import remove_empty_directories
 
 logger = logging.getLogger(__name__)
 
 
+class RecordingsToDelete(Model):
+    id = CharField(null=False, primary_key=False, max_length=30)
+
+
 class RecordingCleanup(threading.Thread):
     """Cleanup existing recordings based on retention config."""
@@ -215,34 +219,56 @@ class RecordingCleanup(threading.Thread):
         logger.debug("Start sync recordings.")
 
         # get all recordings in the db
-        recordings: Recordings = Recordings.select()
+        recordings = Recordings.select(Recordings.id, Recordings.path)
 
-        # get all recordings files on disk
-        files_on_disk = []
-        for root, _, files in os.walk(RECORD_DIR):
-            for file in files:
-                files_on_disk.append(os.path.join(root, file))
+        # get all recordings files on disk and put them in a set
+        files_on_disk = {
+            os.path.join(root, file)
+            for root, _, files in os.walk(RECORD_DIR)
+            for file in files
+        }
 
-        recordings_to_delete = []
-        for recording in recordings.objects().iterator():
-            if not recording.path in files_on_disk:
-                recordings_to_delete.append(recording.id)
+        # Use pagination to process records in chunks
+        page_size = 1000
+        num_pages = (recordings.count() + page_size - 1) // page_size
+        recordings_to_delete = set()
+
+        for page in range(num_pages):
+            for recording in recordings.paginate(page, page_size):
+                if recording.path not in files_on_disk:
+                    recordings_to_delete.add(recording.id)
+
+        # convert back to list of dictionaries for insertion
+        recordings_to_delete = [
+            {"id": recording_id} for recording_id in recordings_to_delete
+        ]
 
         logger.debug(
             f"Deleting {len(recordings_to_delete)} recordings with missing files"
         )
 
-        # delete up to 100,000 at a time
-        max_deletes = 100000
-        for i in range(0, len(recordings_to_delete), max_deletes):
-            Recordings.delete().where(
-                Recordings.id << recordings_to_delete[i : i + max_deletes]
-            ).execute()
+        # create a temporary table for deletion
+        RecordingsToDelete.create_table(temporary=True)
+
+        # insert ids to the temporary table
+        max_inserts = 1000
+        for batch in chunked(recordings_to_delete, max_inserts):
+            RecordingsToDelete.insert_many(batch).execute()
+
+        try:
+            # delete records in the main table that exist in the temporary table
+            query = Recordings.delete().where(
+                Recordings.id.in_(RecordingsToDelete.select(RecordingsToDelete.id))
+            )
+            query.execute()
+        except DatabaseError as e:
+            logger.error(f"Database error during delete: {e}")
 
         logger.debug("End sync recordings.")
 
     def run(self) -> None:
-        # on startup sync recordings with disk (disabled due to too much CPU usage)
-        # self.sync_recordings()
+        # on startup sync recordings with disk
+        self.sync_recordings()
 
         # Expire tmp clips every minute, recordings and clean directories every hour.
         for counter in itertools.cycle(range(self.config.record.expire_interval)):
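
For reference, the page count above is a ceiling division: (count + page_size - 1) // page_size rounds up so a final partial page is still scanned. When reusing the pattern, note that peewee's paginate() numbers pages from 1. A small self-contained sketch (illustrative model, not Frigate's schema):

    from peewee import CharField, Model, SqliteDatabase

    db = SqliteDatabase(":memory:")  # illustrative database

    class Recording(Model):
        id = CharField(primary_key=True, max_length=30)
        path = CharField()

        class Meta:
            database = db

    db.connect()
    db.create_tables([Recording])
    Recording.insert_many(
        [{"id": str(i), "path": f"/tmp/{i}.mp4"} for i in range(25)]
    ).execute()

    query = Recording.select(Recording.id, Recording.path)
    page_size = 10
    num_pages = (query.count() + page_size - 1) // page_size  # ceil(25 / 10) == 3
    seen = 0
    for page in range(1, num_pages + 1):  # peewee pages are 1-indexed
        seen += len(query.paginate(page, page_size))
    print(seen)  # -> 25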