refactor to move get_latest_segment_datetime logic to recordings maintainer

This commit is contained in:
Josh Hawkins 2025-09-26 19:37:28 -05:00
parent 36804de391
commit 13d06f9be3
4 changed files with 131 additions and 73 deletions

View File

@ -11,11 +11,13 @@ logger = logging.getLogger(__name__)
class RecordingsDataTypeEnum(str, Enum): class RecordingsDataTypeEnum(str, Enum):
all = "" all = ""
recordings_available_through = "recordings_available_through" saved = "saved" # segment has been saved to db
latest_valid_segment = "latest_valid_segment" latest = "latest" # segment is in cache
valid = "valid" # segment is valid
invalid = "invalid" # segment is invalid
class RecordingsDataPublisher(Publisher[tuple[str, float]]): class RecordingsDataPublisher(Publisher[Any]):
"""Publishes latest recording data.""" """Publishes latest recording data."""
topic_base = "recordings/" topic_base = "recordings/"

View File

@ -144,7 +144,7 @@ class EmbeddingMaintainer(threading.Thread):
EventMetadataTypeEnum.regenerate_description EventMetadataTypeEnum.regenerate_description
) )
self.recordings_subscriber = RecordingsDataSubscriber( self.recordings_subscriber = RecordingsDataSubscriber(
RecordingsDataTypeEnum.recordings_available_through RecordingsDataTypeEnum.saved
) )
self.review_subscriber = ReviewDataSubscriber("") self.review_subscriber = ReviewDataSubscriber("")
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video.value) self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video.value)
@ -536,10 +536,8 @@ class EmbeddingMaintainer(threading.Thread):
topic = str(raw_topic) topic = str(raw_topic)
if topic.endswith( if topic.endswith(RecordingsDataTypeEnum.saved.value):
RecordingsDataTypeEnum.recordings_available_through.value camera, recordings_available_through_timestamp, _ = payload
):
camera, recordings_available_through_timestamp = payload
self.recordings_available_through[camera] = ( self.recordings_available_through[camera] = (
recordings_available_through_timestamp recordings_available_through_timestamp

View File

@ -96,6 +96,41 @@ class RecordingMaintainer(threading.Thread):
and not d.startswith("preview_") and not d.startswith("preview_")
] ]
# publish newest cached segment per camera (including in use files)
newest_cache_segments: dict[str, dict[str, Any]] = {}
for cache in cache_files:
cache_path = os.path.join(CACHE_DIR, cache)
basename = os.path.splitext(cache)[0]
camera, date = basename.rsplit("@", maxsplit=1)
start_time = datetime.datetime.strptime(
date, CACHE_SEGMENT_FORMAT
).astimezone(datetime.timezone.utc)
if (
camera not in newest_cache_segments
or start_time > newest_cache_segments[camera]["start_time"]
):
newest_cache_segments[camera] = {
"start_time": start_time,
"cache_path": cache_path,
}
for camera, newest in newest_cache_segments.items():
self.recordings_publisher.publish(
(
camera,
newest["start_time"].timestamp(),
newest["cache_path"],
),
RecordingsDataTypeEnum.latest.value,
)
# publish None for cameras with no cache files (but only if we know the camera exists)
for camera_name in self.config.cameras:
if camera_name not in newest_cache_segments:
self.recordings_publisher.publish(
(camera_name, None, None),
RecordingsDataTypeEnum.latest.value,
)
files_in_use = [] files_in_use = []
for process in psutil.process_iter(): for process in psutil.process_iter():
try: try:
@ -109,7 +144,7 @@ class RecordingMaintainer(threading.Thread):
except psutil.Error: except psutil.Error:
continue continue
# group recordings by camera # group recordings by camera (skip in-use for validation/moving)
grouped_recordings: defaultdict[str, list[dict[str, Any]]] = defaultdict(list) grouped_recordings: defaultdict[str, list[dict[str, Any]]] = defaultdict(list)
for cache in cache_files: for cache in cache_files:
# Skip files currently in use # Skip files currently in use
@ -231,8 +266,9 @@ class RecordingMaintainer(threading.Thread):
recordings[0]["start_time"].timestamp() recordings[0]["start_time"].timestamp()
if self.config.cameras[camera].record.enabled if self.config.cameras[camera].record.enabled
else None, else None,
None,
), ),
RecordingsDataTypeEnum.recordings_available_through.value, RecordingsDataTypeEnum.saved.value,
) )
recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks) recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks)
@ -273,6 +309,10 @@ class RecordingMaintainer(threading.Thread):
logger.warning( logger.warning(
f"Invalid or missing video stream in segment {cache_path}. Discarding." f"Invalid or missing video stream in segment {cache_path}. Discarding."
) )
self.recordings_publisher.publish(
(camera, start_time.timestamp(), cache_path),
RecordingsDataTypeEnum.invalid.value,
)
self.drop_segment(cache_path) self.drop_segment(cache_path)
return None return None
@ -287,13 +327,17 @@ class RecordingMaintainer(threading.Thread):
logger.warning(f"Failed to probe corrupt segment {cache_path}") logger.warning(f"Failed to probe corrupt segment {cache_path}")
logger.warning(f"Discarding a corrupt recording segment: {cache_path}") logger.warning(f"Discarding a corrupt recording segment: {cache_path}")
self.recordings_publisher.publish(
(camera, start_time.timestamp(), cache_path),
RecordingsDataTypeEnum.invalid.value,
)
self.drop_segment(cache_path) self.drop_segment(cache_path)
return None return None
# this segment has a valid duration and has video data, so publish an update # this segment has a valid duration and has video data, so publish an update
self.recordings_publisher.publish( self.recordings_publisher.publish(
(camera, start_time.timestamp(), cache_path), (camera, start_time.timestamp(), cache_path),
RecordingsDataTypeEnum.latest_valid_segment.value, RecordingsDataTypeEnum.valid.value,
) )
record_config = self.config.cameras[camera].record record_config = self.config.cameras[camera].record

