This commit is contained in:
Nick Mowen 2023-04-25 09:32:49 -06:00
parent 06b08e1a77
commit bab1e502a8
2 changed files with 25 additions and 21 deletions

View File

@ -14,6 +14,7 @@ import psutil
from collections import defaultdict from collections import defaultdict
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path from pathlib import Path
from typing import Any, Tuple
from frigate.config import RetainModeEnum, FrigateConfig from frigate.config import RetainModeEnum, FrigateConfig
from frigate.const import CACHE_DIR, MAX_SEGMENT_DURATION, RECORD_DIR from frigate.const import CACHE_DIR, MAX_SEGMENT_DURATION, RECORD_DIR
@ -24,7 +25,6 @@ logger = logging.getLogger(__name__)
class RecordingMaintainer(threading.Thread): class RecordingMaintainer(threading.Thread):
def __init__( def __init__(
self, self,
config: FrigateConfig, config: FrigateConfig,
@ -36,10 +36,10 @@ class RecordingMaintainer(threading.Thread):
self.config = config self.config = config
self.recordings_info_queue = recordings_info_queue self.recordings_info_queue = recordings_info_queue
self.stop_event = stop_event self.stop_event = stop_event
self.recordings_info = defaultdict(list) self.recordings_info: dict[str, Any] = defaultdict(list)
self.end_time_cache = {} self.end_time_cache: dict[str, Tuple[datetime.datetime, float]] = {}
def move_files(self): def move_files(self) -> None:
cache_files = sorted( cache_files = sorted(
[ [
d d
@ -64,14 +64,14 @@ class RecordingMaintainer(threading.Thread):
continue continue
# group recordings by camera # group recordings by camera
grouped_recordings = defaultdict(list) grouped_recordings: defaultdict[str, list[dict[str, Any]]] = defaultdict(list)
for f in cache_files: for cache in cache_files:
# Skip files currently in use # Skip files currently in use
if f in files_in_use: if cache in files_in_use:
continue continue
cache_path = os.path.join(CACHE_DIR, f) cache_path = os.path.join(CACHE_DIR, cache)
basename = os.path.splitext(f)[0] basename = os.path.splitext(cache)[0]
camera, date = basename.rsplit("-", maxsplit=1) camera, date = basename.rsplit("-", maxsplit=1)
start_time = datetime.datetime.strptime(date, "%Y%m%d%H%M%S") start_time = datetime.datetime.strptime(date, "%Y%m%d%H%M%S")
@ -91,8 +91,8 @@ class RecordingMaintainer(threading.Thread):
f"Unable to keep up with recording segments in cache for {camera}. Keeping the {keep_count} most recent segments out of {segment_count} and discarding the rest..." f"Unable to keep up with recording segments in cache for {camera}. Keeping the {keep_count} most recent segments out of {segment_count} and discarding the rest..."
) )
to_remove = grouped_recordings[camera][:-keep_count] to_remove = grouped_recordings[camera][:-keep_count]
for f in to_remove: for rec in to_remove:
cache_path = f["cache_path"] cache_path = rec["cache_path"]
Path(cache_path).unlink(missing_ok=True) Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None) self.end_time_cache.pop(cache_path, None)
grouped_recordings[camera] = grouped_recordings[camera][-keep_count:] grouped_recordings[camera] = grouped_recordings[camera][-keep_count:]
@ -157,7 +157,7 @@ class RecordingMaintainer(threading.Thread):
else: else:
if duration == -1: if duration == -1:
logger.warning( logger.warning(
f"Failed to probe corrupt segment {cache_path}: {p.returncode} - {p.stderr}" f"Failed to probe corrupt segment {cache_path} : {p.returncode} - {str(p.stderr)}"
) )
logger.warning( logger.warning(
@ -228,7 +228,9 @@ class RecordingMaintainer(threading.Thread):
camera, start_time, end_time, duration, cache_path, record_mode camera, start_time, end_time, duration, cache_path, record_mode
) )
def segment_stats(self, camera, start_time, end_time): def segment_stats(
self, camera: str, start_time: datetime.datetime, end_time: datetime.datetime
) -> Tuple[int, int]:
active_count = 0 active_count = 0
motion_count = 0 motion_count = 0
for frame in self.recordings_info[camera]: for frame in self.recordings_info[camera]:
@ -253,13 +255,13 @@ class RecordingMaintainer(threading.Thread):
def store_segment( def store_segment(
self, self,
camera, camera: str,
start_time: datetime.datetime, start_time: datetime.datetime,
end_time: datetime.datetime, end_time: datetime.datetime,
duration, duration: float,
cache_path, cache_path: str,
store_mode: RetainModeEnum, store_mode: RetainModeEnum,
): ) -> None:
motion_count, active_count = self.segment_stats(camera, start_time, end_time) motion_count, active_count = self.segment_stats(camera, start_time, end_time)
# check if the segment shouldn't be stored # check if the segment shouldn't be stored
@ -350,9 +352,9 @@ class RecordingMaintainer(threading.Thread):
# clear end_time cache # clear end_time cache
self.end_time_cache.pop(cache_path, None) self.end_time_cache.pop(cache_path, None)
def run(self): def run(self) -> None:
# Check for new files every 5 seconds # Check for new files every 5 seconds
wait_time = 5 wait_time = 5.0
while not self.stop_event.wait(wait_time): while not self.stop_event.wait(wait_time):
run_start = datetime.datetime.now().timestamp() run_start = datetime.datetime.now().timestamp()

View File

@ -6,6 +6,8 @@ import signal
import threading import threading
from setproctitle import setproctitle from setproctitle import setproctitle
from types import FrameType
from typing import Optional
from playhouse.sqliteq import SqliteQueueDatabase from playhouse.sqliteq import SqliteQueueDatabase
@ -21,10 +23,10 @@ logger = logging.getLogger(__name__)
def manage_recordings( def manage_recordings(
config: FrigateConfig, config: FrigateConfig,
recordings_info_queue: mp.Queue, recordings_info_queue: mp.Queue,
): ) -> None:
stop_event = mp.Event() stop_event = mp.Event()
def receiveSignal(signalNumber, frame): def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
stop_event.set() stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGTERM, receiveSignal)