Refactored code to use deque for recordings_info and added a lock to ensure thread safety

This commit is contained in:
Sergey Krashevich 2023-07-06 21:08:33 +03:00
parent baf671b764
commit 556fcc72c2
No known key found for this signature in database
GPG Key ID: 625171324E7D3856

View File

@ -9,7 +9,10 @@ import random
import string import string
import subprocess as sp import subprocess as sp
import threading import threading
from collections import defaultdict import time
from bisect import bisect_left, bisect_right
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path from pathlib import Path
from typing import Any, Tuple from typing import Any, Tuple
@ -41,21 +44,22 @@ class RecordingMaintainer(threading.Thread):
self.recordings_info_queue = recordings_info_queue self.recordings_info_queue = recordings_info_queue
self.process_info = process_info self.process_info = process_info
self.stop_event = stop_event self.stop_event = stop_event
self.recordings_info: dict[str, Any] = defaultdict(list) self.recordings_info: dict[str, Any] = defaultdict(deque)
self.end_time_cache: dict[str, Tuple[datetime.datetime, float]] = {} self.end_time_cache: dict[str, Tuple[datetime.datetime, float]] = {}
self.lock = threading.Lock()
self.executor = ThreadPoolExecutor(max_workers=5)
async def move_files(self) -> None: async def move_files(self) -> None:
cache_files = sorted( with os.scandir(CACHE_DIR) as entries:
[ cache_files = sorted(
d entry.name
for d in os.listdir(CACHE_DIR) for entry in entries
if os.path.isfile(os.path.join(CACHE_DIR, d)) if entry.is_file()
and d.endswith(".mp4") and entry.name.endswith(".mp4")
and not d.startswith("clip_") and not entry.name.startswith("clip_")
] )
)
files_in_use = [] files_in_use = set()
for process in psutil.process_iter(): for process in psutil.process_iter():
try: try:
if process.name() != "ffmpeg": if process.name() != "ffmpeg":
@ -64,7 +68,7 @@ class RecordingMaintainer(threading.Thread):
if flist: if flist:
for nt in flist: for nt in flist:
if nt.path.startswith(CACHE_DIR): if nt.path.startswith(CACHE_DIR):
files_in_use.append(nt.path.split("/")[-1]) files_in_use.add(nt.path.split("/")[-1])
except psutil.Error: except psutil.Error:
continue continue
@ -98,18 +102,19 @@ class RecordingMaintainer(threading.Thread):
to_remove = grouped_recordings[camera][:-keep_count] to_remove = grouped_recordings[camera][:-keep_count]
for rec in to_remove: for rec in to_remove:
cache_path = rec["cache_path"] cache_path = rec["cache_path"]
Path(cache_path).unlink(missing_ok=True) self.executor.submit(Path(cache_path).unlink, missing_ok=True)
self.end_time_cache.pop(cache_path, None) with self.lock:
self.end_time_cache.pop(cache_path, None)
grouped_recordings[camera] = grouped_recordings[camera][-keep_count:] grouped_recordings[camera] = grouped_recordings[camera][-keep_count:]
for camera, recordings in grouped_recordings.items(): for camera, recordings in grouped_recordings.items():
# clear out all the recording info for old frames with self.lock:
while ( while (
len(self.recordings_info[camera]) > 0 len(self.recordings_info[camera]) > 0
and self.recordings_info[camera][0][0] and self.recordings_info[camera][0][0]
< recordings[0]["start_time"].timestamp() < recordings[0]["start_time"].timestamp()
): ):
self.recordings_info[camera].pop(0) self.recordings_info[camera].popleft()
# get all events with the end time after the start of the oldest cache file # get all events with the end time after the start of the oldest cache file
# or with end_time None # or with end_time None
@ -223,23 +228,19 @@ class RecordingMaintainer(threading.Thread):
) -> Tuple[int, int]: ) -> Tuple[int, int]:
active_count = 0 active_count = 0
motion_count = 0 motion_count = 0
for frame in self.recordings_info[camera]:
# frame is after end time of segment
if frame[0] > end_time.timestamp():
break
# frame is before start time of segment
if frame[0] < start_time.timestamp():
continue
active_count += len( timestamps = [frame[0] for frame in self.recordings_info[camera]]
[ start_index = bisect_left(timestamps, start_time.timestamp())
o end_index = bisect_right(timestamps, end_time.timestamp())
for o in frame[1]
if not o["false_positive"] and o["motionless_count"] == 0 for frame in list(self.recordings_info[camera])[start_index:end_index]:
] active_count += sum(
1
for o in frame[1]
if not o["false_positive"] and o["motionless_count"] == 0
) )
motion_count += sum([area(box) for box in frame[2]]) motion_count += sum(area(box) for box in frame[2])
return (motion_count, active_count) return (motion_count, active_count)
@ -347,7 +348,7 @@ class RecordingMaintainer(threading.Thread):
# Check for new files every 5 seconds # Check for new files every 5 seconds
wait_time = 5.0 wait_time = 5.0
while not self.stop_event.wait(wait_time): while not self.stop_event.wait(wait_time):
run_start = datetime.datetime.now().timestamp() run_start = time.perf_counter()
# empty the recordings info queue # empty the recordings info queue
while True: while True:
@ -373,13 +374,14 @@ class RecordingMaintainer(threading.Thread):
break break
try: try:
asyncio.run(self.move_files()) threading.Thread(target=asyncio.run, args=(self.move_files(),)).start()
except Exception as e: except Exception as e:
logger.error( logger.error(
"Error occurred when attempting to maintain recording cache" "Error occurred when attempting to maintain recording cache"
) )
logger.error(e) logger.error(e)
duration = datetime.datetime.now().timestamp() - run_start else:
wait_time = max(0, 5 - duration) duration = time.perf_counter() - run_start
wait_time = max(0, 5 - duration)
logger.info("Exiting recording maintenance...") logger.info("Exiting recording maintenance...")