From b9b669e09b2fc9ff7dc7f0e68b503905efcb9375 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Wed, 6 Nov 2024 09:53:38 -0600 Subject: [PATCH] Implement batching for event cleanup --- frigate/events/cleanup.py | 34 +++++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/frigate/events/cleanup.py b/frigate/events/cleanup.py index 7d3e7c456..0cdce5220 100644 --- a/frigate/events/cleanup.py +++ b/frigate/events/cleanup.py @@ -9,7 +9,7 @@ from multiprocessing.synchronize import Event as MpEvent from pathlib import Path from frigate.config import FrigateConfig -from frigate.const import CLIPS_DIR +from frigate.const import CHUNK_SIZE, CLIPS_DIR from frigate.db.sqlitevecq import SqliteVecQueueDatabase from frigate.models import Event, Timeline @@ -21,6 +21,9 @@ class EventCleanupType(str, Enum): snapshots = "snapshots" +CHUNK_SIZE = 50 + + class EventCleanup(threading.Thread): def __init__( self, config: FrigateConfig, stop_event: MpEvent, db: SqliteVecQueueDatabase @@ -125,13 +128,28 @@ class EventCleanup(threading.Thread): logger.warning(f"Unable to delete event images: {e}") # update the clips attribute for the db entry - update_query = Event.update(update_params).where( + query = Event.select(Event.id).where( Event.camera.not_in(self.camera_keys), Event.start_time < expire_after, Event.label == event.label, Event.retain_indefinitely == False, ) - update_query.execute() + + events_to_update = [] + + for batch in query.iterator(): + events_to_update.extend([event.id for event in batch]) + if len(events_to_update) >= CHUNK_SIZE: + Event.update(update_params).where( + Event.id << events_to_update + ).execute() + events_to_update = [] + + # Update any remaining events + if events_to_update: + Event.update(update_params).where( + Event.id << events_to_update + ).execute() events_to_update = [] @@ -196,7 +214,10 @@ class EventCleanup(threading.Thread): logger.warning(f"Unable to delete event images: {e}") # update the clips attribute for the db entry - Event.update(update_params).where(Event.id << events_to_update).execute() + for i in range(0, len(events_to_update), CHUNK_SIZE): + batch = events_to_update[i : i + CHUNK_SIZE] + Event.update(update_params).where(Event.id << batch).execute() + return events_to_update def run(self) -> None: @@ -223,9 +244,8 @@ class EventCleanup(threading.Thread): ) events_to_delete = [e.id for e in events] if len(events_to_delete) > 0: - chunk_size = 50 - for i in range(0, len(events_to_delete), chunk_size): - chunk = events_to_delete[i : i + chunk_size] + for i in range(0, len(events_to_delete), CHUNK_SIZE): + chunk = events_to_delete[i : i + CHUNK_SIZE] Event.delete().where(Event.id << chunk).execute() if self.config.semantic_search.enabled: