diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index d21affefa..faa1eabcd 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -9,7 +9,10 @@ import random import string import subprocess as sp import threading -from collections import defaultdict +import time +from bisect import bisect_left, bisect_right +from collections import defaultdict, deque +from concurrent.futures import ThreadPoolExecutor from multiprocessing.synchronize import Event as MpEvent from pathlib import Path from typing import Any, Tuple @@ -41,21 +44,22 @@ class RecordingMaintainer(threading.Thread): self.recordings_info_queue = recordings_info_queue self.process_info = process_info self.stop_event = stop_event - self.recordings_info: dict[str, Any] = defaultdict(list) + self.recordings_info: dict[str, Any] = defaultdict(deque) self.end_time_cache: dict[str, Tuple[datetime.datetime, float]] = {} + self.lock = threading.Lock() + self.executor = ThreadPoolExecutor(max_workers=5) async def move_files(self) -> None: - cache_files = sorted( - [ - d - for d in os.listdir(CACHE_DIR) - if os.path.isfile(os.path.join(CACHE_DIR, d)) - and d.endswith(".mp4") - and not d.startswith("clip_") - ] - ) + with os.scandir(CACHE_DIR) as entries: + cache_files = sorted( + entry.name + for entry in entries + if entry.is_file() + and entry.name.endswith(".mp4") + and not entry.name.startswith("clip_") + ) - files_in_use = [] + files_in_use = set() for process in psutil.process_iter(): try: if process.name() != "ffmpeg": @@ -64,7 +68,7 @@ class RecordingMaintainer(threading.Thread): if flist: for nt in flist: if nt.path.startswith(CACHE_DIR): - files_in_use.append(nt.path.split("/")[-1]) + files_in_use.add(nt.path.split("/")[-1]) except psutil.Error: continue @@ -98,18 +102,19 @@ class RecordingMaintainer(threading.Thread): to_remove = grouped_recordings[camera][:-keep_count] for rec in to_remove: cache_path = rec["cache_path"] - Path(cache_path).unlink(missing_ok=True) - self.end_time_cache.pop(cache_path, None) + self.executor.submit(Path(cache_path).unlink, missing_ok=True) + with self.lock: + self.end_time_cache.pop(cache_path, None) grouped_recordings[camera] = grouped_recordings[camera][-keep_count:] for camera, recordings in grouped_recordings.items(): - # clear out all the recording info for old frames - while ( - len(self.recordings_info[camera]) > 0 - and self.recordings_info[camera][0][0] - < recordings[0]["start_time"].timestamp() - ): - self.recordings_info[camera].pop(0) + with self.lock: + while ( + len(self.recordings_info[camera]) > 0 + and self.recordings_info[camera][0][0] + < recordings[0]["start_time"].timestamp() + ): + self.recordings_info[camera].popleft() # get all events with the end time after the start of the oldest cache file # or with end_time None @@ -223,23 +228,19 @@ class RecordingMaintainer(threading.Thread): ) -> Tuple[int, int]: active_count = 0 motion_count = 0 - for frame in self.recordings_info[camera]: - # frame is after end time of segment - if frame[0] > end_time.timestamp(): - break - # frame is before start time of segment - if frame[0] < start_time.timestamp(): - continue - active_count += len( - [ - o - for o in frame[1] - if not o["false_positive"] and o["motionless_count"] == 0 - ] + timestamps = [frame[0] for frame in self.recordings_info[camera]] + start_index = bisect_left(timestamps, start_time.timestamp()) + end_index = bisect_right(timestamps, end_time.timestamp()) + + for frame in list(self.recordings_info[camera])[start_index:end_index]: + active_count += sum( + 1 + for o in frame[1] + if not o["false_positive"] and o["motionless_count"] == 0 ) - motion_count += sum([area(box) for box in frame[2]]) + motion_count += sum(area(box) for box in frame[2]) return (motion_count, active_count) @@ -347,7 +348,7 @@ class RecordingMaintainer(threading.Thread): # Check for new files every 5 seconds wait_time = 5.0 while not self.stop_event.wait(wait_time): - run_start = datetime.datetime.now().timestamp() + run_start = time.perf_counter() # empty the recordings info queue while True: @@ -373,13 +374,14 @@ class RecordingMaintainer(threading.Thread): break try: - asyncio.run(self.move_files()) + threading.Thread(target=asyncio.run, args=(self.move_files(),)).start() except Exception as e: logger.error( "Error occurred when attempting to maintain recording cache" ) logger.error(e) - duration = datetime.datetime.now().timestamp() - run_start - wait_time = max(0, 5 - duration) + else: + duration = time.perf_counter() - run_start + wait_time = max(0, 5 - duration) logger.info("Exiting recording maintenance...")