check for valid video data

- restart separate record ffmpeg process if no video data has been received in 120s
- refactor datetime import
This commit is contained in:
Josh Hawkins 2025-09-26 16:59:43 -05:00
parent f9861d4060
commit 8839f735f4

View File

@ -1,10 +1,10 @@
import datetime
import logging import logging
import os import os
import queue import queue
import subprocess as sp import subprocess as sp
import threading import threading
import time import time
from datetime import datetime, timedelta, timezone
from multiprocessing import Queue, Value from multiprocessing import Queue, Value
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from typing import Any from typing import Any
@ -13,6 +13,10 @@ import cv2
from frigate.camera import CameraMetrics, PTZMetrics from frigate.camera import CameraMetrics, PTZMetrics
from frigate.comms.inter_process import InterProcessRequestor from frigate.comms.inter_process import InterProcessRequestor
from frigate.comms.recordings_updater import (
RecordingsDataSubscriber,
RecordingsDataTypeEnum,
)
from frigate.config import CameraConfig, DetectConfig, ModelConfig from frigate.config import CameraConfig, DetectConfig, ModelConfig
from frigate.config.camera.camera import CameraTypeEnum from frigate.config.camera.camera import CameraTypeEnum
from frigate.config.camera.updater import ( from frigate.config.camera.updater import (
@ -129,7 +133,7 @@ def capture_frames(
fps.value = frame_rate.eps() fps.value = frame_rate.eps()
skipped_fps.value = skipped_eps.eps() skipped_fps.value = skipped_eps.eps()
current_frame.value = datetime.datetime.now().timestamp() current_frame.value = datetime.now().timestamp()
frame_name = f"{config.name}_frame{frame_index}" frame_name = f"{config.name}_frame{frame_index}"
frame_buffer = frame_manager.write(frame_name) frame_buffer = frame_manager.write(frame_name)
try: try:
@ -199,6 +203,13 @@ class CameraWatchdog(threading.Thread):
self.requestor = InterProcessRequestor() self.requestor = InterProcessRequestor()
self.was_enabled = self.config.enabled self.was_enabled = self.config.enabled
self.segment_subscriber = RecordingsDataSubscriber(
RecordingsDataTypeEnum.latest_valid_segment
)
self.latest_valid_segment_time: float = (
datetime.now().astimezone(timezone.utc).timestamp()
)
def _update_enabled_state(self) -> bool: def _update_enabled_state(self) -> bool:
"""Fetch the latest config and update enabled state.""" """Fetch the latest config and update enabled state."""
self.config_subscriber.check_for_updates() self.config_subscriber.check_for_updates()
@ -260,7 +271,26 @@ class CameraWatchdog(threading.Thread):
if not enabled: if not enabled:
continue continue
now = datetime.datetime.now().timestamp() update = self.segment_subscriber.check_for_update(timeout=0)
if update is not None:
raw_topic, payload = update
if raw_topic and payload:
topic = str(raw_topic)
if (
topic.endswith(
RecordingsDataTypeEnum.latest_valid_segment.value
)
and payload
):
camera, latest_valid_segment_time, _ = payload
if camera == self.config.name:
logger.debug(
f"Latest valid recording segment time on {camera}: {latest_valid_segment_time}"
)
self.latest_valid_segment_time = latest_valid_segment_time
now = datetime.now().timestamp()
if not self.capture_thread.is_alive(): if not self.capture_thread.is_alive():
self.requestor.send_data(f"{self.config.name}/status/detect", "offline") self.requestor.send_data(f"{self.config.name}/status/detect", "offline")
@ -298,18 +328,27 @@ class CameraWatchdog(threading.Thread):
poll = p["process"].poll() poll = p["process"].poll()
if self.config.record.enabled and "record" in p["roles"]: if self.config.record.enabled and "record" in p["roles"]:
now_utc = datetime.now().astimezone(timezone.utc)
latest_segment_time = self.get_latest_segment_datetime( latest_segment_time = self.get_latest_segment_datetime(
p.get( p.get(
"latest_segment_time", "latest_segment_time",
datetime.datetime.now().astimezone(datetime.timezone.utc), now_utc,
) )
) )
if datetime.datetime.now().astimezone(datetime.timezone.utc) > ( # ensure segments are still being created and that they have valid video data
latest_segment_time + datetime.timedelta(seconds=120) if now_utc > (latest_segment_time + timedelta(seconds=120)) or (
now_utc
> (
datetime.fromtimestamp(
self.latest_valid_segment_time, tz=timezone.utc
)
+ timedelta(seconds=120)
)
): ):
self.logger.error( self.logger.error(
f"No new recording segments were created for {self.config.name} in the last 120s. restarting the ffmpeg record process..." f"No new or valid recording segments were created for {self.config.name} in the last 120s. restarting the ffmpeg record process..."
) )
p["process"] = start_or_restart_ffmpeg( p["process"] = start_or_restart_ffmpeg(
p["cmd"], p["cmd"],
@ -346,6 +385,7 @@ class CameraWatchdog(threading.Thread):
self.stop_all_ffmpeg() self.stop_all_ffmpeg()
self.logpipe.close() self.logpipe.close()
self.config_subscriber.stop() self.config_subscriber.stop()
self.segment_subscriber.stop()
def start_ffmpeg_detect(self): def start_ffmpeg_detect(self):
ffmpeg_cmd = [ ffmpeg_cmd = [
@ -405,9 +445,7 @@ class CameraWatchdog(threading.Thread):
p["logpipe"].close() p["logpipe"].close()
self.ffmpeg_other_processes.clear() self.ffmpeg_other_processes.clear()
def get_latest_segment_datetime( def get_latest_segment_datetime(self, latest_segment: datetime) -> datetime:
self, latest_segment: datetime.datetime
) -> datetime.datetime:
"""Checks if ffmpeg is still writing recording segments to cache.""" """Checks if ffmpeg is still writing recording segments to cache."""
cache_files = sorted( cache_files = sorted(
[ [
@ -424,9 +462,9 @@ class CameraWatchdog(threading.Thread):
if self.config.name in file: if self.config.name in file:
basename = os.path.splitext(file)[0] basename = os.path.splitext(file)[0]
_, date = basename.rsplit("@", maxsplit=1) _, date = basename.rsplit("@", maxsplit=1)
segment_time = datetime.datetime.strptime( segment_time = datetime.strptime(date, CACHE_SEGMENT_FORMAT).astimezone(
date, CACHE_SEGMENT_FORMAT timezone.utc
).astimezone(datetime.timezone.utc) )
if segment_time > newest_segment_time: if segment_time > newest_segment_time:
newest_segment_time = segment_time newest_segment_time = segment_time
@ -727,10 +765,7 @@ def process_frames(
time.sleep(0.1) time.sleep(0.1)
continue continue
if ( if datetime.now().astimezone(timezone.utc) > next_region_update:
datetime.datetime.now().astimezone(datetime.timezone.utc)
> next_region_update
):
region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_config.name) region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_config.name)
next_region_update = get_tomorrow_at_time(2) next_region_update = get_tomorrow_at_time(2)