diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index 2d2d93f08..6e4562030 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -1,5 +1,6 @@ """Maintain recording segments in cache.""" +import asyncio import datetime import logging import multiprocessing as mp @@ -13,6 +14,7 @@ from collections import defaultdict from multiprocessing.synchronize import Event as MpEvent from pathlib import Path from typing import Any, Tuple +import traceback import psutil @@ -42,7 +44,7 @@ class RecordingMaintainer(threading.Thread): self.recordings_info: dict[str, Any] = defaultdict(list) self.end_time_cache: dict[str, Tuple[datetime.datetime, float]] = {} - def move_files(self) -> None: + async def move_files(self) -> None: cache_files = sorted( [ d @@ -121,115 +123,111 @@ class RecordingMaintainer(threading.Thread): ) .order_by(Event.start_time) ) - for r in recordings: - cache_path = r["cache_path"] - start_time = r["start_time"] - # Just delete files if recordings are turned off - if ( - camera not in self.config.cameras - or not self.process_info[camera]["record_enabled"].value - ): + await asyncio.gather( + *(self.validate_segment(camera, events, r) for r in recordings) + ) + + async def validate_segment( + self, camera: str, events: Event, recording: dict[str, any] + ) -> None: + cache_path = recording["cache_path"] + start_time = recording["start_time"] + + # Just delete files if recordings are turned off + if ( + camera not in self.config.cameras + or not self.process_info[camera]["record_enabled"].value + ): + Path(cache_path).unlink(missing_ok=True) + self.end_time_cache.pop(cache_path, None) + return + + if cache_path in self.end_time_cache: + end_time, duration = self.end_time_cache[cache_path] + else: + ffprobe_cmd = [ + "ffprobe", + "-v", + "error", + "-show_entries", + "format=duration", + "-of", + "default=noprint_wrappers=1:nokey=1", + f"{cache_path}", + ] + p = sp.run(ffprobe_cmd, capture_output=True) + if p.returncode == 0 and p.stdout.decode(): + duration = float(p.stdout.decode().strip()) + else: + duration = -1 + + # ensure duration is within expected length + if 0 < duration < MAX_SEGMENT_DURATION: + end_time = start_time + datetime.timedelta(seconds=duration) + self.end_time_cache[cache_path] = (end_time, duration) + else: + if duration == -1: + logger.warning( + f"Failed to probe corrupt segment {cache_path} : {p.returncode} - {str(p.stderr)}" + ) + + logger.warning(f"Discarding a corrupt recording segment: {cache_path}") + Path(cache_path).unlink(missing_ok=True) + return + + # if cached file's start_time is earlier than the retain days for the camera + if start_time <= ( + ( + datetime.datetime.now() + - datetime.timedelta( + days=self.config.cameras[camera].record.retain.days + ) + ) + ): + # if the cached segment overlaps with the events: + overlaps = False + for event in events: + # if the event starts in the future, stop checking events + # and remove this segment + if event.start_time > end_time.timestamp(): + overlaps = False Path(cache_path).unlink(missing_ok=True) self.end_time_cache.pop(cache_path, None) - continue + break - if cache_path in self.end_time_cache: - end_time, duration = self.end_time_cache[cache_path] - else: - ffprobe_cmd = [ - "ffprobe", - "-v", - "error", - "-show_entries", - "format=duration", - "-of", - "default=noprint_wrappers=1:nokey=1", - f"{cache_path}", - ] - p = sp.run(ffprobe_cmd, capture_output=True) - if p.returncode == 0 and p.stdout.decode(): - duration = float(p.stdout.decode().strip()) - else: - duration = -1 + # if the event is in progress or ends after the recording starts, keep it + # and stop looking at events + if event.end_time is None or event.end_time >= start_time.timestamp(): + overlaps = True + break - # ensure duration is within expected length - if 0 < duration < MAX_SEGMENT_DURATION: - end_time = start_time + datetime.timedelta(seconds=duration) - self.end_time_cache[cache_path] = (end_time, duration) - else: - if duration == -1: - logger.warning( - f"Failed to probe corrupt segment {cache_path} : {p.returncode} - {str(p.stderr)}" - ) - - logger.warning( - f"Discarding a corrupt recording segment: {cache_path}" - ) - Path(cache_path).unlink(missing_ok=True) - continue - - # if cached file's start_time is earlier than the retain days for the camera - if start_time <= ( - ( - datetime.datetime.now() - - datetime.timedelta( - days=self.config.cameras[camera].record.retain.days - ) - ) - ): - # if the cached segment overlaps with the events: - overlaps = False - for event in events: - # if the event starts in the future, stop checking events - # and remove this segment - if event.start_time > end_time.timestamp(): - overlaps = False - Path(cache_path).unlink(missing_ok=True) - self.end_time_cache.pop(cache_path, None) - break - - # if the event is in progress or ends after the recording starts, keep it - # and stop looking at events - if ( - event.end_time is None - or event.end_time >= start_time.timestamp() - ): - overlaps = True - break - - if overlaps: - record_mode = self.config.cameras[ - camera - ].record.events.retain.mode - # move from cache to recordings immediately - self.store_segment( - camera, - start_time, - end_time, - duration, - cache_path, - record_mode, - ) - # if it doesn't overlap with an event, go ahead and drop the segment - # if it ends more than the configured pre_capture for the camera - else: - pre_capture = self.config.cameras[ - camera - ].record.events.pre_capture - most_recently_processed_frame_time = self.recordings_info[ - camera - ][-1][0] - retain_cutoff = most_recently_processed_frame_time - pre_capture - if end_time.timestamp() < retain_cutoff: - Path(cache_path).unlink(missing_ok=True) - self.end_time_cache.pop(cache_path, None) - # else retain days includes this segment - else: - record_mode = self.config.cameras[camera].record.retain.mode - self.store_segment( - camera, start_time, end_time, duration, cache_path, record_mode - ) + if overlaps: + record_mode = self.config.cameras[camera].record.events.retain.mode + # move from cache to recordings immediately + self.store_segment( + camera, + start_time, + end_time, + duration, + cache_path, + record_mode, + ) + # if it doesn't overlap with an event, go ahead and drop the segment + # if it ends more than the configured pre_capture for the camera + else: + pre_capture = self.config.cameras[camera].record.events.pre_capture + most_recently_processed_frame_time = self.recordings_info[camera][-1][0] + retain_cutoff = most_recently_processed_frame_time - pre_capture + if end_time.timestamp() < retain_cutoff: + Path(cache_path).unlink(missing_ok=True) + self.end_time_cache.pop(cache_path, None) + # else retain days includes this segment + else: + record_mode = self.config.cameras[camera].record.retain.mode + self.store_segment( + camera, start_time, end_time, duration, cache_path, record_mode + ) def segment_stats( self, camera: str, start_time: datetime.datetime, end_time: datetime.datetime @@ -386,12 +384,13 @@ class RecordingMaintainer(threading.Thread): break try: - self.move_files() + asyncio.run(self.move_files()) except Exception as e: logger.error( "Error occurred when attempting to maintain recording cache" ) logger.error(e) + logger.error(str(traceback.print_exc())) duration = datetime.datetime.now().timestamp() - run_start wait_time = max(0, 5 - duration)