diff --git a/frigate/comms/recordings_updater.py b/frigate/comms/recordings_updater.py index f5acbd004..249c2f607 100644 --- a/frigate/comms/recordings_updater.py +++ b/frigate/comms/recordings_updater.py @@ -11,11 +11,13 @@ logger = logging.getLogger(__name__) class RecordingsDataTypeEnum(str, Enum): all = "" - recordings_available_through = "recordings_available_through" - latest_valid_segment = "latest_valid_segment" + saved = "saved" # segment has been saved to db + latest = "latest" # segment is in cache + valid = "valid" # segment is valid + invalid = "invalid" # segment is invalid -class RecordingsDataPublisher(Publisher[tuple[str, float]]): +class RecordingsDataPublisher(Publisher[Any]): """Publishes latest recording data.""" topic_base = "recordings/" diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 7cfbf5333..7b3c79883 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -144,7 +144,7 @@ class EmbeddingMaintainer(threading.Thread): EventMetadataTypeEnum.regenerate_description ) self.recordings_subscriber = RecordingsDataSubscriber( - RecordingsDataTypeEnum.recordings_available_through + RecordingsDataTypeEnum.saved ) self.review_subscriber = ReviewDataSubscriber("") self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video.value) @@ -536,10 +536,8 @@ class EmbeddingMaintainer(threading.Thread): topic = str(raw_topic) - if topic.endswith( - RecordingsDataTypeEnum.recordings_available_through.value - ): - camera, recordings_available_through_timestamp = payload + if topic.endswith(RecordingsDataTypeEnum.saved.value): + camera, recordings_available_through_timestamp, _ = payload self.recordings_available_through[camera] = ( recordings_available_through_timestamp diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index 073867882..7938df31f 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -96,6 +96,41 @@ class RecordingMaintainer(threading.Thread): and not d.startswith("preview_") ] + # publish newest cached segment per camera (including in use files) + newest_cache_segments: dict[str, dict[str, Any]] = {} + for cache in cache_files: + cache_path = os.path.join(CACHE_DIR, cache) + basename = os.path.splitext(cache)[0] + camera, date = basename.rsplit("@", maxsplit=1) + start_time = datetime.datetime.strptime( + date, CACHE_SEGMENT_FORMAT + ).astimezone(datetime.timezone.utc) + if ( + camera not in newest_cache_segments + or start_time > newest_cache_segments[camera]["start_time"] + ): + newest_cache_segments[camera] = { + "start_time": start_time, + "cache_path": cache_path, + } + + for camera, newest in newest_cache_segments.items(): + self.recordings_publisher.publish( + ( + camera, + newest["start_time"].timestamp(), + newest["cache_path"], + ), + RecordingsDataTypeEnum.latest.value, + ) + # publish None for cameras with no cache files (but only if we know the camera exists) + for camera_name in self.config.cameras: + if camera_name not in newest_cache_segments: + self.recordings_publisher.publish( + (camera_name, None, None), + RecordingsDataTypeEnum.latest.value, + ) + files_in_use = [] for process in psutil.process_iter(): try: @@ -109,7 +144,7 @@ class RecordingMaintainer(threading.Thread): except psutil.Error: continue - # group recordings by camera + # group recordings by camera (skip in-use for validation/moving) grouped_recordings: defaultdict[str, list[dict[str, Any]]] = defaultdict(list) for cache in cache_files: # Skip files currently in use @@ -231,8 +266,9 @@ class RecordingMaintainer(threading.Thread): recordings[0]["start_time"].timestamp() if self.config.cameras[camera].record.enabled else None, + None, ), - RecordingsDataTypeEnum.recordings_available_through.value, + RecordingsDataTypeEnum.saved.value, ) recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks) @@ -273,6 +309,10 @@ class RecordingMaintainer(threading.Thread): logger.warning( f"Invalid or missing video stream in segment {cache_path}. Discarding." ) + self.recordings_publisher.publish( + (camera, start_time.timestamp(), cache_path), + RecordingsDataTypeEnum.invalid.value, + ) self.drop_segment(cache_path) return None @@ -287,13 +327,17 @@ class RecordingMaintainer(threading.Thread): logger.warning(f"Failed to probe corrupt segment {cache_path}") logger.warning(f"Discarding a corrupt recording segment: {cache_path}") + self.recordings_publisher.publish( + (camera, start_time.timestamp(), cache_path), + RecordingsDataTypeEnum.invalid.value, + ) self.drop_segment(cache_path) return None # this segment has a valid duration and has video data, so publish an update self.recordings_publisher.publish( (camera, start_time.timestamp(), cache_path), - RecordingsDataTypeEnum.latest_valid_segment.value, + RecordingsDataTypeEnum.valid.value, ) record_config = self.config.cameras[camera].record diff --git a/frigate/video.py b/frigate/video.py index 1284b7ecf..7d88da24c 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -1,5 +1,4 @@ import logging -import os import queue import subprocess as sp import threading @@ -24,8 +23,6 @@ from frigate.config.camera.updater import ( CameraConfigUpdateSubscriber, ) from frigate.const import ( - CACHE_DIR, - CACHE_SEGMENT_FORMAT, PROCESS_PRIORITY_HIGH, REQUEST_REGION_GRID, ) @@ -203,12 +200,10 @@ class CameraWatchdog(threading.Thread): self.requestor = InterProcessRequestor() self.was_enabled = self.config.enabled - self.segment_subscriber = RecordingsDataSubscriber( - RecordingsDataTypeEnum.latest_valid_segment - ) - self.latest_valid_segment_time: float = ( - datetime.now().astimezone(timezone.utc).timestamp() - ) + self.segment_subscriber = RecordingsDataSubscriber(RecordingsDataTypeEnum.all) + self.latest_valid_segment_time: float = 0 + self.latest_invalid_segment_time: float = 0 + self.latest_cache_segment_time: float = 0 def _update_enabled_state(self) -> bool: """Fetch the latest config and update enabled state.""" @@ -254,6 +249,11 @@ class CameraWatchdog(threading.Thread): if enabled: self.logger.debug(f"Enabling camera {self.config.name}") self.start_all_ffmpeg() + + # reset all timestamps + self.latest_valid_segment_time = 0 + self.latest_invalid_segment_time = 0 + self.latest_cache_segment_time = 0 else: self.logger.debug(f"Disabling camera {self.config.name}") self.stop_all_ffmpeg() @@ -271,24 +271,35 @@ class CameraWatchdog(threading.Thread): if not enabled: continue - update = self.segment_subscriber.check_for_update(timeout=0) + while True: + update = self.segment_subscriber.check_for_update(timeout=0) + + if update == (None, None): + break - if update is not None: raw_topic, payload = update if raw_topic and payload: topic = str(raw_topic) - if ( - topic.endswith( - RecordingsDataTypeEnum.latest_valid_segment.value + camera, segment_time, _ = payload + + if camera != self.config.name: + continue + + if topic.endswith(RecordingsDataTypeEnum.valid.value): + self.logger.info( + f"Latest valid recording segment time on {camera}: {segment_time}" ) - and payload - ): - camera, latest_valid_segment_time, _ = payload - if camera == self.config.name: - logger.debug( - f"Latest valid recording segment time on {camera}: {latest_valid_segment_time}" - ) - self.latest_valid_segment_time = latest_valid_segment_time + self.latest_valid_segment_time = segment_time + elif topic.endswith(RecordingsDataTypeEnum.invalid.value): + self.logger.warning( + f"Invalid recording segment detected for {camera} at {segment_time}" + ) + self.latest_invalid_segment_time = segment_time + elif topic.endswith(RecordingsDataTypeEnum.latest.value): + if segment_time is not None: + self.latest_cache_segment_time = segment_time + else: + self.latest_cache_segment_time = 0 now = datetime.now().timestamp() @@ -330,25 +341,53 @@ class CameraWatchdog(threading.Thread): if self.config.record.enabled and "record" in p["roles"]: now_utc = datetime.now().astimezone(timezone.utc) - latest_segment_time = self.get_latest_segment_datetime( - p.get( - "latest_segment_time", - now_utc, + latest_cache_dt = ( + datetime.fromtimestamp( + self.latest_cache_segment_time, tz=timezone.utc ) + if self.latest_cache_segment_time > 0 + else now_utc - timedelta(seconds=1) + ) + + latest_valid_dt = ( + datetime.fromtimestamp( + self.latest_valid_segment_time, tz=timezone.utc + ) + if self.latest_valid_segment_time > 0 + else now_utc - timedelta(seconds=1) + ) + + latest_invalid_dt = ( + datetime.fromtimestamp( + self.latest_invalid_segment_time, tz=timezone.utc + ) + if self.latest_invalid_segment_time > 0 + else now_utc - timedelta(seconds=1) ) # ensure segments are still being created and that they have valid video data - if now_utc > (latest_segment_time + timedelta(seconds=120)) or ( - now_utc - > ( - datetime.fromtimestamp( - self.latest_valid_segment_time, tz=timezone.utc + cache_stale = now_utc > (latest_cache_dt + timedelta(seconds=120)) + valid_stale = now_utc > (latest_valid_dt + timedelta(seconds=120)) + invalid_stale_condition = ( + self.latest_invalid_segment_time > 0 + and now_utc > (latest_invalid_dt + timedelta(seconds=120)) + and self.latest_valid_segment_time + <= self.latest_invalid_segment_time + ) + invalid_stale = invalid_stale_condition + + if cache_stale or valid_stale or invalid_stale: + if cache_stale: + reason = "No new recording segments were created" + elif valid_stale: + reason = "No new valid recording segments were created" + else: # invalid_stale + reason = ( + "No valid segments created since last invalid segment" ) - + timedelta(seconds=120) - ) - ): + self.logger.error( - f"No new or valid recording segments were created for {self.config.name} in the last 120s. restarting the ffmpeg record process..." + f"{reason} for {self.config.name} in the last 120s. Restarting the ffmpeg record process..." ) p["process"] = start_or_restart_ffmpeg( p["cmd"], @@ -367,7 +406,7 @@ class CameraWatchdog(threading.Thread): self.requestor.send_data( f"{self.config.name}/status/record", "online" ) - p["latest_segment_time"] = latest_segment_time + p["latest_segment_time"] = self.latest_cache_segment_time if poll is None: continue @@ -445,31 +484,6 @@ class CameraWatchdog(threading.Thread): p["logpipe"].close() self.ffmpeg_other_processes.clear() - def get_latest_segment_datetime(self, latest_segment: datetime) -> datetime: - """Checks if ffmpeg is still writing recording segments to cache.""" - cache_files = sorted( - [ - d - for d in os.listdir(CACHE_DIR) - if os.path.isfile(os.path.join(CACHE_DIR, d)) - and d.endswith(".mp4") - and not d.startswith("preview_") - ] - ) - newest_segment_time = latest_segment - - for file in cache_files: - if self.config.name in file: - basename = os.path.splitext(file)[0] - _, date = basename.rsplit("@", maxsplit=1) - segment_time = datetime.strptime(date, CACHE_SEGMENT_FORMAT).astimezone( - timezone.utc - ) - if segment_time > newest_segment_time: - newest_segment_time = segment_time - - return newest_segment_time - class CameraCaptureRunner(threading.Thread): def __init__( @@ -680,7 +694,7 @@ def process_frames( camera_metrics: CameraMetrics, stop_event: MpEvent, ptz_metrics: PTZMetrics, - region_grid: list[list[dict[str, Any]]], + region_grid: list[list, Any], exit_on_empty: bool = False, ): next_region_update = get_tomorrow_at_time(2)