View File

@ -1,5 +1,4 @@
import logging import logging
import os
import queue import queue
import subprocess as sp import subprocess as sp
import threading import threading
@ -24,8 +23,6 @@ from frigate.config.camera.updater import (
CameraConfigUpdateSubscriber, CameraConfigUpdateSubscriber,
) )
from frigate.const import ( from frigate.const import (
CACHE_DIR,
CACHE_SEGMENT_FORMAT,
PROCESS_PRIORITY_HIGH, PROCESS_PRIORITY_HIGH,
REQUEST_REGION_GRID, REQUEST_REGION_GRID,
) )
@ -203,12 +200,10 @@ class CameraWatchdog(threading.Thread):
self.requestor = InterProcessRequestor() self.requestor = InterProcessRequestor()
self.was_enabled = self.config.enabled self.was_enabled = self.config.enabled
self.segment_subscriber = RecordingsDataSubscriber( self.segment_subscriber = RecordingsDataSubscriber(RecordingsDataTypeEnum.all)
RecordingsDataTypeEnum.latest_valid_segment self.latest_valid_segment_time: float = 0
) self.latest_invalid_segment_time: float = 0
self.latest_valid_segment_time: float = ( self.latest_cache_segment_time: float = 0
datetime.now().astimezone(timezone.utc).timestamp()
)
def _update_enabled_state(self) -> bool: def _update_enabled_state(self) -> bool:
"""Fetch the latest config and update enabled state.""" """Fetch the latest config and update enabled state."""
@ -254,6 +249,11 @@ class CameraWatchdog(threading.Thread):
if enabled: if enabled:
self.logger.debug(f"Enabling camera {self.config.name}") self.logger.debug(f"Enabling camera {self.config.name}")
self.start_all_ffmpeg() self.start_all_ffmpeg()
# reset all timestamps
self.latest_valid_segment_time = 0
self.latest_invalid_segment_time = 0
self.latest_cache_segment_time = 0
else: else:
self.logger.debug(f"Disabling camera {self.config.name}") self.logger.debug(f"Disabling camera {self.config.name}")
self.stop_all_ffmpeg() self.stop_all_ffmpeg()
@ -271,24 +271,35 @@ class CameraWatchdog(threading.Thread):
if not enabled: if not enabled:
continue continue
update = self.segment_subscriber.check_for_update(timeout=0) while True:
update = self.segment_subscriber.check_for_update(timeout=0)
if update == (None, None):
break
if update is not None:
raw_topic, payload = update raw_topic, payload = update
if raw_topic and payload: if raw_topic and payload:
topic = str(raw_topic) topic = str(raw_topic)
if ( camera, segment_time, _ = payload
topic.endswith(
RecordingsDataTypeEnum.latest_valid_segment.value if camera != self.config.name:
continue
if topic.endswith(RecordingsDataTypeEnum.valid.value):
self.logger.info(
f"Latest valid recording segment time on {camera}: {segment_time}"
) )
and payload self.latest_valid_segment_time = segment_time
): elif topic.endswith(RecordingsDataTypeEnum.invalid.value):
camera, latest_valid_segment_time, _ = payload self.logger.warning(
if camera == self.config.name: f"Invalid recording segment detected for {camera} at {segment_time}"
logger.debug( )
f"Latest valid recording segment time on {camera}: {latest_valid_segment_time}" self.latest_invalid_segment_time = segment_time
) elif topic.endswith(RecordingsDataTypeEnum.latest.value):
self.latest_valid_segment_time = latest_valid_segment_time if segment_time is not None:
self.latest_cache_segment_time = segment_time
else:
self.latest_cache_segment_time = 0
now = datetime.now().timestamp() now = datetime.now().timestamp()
@ -330,25 +341,53 @@ class CameraWatchdog(threading.Thread):
if self.config.record.enabled and "record" in p["roles"]: if self.config.record.enabled and "record" in p["roles"]:
now_utc = datetime.now().astimezone(timezone.utc) now_utc = datetime.now().astimezone(timezone.utc)
latest_segment_time = self.get_latest_segment_datetime( latest_cache_dt = (
p.get( datetime.fromtimestamp(
"latest_segment_time", self.latest_cache_segment_time, tz=timezone.utc
now_utc,
) )
if self.latest_cache_segment_time > 0
else now_utc - timedelta(seconds=1)
)
latest_valid_dt = (
datetime.fromtimestamp(
self.latest_valid_segment_time, tz=timezone.utc
)
if self.latest_valid_segment_time > 0
else now_utc - timedelta(seconds=1)
)
latest_invalid_dt = (
datetime.fromtimestamp(
self.latest_invalid_segment_time, tz=timezone.utc
)
if self.latest_invalid_segment_time > 0
else now_utc - timedelta(seconds=1)
) )
# ensure segments are still being created and that they have valid video data # ensure segments are still being created and that they have valid video data
if now_utc > (latest_segment_time + timedelta(seconds=120)) or ( cache_stale = now_utc > (latest_cache_dt + timedelta(seconds=120))
now_utc valid_stale = now_utc > (latest_valid_dt + timedelta(seconds=120))
> ( invalid_stale_condition = (
datetime.fromtimestamp( self.latest_invalid_segment_time > 0
self.latest_valid_segment_time, tz=timezone.utc and now_utc > (latest_invalid_dt + timedelta(seconds=120))
and self.latest_valid_segment_time
<= self.latest_invalid_segment_time
)
invalid_stale = invalid_stale_condition
if cache_stale or valid_stale or invalid_stale:
if cache_stale:
reason = "No new recording segments were created"
elif valid_stale:
reason = "No new valid recording segments were created"
else: # invalid_stale
reason = (
"No valid segments created since last invalid segment"
) )
+ timedelta(seconds=120)
)
):
self.logger.error( self.logger.error(
f"No new or valid recording segments were created for {self.config.name} in the last 120s. restarting the ffmpeg record process..." f"{reason} for {self.config.name} in the last 120s. Restarting the ffmpeg record process..."
) )
p["process"] = start_or_restart_ffmpeg( p["process"] = start_or_restart_ffmpeg(
p["cmd"], p["cmd"],
@ -367,7 +406,7 @@ class CameraWatchdog(threading.Thread):
self.requestor.send_data( self.requestor.send_data(
f"{self.config.name}/status/record", "online" f"{self.config.name}/status/record", "online"
) )
p["latest_segment_time"] = latest_segment_time p["latest_segment_time"] = self.latest_cache_segment_time
if poll is None: if poll is None:
continue continue
@ -445,31 +484,6 @@ class CameraWatchdog(threading.Thread):
p["logpipe"].close() p["logpipe"].close()
self.ffmpeg_other_processes.clear() self.ffmpeg_other_processes.clear()
def get_latest_segment_datetime(self, latest_segment: datetime) -> datetime:
"""Checks if ffmpeg is still writing recording segments to cache."""
cache_files = sorted(
[
d
for d in os.listdir(CACHE_DIR)
if os.path.isfile(os.path.join(CACHE_DIR, d))
and d.endswith(".mp4")
and not d.startswith("preview_")
]
)
newest_segment_time = latest_segment
for file in cache_files:
if self.config.name in file:
basename = os.path.splitext(file)[0]
_, date = basename.rsplit("@", maxsplit=1)
segment_time = datetime.strptime(date, CACHE_SEGMENT_FORMAT).astimezone(
timezone.utc
)
if segment_time > newest_segment_time:
newest_segment_time = segment_time
return newest_segment_time
class CameraCaptureRunner(threading.Thread): class CameraCaptureRunner(threading.Thread):
def __init__( def __init__(
@ -680,7 +694,7 @@ def process_frames(
camera_metrics: CameraMetrics, camera_metrics: CameraMetrics,
stop_event: MpEvent, stop_event: MpEvent,
ptz_metrics: PTZMetrics, ptz_metrics: PTZMetrics,
region_grid: list[list[dict[str, Any]]], region_grid: list[list, Any],
exit_on_empty: bool = False, exit_on_empty: bool = False,
): ):
next_region_update = get_tomorrow_at_time(2) next_region_update = get_tomorrow_at_time(2)