From 8839f735f4ab33206e68b5db7c6d50326dd0ef49 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Fri, 26 Sep 2025 16:59:43 -0500 Subject: [PATCH] check for valid video data - restart separate record ffmpeg process if no video data has been received in 120s - refactor datetime import --- frigate/video.py | 69 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 52 insertions(+), 17 deletions(-) diff --git a/frigate/video.py b/frigate/video.py index 57b620e3a..1284b7ecf 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -1,10 +1,10 @@ -import datetime import logging import os import queue import subprocess as sp import threading import time +from datetime import datetime, timedelta, timezone from multiprocessing import Queue, Value from multiprocessing.synchronize import Event as MpEvent from typing import Any @@ -13,6 +13,10 @@ import cv2 from frigate.camera import CameraMetrics, PTZMetrics from frigate.comms.inter_process import InterProcessRequestor +from frigate.comms.recordings_updater import ( + RecordingsDataSubscriber, + RecordingsDataTypeEnum, +) from frigate.config import CameraConfig, DetectConfig, ModelConfig from frigate.config.camera.camera import CameraTypeEnum from frigate.config.camera.updater import ( @@ -129,7 +133,7 @@ def capture_frames( fps.value = frame_rate.eps() skipped_fps.value = skipped_eps.eps() - current_frame.value = datetime.datetime.now().timestamp() + current_frame.value = datetime.now().timestamp() frame_name = f"{config.name}_frame{frame_index}" frame_buffer = frame_manager.write(frame_name) try: @@ -199,6 +203,13 @@ class CameraWatchdog(threading.Thread): self.requestor = InterProcessRequestor() self.was_enabled = self.config.enabled + self.segment_subscriber = RecordingsDataSubscriber( + RecordingsDataTypeEnum.latest_valid_segment + ) + self.latest_valid_segment_time: float = ( + datetime.now().astimezone(timezone.utc).timestamp() + ) + def _update_enabled_state(self) -> bool: """Fetch the latest config and update enabled state.""" self.config_subscriber.check_for_updates() @@ -260,7 +271,26 @@ class CameraWatchdog(threading.Thread): if not enabled: continue - now = datetime.datetime.now().timestamp() + update = self.segment_subscriber.check_for_update(timeout=0) + + if update is not None: + raw_topic, payload = update + if raw_topic and payload: + topic = str(raw_topic) + if ( + topic.endswith( + RecordingsDataTypeEnum.latest_valid_segment.value + ) + and payload + ): + camera, latest_valid_segment_time, _ = payload + if camera == self.config.name: + logger.debug( + f"Latest valid recording segment time on {camera}: {latest_valid_segment_time}" + ) + self.latest_valid_segment_time = latest_valid_segment_time + + now = datetime.now().timestamp() if not self.capture_thread.is_alive(): self.requestor.send_data(f"{self.config.name}/status/detect", "offline") @@ -298,18 +328,27 @@ class CameraWatchdog(threading.Thread): poll = p["process"].poll() if self.config.record.enabled and "record" in p["roles"]: + now_utc = datetime.now().astimezone(timezone.utc) + latest_segment_time = self.get_latest_segment_datetime( p.get( "latest_segment_time", - datetime.datetime.now().astimezone(datetime.timezone.utc), + now_utc, ) ) - if datetime.datetime.now().astimezone(datetime.timezone.utc) > ( - latest_segment_time + datetime.timedelta(seconds=120) + # ensure segments are still being created and that they have valid video data + if now_utc > (latest_segment_time + timedelta(seconds=120)) or ( + now_utc + > ( + datetime.fromtimestamp( + self.latest_valid_segment_time, tz=timezone.utc + ) + + timedelta(seconds=120) + ) ): self.logger.error( - f"No new recording segments were created for {self.config.name} in the last 120s. restarting the ffmpeg record process..." + f"No new or valid recording segments were created for {self.config.name} in the last 120s. restarting the ffmpeg record process..." ) p["process"] = start_or_restart_ffmpeg( p["cmd"], @@ -346,6 +385,7 @@ class CameraWatchdog(threading.Thread): self.stop_all_ffmpeg() self.logpipe.close() self.config_subscriber.stop() + self.segment_subscriber.stop() def start_ffmpeg_detect(self): ffmpeg_cmd = [ @@ -405,9 +445,7 @@ class CameraWatchdog(threading.Thread): p["logpipe"].close() self.ffmpeg_other_processes.clear() - def get_latest_segment_datetime( - self, latest_segment: datetime.datetime - ) -> datetime.datetime: + def get_latest_segment_datetime(self, latest_segment: datetime) -> datetime: """Checks if ffmpeg is still writing recording segments to cache.""" cache_files = sorted( [ @@ -424,9 +462,9 @@ class CameraWatchdog(threading.Thread): if self.config.name in file: basename = os.path.splitext(file)[0] _, date = basename.rsplit("@", maxsplit=1) - segment_time = datetime.datetime.strptime( - date, CACHE_SEGMENT_FORMAT - ).astimezone(datetime.timezone.utc) + segment_time = datetime.strptime(date, CACHE_SEGMENT_FORMAT).astimezone( + timezone.utc + ) if segment_time > newest_segment_time: newest_segment_time = segment_time @@ -727,10 +765,7 @@ def process_frames( time.sleep(0.1) continue - if ( - datetime.datetime.now().astimezone(datetime.timezone.utc) - > next_region_update - ): + if datetime.now().astimezone(timezone.utc) > next_region_update: region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_config.name) next_region_update = get_tomorrow_at_time(2)