Implement batching for event cleanup

This commit is contained in:
Josh Hawkins 2024-11-06 09:53:38 -06:00
parent 2eb5fbf112
commit b9b669e09b

View File

@ -9,7 +9,7 @@ from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path from pathlib import Path
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.const import CLIPS_DIR from frigate.const import CHUNK_SIZE, CLIPS_DIR
from frigate.db.sqlitevecq import SqliteVecQueueDatabase from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.models import Event, Timeline from frigate.models import Event, Timeline
@ -21,6 +21,9 @@ class EventCleanupType(str, Enum):
snapshots = "snapshots" snapshots = "snapshots"
CHUNK_SIZE = 50
class EventCleanup(threading.Thread): class EventCleanup(threading.Thread):
def __init__( def __init__(
self, config: FrigateConfig, stop_event: MpEvent, db: SqliteVecQueueDatabase self, config: FrigateConfig, stop_event: MpEvent, db: SqliteVecQueueDatabase
@ -125,13 +128,28 @@ class EventCleanup(threading.Thread):
logger.warning(f"Unable to delete event images: {e}") logger.warning(f"Unable to delete event images: {e}")
# update the clips attribute for the db entry # update the clips attribute for the db entry
update_query = Event.update(update_params).where( query = Event.select(Event.id).where(
Event.camera.not_in(self.camera_keys), Event.camera.not_in(self.camera_keys),
Event.start_time < expire_after, Event.start_time < expire_after,
Event.label == event.label, Event.label == event.label,
Event.retain_indefinitely == False, Event.retain_indefinitely == False,
) )
update_query.execute()
events_to_update = []
for batch in query.iterator():
events_to_update.extend([event.id for event in batch])
if len(events_to_update) >= CHUNK_SIZE:
Event.update(update_params).where(
Event.id << events_to_update
).execute()
events_to_update = []
# Update any remaining events
if events_to_update:
Event.update(update_params).where(
Event.id << events_to_update
).execute()
events_to_update = [] events_to_update = []
@ -196,7 +214,10 @@ class EventCleanup(threading.Thread):
logger.warning(f"Unable to delete event images: {e}") logger.warning(f"Unable to delete event images: {e}")
# update the clips attribute for the db entry # update the clips attribute for the db entry
Event.update(update_params).where(Event.id << events_to_update).execute() for i in range(0, len(events_to_update), CHUNK_SIZE):
batch = events_to_update[i : i + CHUNK_SIZE]
Event.update(update_params).where(Event.id << batch).execute()
return events_to_update return events_to_update
def run(self) -> None: def run(self) -> None:
@ -223,9 +244,8 @@ class EventCleanup(threading.Thread):
) )
events_to_delete = [e.id for e in events] events_to_delete = [e.id for e in events]
if len(events_to_delete) > 0: if len(events_to_delete) > 0:
chunk_size = 50 for i in range(0, len(events_to_delete), CHUNK_SIZE):
for i in range(0, len(events_to_delete), chunk_size): chunk = events_to_delete[i : i + CHUNK_SIZE]
chunk = events_to_delete[i : i + chunk_size]
Event.delete().where(Event.id << chunk).execute() Event.delete().where(Event.id << chunk).execute()
if self.config.semantic_search.enabled: if self.config.semantic_search.enabled: