From 12f8c3feac5d45976fa2055df2a9c49f0a546031 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Sun, 28 Sep 2025 11:52:14 -0500 Subject: [PATCH] Watchdog enhancements (#20237) * refactor get_video_properties and use json output from ffprobe * add zmq topic * publish valid segment data in recording maintainer * check for valid video data - restart separate record ffmpeg process if no video data has been received in 120s - refactor datetime import * listen to correct topic in embeddings maintainer * refactor to move get_latest_segment_datetime logic to recordings maintainer * debug logging * cleanup --- frigate/comms/recordings_updater.py | 22 ++++- frigate/embeddings/maintainer.py | 28 ++++-- frigate/record/maintainer.py | 79 +++++++++++++--- frigate/util/services.py | 142 ++++++++++++++-------------- frigate/video.py | 141 ++++++++++++++++++--------- 5 files changed, 267 insertions(+), 145 deletions(-) diff --git a/frigate/comms/recordings_updater.py b/frigate/comms/recordings_updater.py index 0db4ad289..249c2f607 100644 --- a/frigate/comms/recordings_updater.py +++ b/frigate/comms/recordings_updater.py @@ -2,6 +2,7 @@ import logging from enum import Enum +from typing import Any from .zmq_proxy import Publisher, Subscriber @@ -10,18 +11,21 @@ logger = logging.getLogger(__name__) class RecordingsDataTypeEnum(str, Enum): all = "" - recordings_available_through = "recordings_available_through" + saved = "saved" # segment has been saved to db + latest = "latest" # segment is in cache + valid = "valid" # segment is valid + invalid = "invalid" # segment is invalid -class RecordingsDataPublisher(Publisher[tuple[str, float]]): +class RecordingsDataPublisher(Publisher[Any]): """Publishes latest recording data.""" topic_base = "recordings/" - def __init__(self, topic: RecordingsDataTypeEnum) -> None: - super().__init__(topic.value) + def __init__(self) -> None: + super().__init__() - def publish(self, payload: tuple[str, float], sub_topic: str = "") -> None: + def publish(self, payload: Any, sub_topic: str = "") -> None: super().publish(payload, sub_topic) @@ -32,3 +36,11 @@ class RecordingsDataSubscriber(Subscriber): def __init__(self, topic: RecordingsDataTypeEnum) -> None: super().__init__(topic.value) + + def _return_object( + self, topic: str, payload: tuple | None + ) -> tuple[str, Any] | tuple[None, None]: + if payload is None: + return (None, None) + + return (topic, payload) diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index dfb0eef61..d06d3c481 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -144,7 +144,7 @@ class EmbeddingMaintainer(threading.Thread): EventMetadataTypeEnum.regenerate_description ) self.recordings_subscriber = RecordingsDataSubscriber( - RecordingsDataTypeEnum.recordings_available_through + RecordingsDataTypeEnum.saved ) self.review_subscriber = ReviewDataSubscriber("") self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video.value) @@ -525,20 +525,28 @@ class EmbeddingMaintainer(threading.Thread): def _process_recordings_updates(self) -> None: """Process recordings updates.""" while True: - recordings_data = self.recordings_subscriber.check_for_update() + update = self.recordings_subscriber.check_for_update() - if recordings_data == None: + if not update: break - camera, recordings_available_through_timestamp = recordings_data + (raw_topic, payload) = update - self.recordings_available_through[camera] = ( - recordings_available_through_timestamp - ) + if not raw_topic or not payload: + break - logger.debug( - f"{camera} now has recordings available through {recordings_available_through_timestamp}" - ) + topic = str(raw_topic) + + if topic.endswith(RecordingsDataTypeEnum.saved.value): + camera, recordings_available_through_timestamp, _ = payload + + self.recordings_available_through[camera] = ( + recordings_available_through_timestamp + ) + + logger.debug( + f"{camera} now has recordings available through {recordings_available_through_timestamp}" + ) def _process_review_updates(self) -> None: """Process review updates.""" diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index 20f1eb289..7938df31f 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -80,9 +80,7 @@ class RecordingMaintainer(threading.Thread): [CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.record], ) self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all.value) - self.recordings_publisher = RecordingsDataPublisher( - RecordingsDataTypeEnum.recordings_available_through - ) + self.recordings_publisher = RecordingsDataPublisher() self.stop_event = stop_event self.object_recordings_info: dict[str, list] = defaultdict(list) @@ -98,6 +96,41 @@ class RecordingMaintainer(threading.Thread): and not d.startswith("preview_") ] + # publish newest cached segment per camera (including in use files) + newest_cache_segments: dict[str, dict[str, Any]] = {} + for cache in cache_files: + cache_path = os.path.join(CACHE_DIR, cache) + basename = os.path.splitext(cache)[0] + camera, date = basename.rsplit("@", maxsplit=1) + start_time = datetime.datetime.strptime( + date, CACHE_SEGMENT_FORMAT + ).astimezone(datetime.timezone.utc) + if ( + camera not in newest_cache_segments + or start_time > newest_cache_segments[camera]["start_time"] + ): + newest_cache_segments[camera] = { + "start_time": start_time, + "cache_path": cache_path, + } + + for camera, newest in newest_cache_segments.items(): + self.recordings_publisher.publish( + ( + camera, + newest["start_time"].timestamp(), + newest["cache_path"], + ), + RecordingsDataTypeEnum.latest.value, + ) + # publish None for cameras with no cache files (but only if we know the camera exists) + for camera_name in self.config.cameras: + if camera_name not in newest_cache_segments: + self.recordings_publisher.publish( + (camera_name, None, None), + RecordingsDataTypeEnum.latest.value, + ) + files_in_use = [] for process in psutil.process_iter(): try: @@ -111,7 +144,7 @@ class RecordingMaintainer(threading.Thread): except psutil.Error: continue - # group recordings by camera + # group recordings by camera (skip in-use for validation/moving) grouped_recordings: defaultdict[str, list[dict[str, Any]]] = defaultdict(list) for cache in cache_files: # Skip files currently in use @@ -233,7 +266,9 @@ class RecordingMaintainer(threading.Thread): recordings[0]["start_time"].timestamp() if self.config.cameras[camera].record.enabled else None, - ) + None, + ), + RecordingsDataTypeEnum.saved.value, ) recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks) @@ -250,7 +285,7 @@ class RecordingMaintainer(threading.Thread): async def validate_and_move_segment( self, camera: str, reviews: list[ReviewSegment], recording: dict[str, Any] - ) -> None: + ) -> Optional[Recordings]: cache_path: str = recording["cache_path"] start_time: datetime.datetime = recording["start_time"] record_config = self.config.cameras[camera].record @@ -261,7 +296,7 @@ class RecordingMaintainer(threading.Thread): or not self.config.cameras[camera].record.enabled ): self.drop_segment(cache_path) - return + return None if cache_path in self.end_time_cache: end_time, duration = self.end_time_cache[cache_path] @@ -270,10 +305,18 @@ class RecordingMaintainer(threading.Thread): self.config.ffmpeg, cache_path, get_duration=True ) - if segment_info["duration"]: - duration = float(segment_info["duration"]) - else: - duration = -1 + if not segment_info.get("has_valid_video", False): + logger.warning( + f"Invalid or missing video stream in segment {cache_path}. Discarding." + ) + self.recordings_publisher.publish( + (camera, start_time.timestamp(), cache_path), + RecordingsDataTypeEnum.invalid.value, + ) + self.drop_segment(cache_path) + return None + + duration = float(segment_info.get("duration", -1)) # ensure duration is within expected length if 0 < duration < MAX_SEGMENT_DURATION: @@ -284,8 +327,18 @@ class RecordingMaintainer(threading.Thread): logger.warning(f"Failed to probe corrupt segment {cache_path}") logger.warning(f"Discarding a corrupt recording segment: {cache_path}") - Path(cache_path).unlink(missing_ok=True) - return + self.recordings_publisher.publish( + (camera, start_time.timestamp(), cache_path), + RecordingsDataTypeEnum.invalid.value, + ) + self.drop_segment(cache_path) + return None + + # this segment has a valid duration and has video data, so publish an update + self.recordings_publisher.publish( + (camera, start_time.timestamp(), cache_path), + RecordingsDataTypeEnum.valid.value, + ) record_config = self.config.cameras[camera].record highest = None diff --git a/frigate/util/services.py b/frigate/util/services.py index c5eacf620..28497e803 100644 --- a/frigate/util/services.py +++ b/frigate/util/services.py @@ -603,87 +603,87 @@ def auto_detect_hwaccel() -> str: async def get_video_properties( ffmpeg, url: str, get_duration: bool = False ) -> dict[str, Any]: - async def calculate_duration(video: Optional[Any]) -> float: - duration = None - - if video is not None: - # Get the frames per second (fps) of the video stream - fps = video.get(cv2.CAP_PROP_FPS) - total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT)) - - if fps and total_frames: - duration = total_frames / fps - - # if cv2 failed need to use ffprobe - if duration is None: - p = await asyncio.create_subprocess_exec( - ffmpeg.ffprobe_path, - "-v", - "error", - "-show_entries", - "format=duration", - "-of", - "default=noprint_wrappers=1:nokey=1", - f"{url}", - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, + async def probe_with_ffprobe( + url: str, + ) -> tuple[bool, int, int, Optional[str], float]: + """Fallback using ffprobe: returns (valid, width, height, codec, duration).""" + cmd = [ + ffmpeg.ffprobe_path, + "-v", + "quiet", + "-print_format", + "json", + "-show_format", + "-show_streams", + url, + ] + try: + proc = await asyncio.create_subprocess_exec( + *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) - await p.wait() + stdout, _ = await proc.communicate() + if proc.returncode != 0: + return False, 0, 0, None, -1 - if p.returncode == 0: - result = (await p.stdout.read()).decode() - else: - result = None + data = json.loads(stdout.decode()) + video_streams = [ + s for s in data.get("streams", []) if s.get("codec_type") == "video" + ] + if not video_streams: + return False, 0, 0, None, -1 - if result: - try: - duration = float(result.strip()) - except ValueError: - duration = -1 - else: - duration = -1 + v = video_streams[0] + width = int(v.get("width", 0)) + height = int(v.get("height", 0)) + codec = v.get("codec_name") - return duration + duration_str = data.get("format", {}).get("duration") + duration = float(duration_str) if duration_str else -1.0 - width = height = 0 + return True, width, height, codec, duration + except (json.JSONDecodeError, ValueError, KeyError, asyncio.SubprocessError): + return False, 0, 0, None, -1 - try: - # Open the video stream using OpenCV - video = cv2.VideoCapture(url) + def probe_with_cv2(url: str) -> tuple[bool, int, int, Optional[str], float]: + """Primary attempt using cv2: returns (valid, width, height, fourcc, duration).""" + cap = cv2.VideoCapture(url) + if not cap.isOpened(): + cap.release() + return False, 0, 0, None, -1 - # Check if the video stream was opened successfully - if not video.isOpened(): - video = None - except Exception: - video = None + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + valid = width > 0 and height > 0 + fourcc = None + duration = -1.0 - result = {} + if valid: + fourcc_int = int(cap.get(cv2.CAP_PROP_FOURCC)) + fourcc = fourcc_int.to_bytes(4, "little").decode("latin-1").strip() + if get_duration: + fps = cap.get(cv2.CAP_PROP_FPS) + total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + if fps > 0 and total_frames > 0: + duration = total_frames / fps + + cap.release() + return valid, width, height, fourcc, duration + + # try cv2 first + has_video, width, height, fourcc, duration = probe_with_cv2(url) + + # fallback to ffprobe if needed + if not has_video or (get_duration and duration < 0): + has_video, width, height, fourcc, duration = await probe_with_ffprobe(url) + + result: dict[str, Any] = {"has_valid_video": has_video} + if has_video: + result.update({"width": width, "height": height}) + if fourcc: + result["fourcc"] = fourcc if get_duration: - result["duration"] = await calculate_duration(video) - - if video is not None: - # Get the width of frames in the video stream - width = video.get(cv2.CAP_PROP_FRAME_WIDTH) - - # Get the height of frames in the video stream - height = video.get(cv2.CAP_PROP_FRAME_HEIGHT) - - # Get the stream encoding - fourcc_int = int(video.get(cv2.CAP_PROP_FOURCC)) - fourcc = ( - chr((fourcc_int >> 0) & 255) - + chr((fourcc_int >> 8) & 255) - + chr((fourcc_int >> 16) & 255) - + chr((fourcc_int >> 24) & 255) - ) - - # Release the video stream - video.release() - - result["width"] = round(width) - result["height"] = round(height) - result["fourcc"] = fourcc + result["duration"] = duration return result diff --git a/frigate/video.py b/frigate/video.py index 57b620e3a..2b88b24ff 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -1,10 +1,9 @@ -import datetime import logging -import os import queue import subprocess as sp import threading import time +from datetime import datetime, timedelta, timezone from multiprocessing import Queue, Value from multiprocessing.synchronize import Event as MpEvent from typing import Any @@ -13,6 +12,10 @@ import cv2 from frigate.camera import CameraMetrics, PTZMetrics from frigate.comms.inter_process import InterProcessRequestor +from frigate.comms.recordings_updater import ( + RecordingsDataSubscriber, + RecordingsDataTypeEnum, +) from frigate.config import CameraConfig, DetectConfig, ModelConfig from frigate.config.camera.camera import CameraTypeEnum from frigate.config.camera.updater import ( @@ -20,8 +23,6 @@ from frigate.config.camera.updater import ( CameraConfigUpdateSubscriber, ) from frigate.const import ( - CACHE_DIR, - CACHE_SEGMENT_FORMAT, PROCESS_PRIORITY_HIGH, REQUEST_REGION_GRID, ) @@ -129,7 +130,7 @@ def capture_frames( fps.value = frame_rate.eps() skipped_fps.value = skipped_eps.eps() - current_frame.value = datetime.datetime.now().timestamp() + current_frame.value = datetime.now().timestamp() frame_name = f"{config.name}_frame{frame_index}" frame_buffer = frame_manager.write(frame_name) try: @@ -199,6 +200,11 @@ class CameraWatchdog(threading.Thread): self.requestor = InterProcessRequestor() self.was_enabled = self.config.enabled + self.segment_subscriber = RecordingsDataSubscriber(RecordingsDataTypeEnum.all) + self.latest_valid_segment_time: float = 0 + self.latest_invalid_segment_time: float = 0 + self.latest_cache_segment_time: float = 0 + def _update_enabled_state(self) -> bool: """Fetch the latest config and update enabled state.""" self.config_subscriber.check_for_updates() @@ -243,6 +249,11 @@ class CameraWatchdog(threading.Thread): if enabled: self.logger.debug(f"Enabling camera {self.config.name}") self.start_all_ffmpeg() + + # reset all timestamps + self.latest_valid_segment_time = 0 + self.latest_invalid_segment_time = 0 + self.latest_cache_segment_time = 0 else: self.logger.debug(f"Disabling camera {self.config.name}") self.stop_all_ffmpeg() @@ -260,7 +271,37 @@ class CameraWatchdog(threading.Thread): if not enabled: continue - now = datetime.datetime.now().timestamp() + while True: + update = self.segment_subscriber.check_for_update(timeout=0) + + if update == (None, None): + break + + raw_topic, payload = update + if raw_topic and payload: + topic = str(raw_topic) + camera, segment_time, _ = payload + + if camera != self.config.name: + continue + + if topic.endswith(RecordingsDataTypeEnum.valid.value): + self.logger.debug( + f"Latest valid recording segment time on {camera}: {segment_time}" + ) + self.latest_valid_segment_time = segment_time + elif topic.endswith(RecordingsDataTypeEnum.invalid.value): + self.logger.warning( + f"Invalid recording segment detected for {camera} at {segment_time}" + ) + self.latest_invalid_segment_time = segment_time + elif topic.endswith(RecordingsDataTypeEnum.latest.value): + if segment_time is not None: + self.latest_cache_segment_time = segment_time + else: + self.latest_cache_segment_time = 0 + + now = datetime.now().timestamp() if not self.capture_thread.is_alive(): self.requestor.send_data(f"{self.config.name}/status/detect", "offline") @@ -298,18 +339,55 @@ class CameraWatchdog(threading.Thread): poll = p["process"].poll() if self.config.record.enabled and "record" in p["roles"]: - latest_segment_time = self.get_latest_segment_datetime( - p.get( - "latest_segment_time", - datetime.datetime.now().astimezone(datetime.timezone.utc), + now_utc = datetime.now().astimezone(timezone.utc) + + latest_cache_dt = ( + datetime.fromtimestamp( + self.latest_cache_segment_time, tz=timezone.utc ) + if self.latest_cache_segment_time > 0 + else now_utc - timedelta(seconds=1) ) - if datetime.datetime.now().astimezone(datetime.timezone.utc) > ( - latest_segment_time + datetime.timedelta(seconds=120) - ): + latest_valid_dt = ( + datetime.fromtimestamp( + self.latest_valid_segment_time, tz=timezone.utc + ) + if self.latest_valid_segment_time > 0 + else now_utc - timedelta(seconds=1) + ) + + latest_invalid_dt = ( + datetime.fromtimestamp( + self.latest_invalid_segment_time, tz=timezone.utc + ) + if self.latest_invalid_segment_time > 0 + else now_utc - timedelta(seconds=1) + ) + + # ensure segments are still being created and that they have valid video data + cache_stale = now_utc > (latest_cache_dt + timedelta(seconds=120)) + valid_stale = now_utc > (latest_valid_dt + timedelta(seconds=120)) + invalid_stale_condition = ( + self.latest_invalid_segment_time > 0 + and now_utc > (latest_invalid_dt + timedelta(seconds=120)) + and self.latest_valid_segment_time + <= self.latest_invalid_segment_time + ) + invalid_stale = invalid_stale_condition + + if cache_stale or valid_stale or invalid_stale: + if cache_stale: + reason = "No new recording segments were created" + elif valid_stale: + reason = "No new valid recording segments were created" + else: # invalid_stale + reason = ( + "No valid segments created since last invalid segment" + ) + self.logger.error( - f"No new recording segments were created for {self.config.name} in the last 120s. restarting the ffmpeg record process..." + f"{reason} for {self.config.name} in the last 120s. Restarting the ffmpeg record process..." ) p["process"] = start_or_restart_ffmpeg( p["cmd"], @@ -328,7 +406,7 @@ class CameraWatchdog(threading.Thread): self.requestor.send_data( f"{self.config.name}/status/record", "online" ) - p["latest_segment_time"] = latest_segment_time + p["latest_segment_time"] = self.latest_cache_segment_time if poll is None: continue @@ -346,6 +424,7 @@ class CameraWatchdog(threading.Thread): self.stop_all_ffmpeg() self.logpipe.close() self.config_subscriber.stop() + self.segment_subscriber.stop() def start_ffmpeg_detect(self): ffmpeg_cmd = [ @@ -405,33 +484,6 @@ class CameraWatchdog(threading.Thread): p["logpipe"].close() self.ffmpeg_other_processes.clear() - def get_latest_segment_datetime( - self, latest_segment: datetime.datetime - ) -> datetime.datetime: - """Checks if ffmpeg is still writing recording segments to cache.""" - cache_files = sorted( - [ - d - for d in os.listdir(CACHE_DIR) - if os.path.isfile(os.path.join(CACHE_DIR, d)) - and d.endswith(".mp4") - and not d.startswith("preview_") - ] - ) - newest_segment_time = latest_segment - - for file in cache_files: - if self.config.name in file: - basename = os.path.splitext(file)[0] - _, date = basename.rsplit("@", maxsplit=1) - segment_time = datetime.datetime.strptime( - date, CACHE_SEGMENT_FORMAT - ).astimezone(datetime.timezone.utc) - if segment_time > newest_segment_time: - newest_segment_time = segment_time - - return newest_segment_time - class CameraCaptureRunner(threading.Thread): def __init__( @@ -727,10 +779,7 @@ def process_frames( time.sleep(0.1) continue - if ( - datetime.datetime.now().astimezone(datetime.timezone.utc) - > next_region_update - ): + if datetime.now().astimezone(timezone.utc) > next_region_update: region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_config.name) next_region_update = get_tomorrow_at_time(2)