Compare commits

..

No commits in common. "12f8c3feac5d45976fa2055df2a9c49f0a546031" and "c207009d8a51f93a2617ecbdf90ec81a50865abd" have entirely different histories.

7 changed files with 164 additions and 302 deletions

View File

@ -822,9 +822,9 @@ async def vod_ts(camera_name: str, start_ts: float, end_ts: float):
dependencies=[Depends(require_camera_access)], dependencies=[Depends(require_camera_access)],
description="Returns an HLS playlist for the specified date-time on the specified camera. Append /master.m3u8 or /index.m3u8 for HLS playback.", description="Returns an HLS playlist for the specified date-time on the specified camera. Append /master.m3u8 or /index.m3u8 for HLS playback.",
) )
async def vod_hour_no_timezone(year_month: str, day: int, hour: int, camera_name: str): def vod_hour_no_timezone(year_month: str, day: int, hour: int, camera_name: str):
"""VOD for specific hour. Uses the default timezone (UTC).""" """VOD for specific hour. Uses the default timezone (UTC)."""
return await vod_hour( return vod_hour(
year_month, day, hour, camera_name, get_localzone_name().replace("/", ",") year_month, day, hour, camera_name, get_localzone_name().replace("/", ",")
) )
@ -834,9 +834,7 @@ async def vod_hour_no_timezone(year_month: str, day: int, hour: int, camera_name
dependencies=[Depends(require_camera_access)], dependencies=[Depends(require_camera_access)],
description="Returns an HLS playlist for the specified date-time (with timezone) on the specified camera. Append /master.m3u8 or /index.m3u8 for HLS playback.", description="Returns an HLS playlist for the specified date-time (with timezone) on the specified camera. Append /master.m3u8 or /index.m3u8 for HLS playback.",
) )
async def vod_hour( def vod_hour(year_month: str, day: int, hour: int, camera_name: str, tz_name: str):
year_month: str, day: int, hour: int, camera_name: str, tz_name: str
):
parts = year_month.split("-") parts = year_month.split("-")
start_date = ( start_date = (
datetime(int(parts[0]), int(parts[1]), day, hour, tzinfo=timezone.utc) datetime(int(parts[0]), int(parts[1]), day, hour, tzinfo=timezone.utc)
@ -846,7 +844,7 @@ async def vod_hour(
start_ts = start_date.timestamp() start_ts = start_date.timestamp()
end_ts = end_date.timestamp() end_ts = end_date.timestamp()
return await vod_ts(camera_name, start_ts, end_ts) return vod_ts(camera_name, start_ts, end_ts)
@router.get( @router.get(
@ -877,7 +875,7 @@ async def vod_event(
if event.end_time is None if event.end_time is None
else (event.end_time + padding) else (event.end_time + padding)
) )
vod_response = await vod_ts(event.camera, event.start_time - padding, end_ts) vod_response = vod_ts(event.camera, event.start_time - padding, end_ts)
# If the recordings are not found and the event started more than 5 minutes ago, set has_clip to false # If the recordings are not found and the event started more than 5 minutes ago, set has_clip to false
if ( if (
@ -1250,7 +1248,7 @@ def event_snapshot_clean(request: Request, event_id: str, download: bool = False
@router.get("/events/{event_id}/clip.mp4") @router.get("/events/{event_id}/clip.mp4")
async def event_clip( def event_clip(
request: Request, request: Request,
event_id: str, event_id: str,
padding: int = Query(0, description="Padding to apply to clip."), padding: int = Query(0, description="Padding to apply to clip."),
@ -1272,9 +1270,7 @@ async def event_clip(
if event.end_time is None if event.end_time is None
else event.end_time + padding else event.end_time + padding
) )
return await recording_clip( return recording_clip(request, event.camera, event.start_time - padding, end_ts)
request, event.camera, event.start_time - padding, end_ts
)
@router.get("/events/{event_id}/preview.gif") @router.get("/events/{event_id}/preview.gif")
@ -1702,7 +1698,7 @@ def preview_thumbnail(file_name: str):
"/{camera_name}/{label}/thumbnail.jpg", "/{camera_name}/{label}/thumbnail.jpg",
dependencies=[Depends(require_camera_access)], dependencies=[Depends(require_camera_access)],
) )
async def label_thumbnail(request: Request, camera_name: str, label: str): def label_thumbnail(request: Request, camera_name: str, label: str):
label = unquote(label) label = unquote(label)
event_query = Event.select(fn.MAX(Event.id)).where(Event.camera == camera_name) event_query = Event.select(fn.MAX(Event.id)).where(Event.camera == camera_name)
if label != "any": if label != "any":
@ -1711,7 +1707,7 @@ async def label_thumbnail(request: Request, camera_name: str, label: str):
try: try:
event_id = event_query.scalar() event_id = event_query.scalar()
return await event_thumbnail(request, event_id, Extension.jpg, 60) return event_thumbnail(request, event_id, Extension.jpg, 60)
except DoesNotExist: except DoesNotExist:
frame = np.zeros((175, 175, 3), np.uint8) frame = np.zeros((175, 175, 3), np.uint8)
ret, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 70]) ret, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 70])
@ -1726,7 +1722,7 @@ async def label_thumbnail(request: Request, camera_name: str, label: str):
@router.get( @router.get(
"/{camera_name}/{label}/clip.mp4", dependencies=[Depends(require_camera_access)] "/{camera_name}/{label}/clip.mp4", dependencies=[Depends(require_camera_access)]
) )
async def label_clip(request: Request, camera_name: str, label: str): def label_clip(request: Request, camera_name: str, label: str):
label = unquote(label) label = unquote(label)
event_query = Event.select(fn.MAX(Event.id)).where( event_query = Event.select(fn.MAX(Event.id)).where(
Event.camera == camera_name, Event.has_clip == True Event.camera == camera_name, Event.has_clip == True
@ -1737,7 +1733,7 @@ async def label_clip(request: Request, camera_name: str, label: str):
try: try:
event = event_query.get() event = event_query.get()
return await event_clip(request, event.id) return event_clip(request, event.id)
except DoesNotExist: except DoesNotExist:
return JSONResponse( return JSONResponse(
content={"success": False, "message": "Event not found"}, status_code=404 content={"success": False, "message": "Event not found"}, status_code=404
@ -1747,7 +1743,7 @@ async def label_clip(request: Request, camera_name: str, label: str):
@router.get( @router.get(
"/{camera_name}/{label}/snapshot.jpg", dependencies=[Depends(require_camera_access)] "/{camera_name}/{label}/snapshot.jpg", dependencies=[Depends(require_camera_access)]
) )
async def label_snapshot(request: Request, camera_name: str, label: str): def label_snapshot(request: Request, camera_name: str, label: str):
"""Returns the snapshot image from the latest event for the given camera and label combo""" """Returns the snapshot image from the latest event for the given camera and label combo"""
label = unquote(label) label = unquote(label)
if label == "any": if label == "any":
@ -1768,7 +1764,7 @@ async def label_snapshot(request: Request, camera_name: str, label: str):
try: try:
event: Event = event_query.get() event: Event = event_query.get()
return await event_snapshot(request, event.id, MediaEventsSnapshotQueryParams()) return event_snapshot(request, event.id, MediaEventsSnapshotQueryParams())
except DoesNotExist: except DoesNotExist:
frame = np.zeros((720, 1280, 3), np.uint8) frame = np.zeros((720, 1280, 3), np.uint8)
_, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 70]) _, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 70])

View File

@ -2,7 +2,6 @@
import logging import logging
from enum import Enum from enum import Enum
from typing import Any
from .zmq_proxy import Publisher, Subscriber from .zmq_proxy import Publisher, Subscriber
@ -11,21 +10,18 @@ logger = logging.getLogger(__name__)
class RecordingsDataTypeEnum(str, Enum): class RecordingsDataTypeEnum(str, Enum):
all = "" all = ""
saved = "saved" # segment has been saved to db recordings_available_through = "recordings_available_through"
latest = "latest" # segment is in cache
valid = "valid" # segment is valid
invalid = "invalid" # segment is invalid
class RecordingsDataPublisher(Publisher[Any]): class RecordingsDataPublisher(Publisher[tuple[str, float]]):
"""Publishes latest recording data.""" """Publishes latest recording data."""
topic_base = "recordings/" topic_base = "recordings/"
def __init__(self) -> None: def __init__(self, topic: RecordingsDataTypeEnum) -> None:
super().__init__() super().__init__(topic.value)
def publish(self, payload: Any, sub_topic: str = "") -> None: def publish(self, payload: tuple[str, float], sub_topic: str = "") -> None:
super().publish(payload, sub_topic) super().publish(payload, sub_topic)
@ -36,11 +32,3 @@ class RecordingsDataSubscriber(Subscriber):
def __init__(self, topic: RecordingsDataTypeEnum) -> None: def __init__(self, topic: RecordingsDataTypeEnum) -> None:
super().__init__(topic.value) super().__init__(topic.value)
def _return_object(
self, topic: str, payload: tuple | None
) -> tuple[str, Any] | tuple[None, None]:
if payload is None:
return (None, None)
return (topic, payload)

View File

@ -2,6 +2,10 @@ import logging
import os import os
import numpy as np import numpy as np
from synap import Network
from synap.postprocessor import Detector
from synap.preprocessor import Preprocessor
from synap.types import Layout, Shape
from typing_extensions import Literal from typing_extensions import Literal
from frigate.detectors.detection_api import DetectionApi from frigate.detectors.detection_api import DetectionApi
@ -11,16 +15,6 @@ from frigate.detectors.detector_config import (
ModelTypeEnum, ModelTypeEnum,
) )
try:
from synap import Network
from synap.postprocessor import Detector
from synap.preprocessor import Preprocessor
from synap.types import Layout, Shape
SYNAP_SUPPORT = True
except ImportError:
SYNAP_SUPPORT = False
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
DETECTOR_KEY = "synaptics" DETECTOR_KEY = "synaptics"
@ -34,21 +28,15 @@ class SynapDetector(DetectionApi):
type_key = DETECTOR_KEY type_key = DETECTOR_KEY
def __init__(self, detector_config: SynapDetectorConfig): def __init__(self, detector_config: SynapDetectorConfig):
if not SYNAP_SUPPORT:
logger.error(
"Error importing Synaptics SDK modules. You must use the -synaptics Docker image variant for Synaptics detector support."
)
return
try: try:
_, ext = os.path.splitext(detector_config.model.path) _, ext = os.path.splitext(detector_config.model.path)
if ext and ext != ".synap": if ext and ext != ".synap":
raise ValueError("Model path config for Synap1680 is incorrect.") raise ValueError("Model path config for Synap1680 is wrong.")
synap_network = Network(detector_config.model.path) synap_network = Network(detector_config.model.path)
logger.info(f"Synap NPU loaded model: {detector_config.model.path}") logger.info(f"Synap NPU loaded model: {detector_config.model.path}")
except ValueError as ve: except ValueError as ve:
logger.error(f"Synap1680 setup has failed: {ve}") logger.error(f"Config to Synap1680 was Failed: {ve}")
raise raise
except Exception as e: except Exception as e:
logger.error(f"Failed to init Synap NPU: {e}") logger.error(f"Failed to init Synap NPU: {e}")

View File

@ -144,7 +144,7 @@ class EmbeddingMaintainer(threading.Thread):
EventMetadataTypeEnum.regenerate_description EventMetadataTypeEnum.regenerate_description
) )
self.recordings_subscriber = RecordingsDataSubscriber( self.recordings_subscriber = RecordingsDataSubscriber(
RecordingsDataTypeEnum.saved RecordingsDataTypeEnum.recordings_available_through
) )
self.review_subscriber = ReviewDataSubscriber("") self.review_subscriber = ReviewDataSubscriber("")
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video.value) self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video.value)
@ -525,20 +525,12 @@ class EmbeddingMaintainer(threading.Thread):
def _process_recordings_updates(self) -> None: def _process_recordings_updates(self) -> None:
"""Process recordings updates.""" """Process recordings updates."""
while True: while True:
update = self.recordings_subscriber.check_for_update() recordings_data = self.recordings_subscriber.check_for_update()
if not update: if recordings_data == None:
break break
(raw_topic, payload) = update camera, recordings_available_through_timestamp = recordings_data
if not raw_topic or not payload:
break
topic = str(raw_topic)
if topic.endswith(RecordingsDataTypeEnum.saved.value):
camera, recordings_available_through_timestamp, _ = payload
self.recordings_available_through[camera] = ( self.recordings_available_through[camera] = (
recordings_available_through_timestamp recordings_available_through_timestamp

View File

@ -80,7 +80,9 @@ class RecordingMaintainer(threading.Thread):
[CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.record], [CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.record],
) )
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all.value) self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all.value)
self.recordings_publisher = RecordingsDataPublisher() self.recordings_publisher = RecordingsDataPublisher(
RecordingsDataTypeEnum.recordings_available_through
)
self.stop_event = stop_event self.stop_event = stop_event
self.object_recordings_info: dict[str, list] = defaultdict(list) self.object_recordings_info: dict[str, list] = defaultdict(list)
@ -96,41 +98,6 @@ class RecordingMaintainer(threading.Thread):
and not d.startswith("preview_") and not d.startswith("preview_")
] ]
# publish newest cached segment per camera (including in use files)
newest_cache_segments: dict[str, dict[str, Any]] = {}
for cache in cache_files:
cache_path = os.path.join(CACHE_DIR, cache)
basename = os.path.splitext(cache)[0]
camera, date = basename.rsplit("@", maxsplit=1)
start_time = datetime.datetime.strptime(
date, CACHE_SEGMENT_FORMAT
).astimezone(datetime.timezone.utc)
if (
camera not in newest_cache_segments
or start_time > newest_cache_segments[camera]["start_time"]
):
newest_cache_segments[camera] = {
"start_time": start_time,
"cache_path": cache_path,
}
for camera, newest in newest_cache_segments.items():
self.recordings_publisher.publish(
(
camera,
newest["start_time"].timestamp(),
newest["cache_path"],
),
RecordingsDataTypeEnum.latest.value,
)
# publish None for cameras with no cache files (but only if we know the camera exists)
for camera_name in self.config.cameras:
if camera_name not in newest_cache_segments:
self.recordings_publisher.publish(
(camera_name, None, None),
RecordingsDataTypeEnum.latest.value,
)
files_in_use = [] files_in_use = []
for process in psutil.process_iter(): for process in psutil.process_iter():
try: try:
@ -144,7 +111,7 @@ class RecordingMaintainer(threading.Thread):
except psutil.Error: except psutil.Error:
continue continue
# group recordings by camera (skip in-use for validation/moving) # group recordings by camera
grouped_recordings: defaultdict[str, list[dict[str, Any]]] = defaultdict(list) grouped_recordings: defaultdict[str, list[dict[str, Any]]] = defaultdict(list)
for cache in cache_files: for cache in cache_files:
# Skip files currently in use # Skip files currently in use
@ -266,9 +233,7 @@ class RecordingMaintainer(threading.Thread):
recordings[0]["start_time"].timestamp() recordings[0]["start_time"].timestamp()
if self.config.cameras[camera].record.enabled if self.config.cameras[camera].record.enabled
else None, else None,
None, )
),
RecordingsDataTypeEnum.saved.value,
) )
recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks) recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks)
@ -285,7 +250,7 @@ class RecordingMaintainer(threading.Thread):
async def validate_and_move_segment( async def validate_and_move_segment(
self, camera: str, reviews: list[ReviewSegment], recording: dict[str, Any] self, camera: str, reviews: list[ReviewSegment], recording: dict[str, Any]
) -> Optional[Recordings]: ) -> None:
cache_path: str = recording["cache_path"] cache_path: str = recording["cache_path"]
start_time: datetime.datetime = recording["start_time"] start_time: datetime.datetime = recording["start_time"]
record_config = self.config.cameras[camera].record record_config = self.config.cameras[camera].record
@ -296,7 +261,7 @@ class RecordingMaintainer(threading.Thread):
or not self.config.cameras[camera].record.enabled or not self.config.cameras[camera].record.enabled
): ):
self.drop_segment(cache_path) self.drop_segment(cache_path)
return None return
if cache_path in self.end_time_cache: if cache_path in self.end_time_cache:
end_time, duration = self.end_time_cache[cache_path] end_time, duration = self.end_time_cache[cache_path]
@ -305,18 +270,10 @@ class RecordingMaintainer(threading.Thread):
self.config.ffmpeg, cache_path, get_duration=True self.config.ffmpeg, cache_path, get_duration=True
) )
if not segment_info.get("has_valid_video", False): if segment_info["duration"]:
logger.warning( duration = float(segment_info["duration"])
f"Invalid or missing video stream in segment {cache_path}. Discarding." else:
) duration = -1
self.recordings_publisher.publish(
(camera, start_time.timestamp(), cache_path),
RecordingsDataTypeEnum.invalid.value,
)
self.drop_segment(cache_path)
return None
duration = float(segment_info.get("duration", -1))
# ensure duration is within expected length # ensure duration is within expected length
if 0 < duration < MAX_SEGMENT_DURATION: if 0 < duration < MAX_SEGMENT_DURATION:
@ -327,18 +284,8 @@ class RecordingMaintainer(threading.Thread):
logger.warning(f"Failed to probe corrupt segment {cache_path}") logger.warning(f"Failed to probe corrupt segment {cache_path}")
logger.warning(f"Discarding a corrupt recording segment: {cache_path}") logger.warning(f"Discarding a corrupt recording segment: {cache_path}")
self.recordings_publisher.publish( Path(cache_path).unlink(missing_ok=True)
(camera, start_time.timestamp(), cache_path), return
RecordingsDataTypeEnum.invalid.value,
)
self.drop_segment(cache_path)
return None
# this segment has a valid duration and has video data, so publish an update
self.recordings_publisher.publish(
(camera, start_time.timestamp(), cache_path),
RecordingsDataTypeEnum.valid.value,
)
record_config = self.config.cameras[camera].record record_config = self.config.cameras[camera].record
highest = None highest = None

View File

@ -603,87 +603,87 @@ def auto_detect_hwaccel() -> str:
async def get_video_properties( async def get_video_properties(
ffmpeg, url: str, get_duration: bool = False ffmpeg, url: str, get_duration: bool = False
) -> dict[str, Any]: ) -> dict[str, Any]:
async def probe_with_ffprobe( async def calculate_duration(video: Optional[Any]) -> float:
url: str, duration = None
) -> tuple[bool, int, int, Optional[str], float]:
"""Fallback using ffprobe: returns (valid, width, height, codec, duration)."""
cmd = [
ffmpeg.ffprobe_path,
"-v",
"quiet",
"-print_format",
"json",
"-show_format",
"-show_streams",
url,
]
try:
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, _ = await proc.communicate()
if proc.returncode != 0:
return False, 0, 0, None, -1
data = json.loads(stdout.decode()) if video is not None:
video_streams = [ # Get the frames per second (fps) of the video stream
s for s in data.get("streams", []) if s.get("codec_type") == "video" fps = video.get(cv2.CAP_PROP_FPS)
] total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
if not video_streams:
return False, 0, 0, None, -1
v = video_streams[0] if fps and total_frames:
width = int(v.get("width", 0))
height = int(v.get("height", 0))
codec = v.get("codec_name")
duration_str = data.get("format", {}).get("duration")
duration = float(duration_str) if duration_str else -1.0
return True, width, height, codec, duration
except (json.JSONDecodeError, ValueError, KeyError, asyncio.SubprocessError):
return False, 0, 0, None, -1
def probe_with_cv2(url: str) -> tuple[bool, int, int, Optional[str], float]:
"""Primary attempt using cv2: returns (valid, width, height, fourcc, duration)."""
cap = cv2.VideoCapture(url)
if not cap.isOpened():
cap.release()
return False, 0, 0, None, -1
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
valid = width > 0 and height > 0
fourcc = None
duration = -1.0
if valid:
fourcc_int = int(cap.get(cv2.CAP_PROP_FOURCC))
fourcc = fourcc_int.to_bytes(4, "little").decode("latin-1").strip()
if get_duration:
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
if fps > 0 and total_frames > 0:
duration = total_frames / fps duration = total_frames / fps
cap.release() # if cv2 failed need to use ffprobe
return valid, width, height, fourcc, duration if duration is None:
p = await asyncio.create_subprocess_exec(
ffmpeg.ffprobe_path,
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
f"{url}",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
await p.wait()
# try cv2 first if p.returncode == 0:
has_video, width, height, fourcc, duration = probe_with_cv2(url) result = (await p.stdout.read()).decode()
else:
result = None
# fallback to ffprobe if needed if result:
if not has_video or (get_duration and duration < 0): try:
has_video, width, height, fourcc, duration = await probe_with_ffprobe(url) duration = float(result.strip())
except ValueError:
duration = -1
else:
duration = -1
return duration
width = height = 0
try:
# Open the video stream using OpenCV
video = cv2.VideoCapture(url)
# Check if the video stream was opened successfully
if not video.isOpened():
video = None
except Exception:
video = None
result = {}
result: dict[str, Any] = {"has_valid_video": has_video}
if has_video:
result.update({"width": width, "height": height})
if fourcc:
result["fourcc"] = fourcc
if get_duration: if get_duration:
result["duration"] = duration result["duration"] = await calculate_duration(video)
if video is not None:
# Get the width of frames in the video stream
width = video.get(cv2.CAP_PROP_FRAME_WIDTH)
# Get the height of frames in the video stream
height = video.get(cv2.CAP_PROP_FRAME_HEIGHT)
# Get the stream encoding
fourcc_int = int(video.get(cv2.CAP_PROP_FOURCC))
fourcc = (
chr((fourcc_int >> 0) & 255)
+ chr((fourcc_int >> 8) & 255)
+ chr((fourcc_int >> 16) & 255)
+ chr((fourcc_int >> 24) & 255)
)
# Release the video stream
video.release()
result["width"] = round(width)
result["height"] = round(height)
result["fourcc"] = fourcc
return result return result

View File

@ -1,9 +1,10 @@
import datetime
import logging import logging
import os
import queue import queue
import subprocess as sp import subprocess as sp
import threading import threading
import time import time
from datetime import datetime, timedelta, timezone
from multiprocessing import Queue, Value from multiprocessing import Queue, Value
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from typing import Any from typing import Any
@ -12,10 +13,6 @@ import cv2
from frigate.camera import CameraMetrics, PTZMetrics from frigate.camera import CameraMetrics, PTZMetrics
from frigate.comms.inter_process import InterProcessRequestor from frigate.comms.inter_process import InterProcessRequestor
from frigate.comms.recordings_updater import (
RecordingsDataSubscriber,
RecordingsDataTypeEnum,
)
from frigate.config import CameraConfig, DetectConfig, ModelConfig from frigate.config import CameraConfig, DetectConfig, ModelConfig
from frigate.config.camera.camera import CameraTypeEnum from frigate.config.camera.camera import CameraTypeEnum
from frigate.config.camera.updater import ( from frigate.config.camera.updater import (
@ -23,6 +20,8 @@ from frigate.config.camera.updater import (
CameraConfigUpdateSubscriber, CameraConfigUpdateSubscriber,
) )
from frigate.const import ( from frigate.const import (
CACHE_DIR,
CACHE_SEGMENT_FORMAT,
PROCESS_PRIORITY_HIGH, PROCESS_PRIORITY_HIGH,
REQUEST_REGION_GRID, REQUEST_REGION_GRID,
) )
@ -130,7 +129,7 @@ def capture_frames(
fps.value = frame_rate.eps() fps.value = frame_rate.eps()
skipped_fps.value = skipped_eps.eps() skipped_fps.value = skipped_eps.eps()
current_frame.value = datetime.now().timestamp() current_frame.value = datetime.datetime.now().timestamp()
frame_name = f"{config.name}_frame{frame_index}" frame_name = f"{config.name}_frame{frame_index}"
frame_buffer = frame_manager.write(frame_name) frame_buffer = frame_manager.write(frame_name)
try: try:
@ -200,11 +199,6 @@ class CameraWatchdog(threading.Thread):
self.requestor = InterProcessRequestor() self.requestor = InterProcessRequestor()
self.was_enabled = self.config.enabled self.was_enabled = self.config.enabled
self.segment_subscriber = RecordingsDataSubscriber(RecordingsDataTypeEnum.all)
self.latest_valid_segment_time: float = 0
self.latest_invalid_segment_time: float = 0
self.latest_cache_segment_time: float = 0
def _update_enabled_state(self) -> bool: def _update_enabled_state(self) -> bool:
"""Fetch the latest config and update enabled state.""" """Fetch the latest config and update enabled state."""
self.config_subscriber.check_for_updates() self.config_subscriber.check_for_updates()
@ -249,11 +243,6 @@ class CameraWatchdog(threading.Thread):
if enabled: if enabled:
self.logger.debug(f"Enabling camera {self.config.name}") self.logger.debug(f"Enabling camera {self.config.name}")
self.start_all_ffmpeg() self.start_all_ffmpeg()
# reset all timestamps
self.latest_valid_segment_time = 0
self.latest_invalid_segment_time = 0
self.latest_cache_segment_time = 0
else: else:
self.logger.debug(f"Disabling camera {self.config.name}") self.logger.debug(f"Disabling camera {self.config.name}")
self.stop_all_ffmpeg() self.stop_all_ffmpeg()
@ -271,37 +260,7 @@ class CameraWatchdog(threading.Thread):
if not enabled: if not enabled:
continue continue
while True: now = datetime.datetime.now().timestamp()
update = self.segment_subscriber.check_for_update(timeout=0)
if update == (None, None):
break
raw_topic, payload = update
if raw_topic and payload:
topic = str(raw_topic)
camera, segment_time, _ = payload
if camera != self.config.name:
continue
if topic.endswith(RecordingsDataTypeEnum.valid.value):
self.logger.debug(
f"Latest valid recording segment time on {camera}: {segment_time}"
)
self.latest_valid_segment_time = segment_time
elif topic.endswith(RecordingsDataTypeEnum.invalid.value):
self.logger.warning(
f"Invalid recording segment detected for {camera} at {segment_time}"
)
self.latest_invalid_segment_time = segment_time
elif topic.endswith(RecordingsDataTypeEnum.latest.value):
if segment_time is not None:
self.latest_cache_segment_time = segment_time
else:
self.latest_cache_segment_time = 0
now = datetime.now().timestamp()
if not self.capture_thread.is_alive(): if not self.capture_thread.is_alive():
self.requestor.send_data(f"{self.config.name}/status/detect", "offline") self.requestor.send_data(f"{self.config.name}/status/detect", "offline")
@ -339,55 +298,18 @@ class CameraWatchdog(threading.Thread):
poll = p["process"].poll() poll = p["process"].poll()
if self.config.record.enabled and "record" in p["roles"]: if self.config.record.enabled and "record" in p["roles"]:
now_utc = datetime.now().astimezone(timezone.utc) latest_segment_time = self.get_latest_segment_datetime(
p.get(
latest_cache_dt = ( "latest_segment_time",
datetime.fromtimestamp( datetime.datetime.now().astimezone(datetime.timezone.utc),
self.latest_cache_segment_time, tz=timezone.utc
) )
if self.latest_cache_segment_time > 0
else now_utc - timedelta(seconds=1)
)
latest_valid_dt = (
datetime.fromtimestamp(
self.latest_valid_segment_time, tz=timezone.utc
)
if self.latest_valid_segment_time > 0
else now_utc - timedelta(seconds=1)
)
latest_invalid_dt = (
datetime.fromtimestamp(
self.latest_invalid_segment_time, tz=timezone.utc
)
if self.latest_invalid_segment_time > 0
else now_utc - timedelta(seconds=1)
)
# ensure segments are still being created and that they have valid video data
cache_stale = now_utc > (latest_cache_dt + timedelta(seconds=120))
valid_stale = now_utc > (latest_valid_dt + timedelta(seconds=120))
invalid_stale_condition = (
self.latest_invalid_segment_time > 0
and now_utc > (latest_invalid_dt + timedelta(seconds=120))
and self.latest_valid_segment_time
<= self.latest_invalid_segment_time
)
invalid_stale = invalid_stale_condition
if cache_stale or valid_stale or invalid_stale:
if cache_stale:
reason = "No new recording segments were created"
elif valid_stale:
reason = "No new valid recording segments were created"
else: # invalid_stale
reason = (
"No valid segments created since last invalid segment"
) )
if datetime.datetime.now().astimezone(datetime.timezone.utc) > (
latest_segment_time + datetime.timedelta(seconds=120)
):
self.logger.error( self.logger.error(
f"{reason} for {self.config.name} in the last 120s. Restarting the ffmpeg record process..." f"No new recording segments were created for {self.config.name} in the last 120s. restarting the ffmpeg record process..."
) )
p["process"] = start_or_restart_ffmpeg( p["process"] = start_or_restart_ffmpeg(
p["cmd"], p["cmd"],
@ -406,7 +328,7 @@ class CameraWatchdog(threading.Thread):
self.requestor.send_data( self.requestor.send_data(
f"{self.config.name}/status/record", "online" f"{self.config.name}/status/record", "online"
) )
p["latest_segment_time"] = self.latest_cache_segment_time p["latest_segment_time"] = latest_segment_time
if poll is None: if poll is None:
continue continue
@ -424,7 +346,6 @@ class CameraWatchdog(threading.Thread):
self.stop_all_ffmpeg() self.stop_all_ffmpeg()
self.logpipe.close() self.logpipe.close()
self.config_subscriber.stop() self.config_subscriber.stop()
self.segment_subscriber.stop()
def start_ffmpeg_detect(self): def start_ffmpeg_detect(self):
ffmpeg_cmd = [ ffmpeg_cmd = [
@ -484,6 +405,33 @@ class CameraWatchdog(threading.Thread):
p["logpipe"].close() p["logpipe"].close()
self.ffmpeg_other_processes.clear() self.ffmpeg_other_processes.clear()
def get_latest_segment_datetime(
self, latest_segment: datetime.datetime
) -> datetime.datetime:
"""Checks if ffmpeg is still writing recording segments to cache."""
cache_files = sorted(
[
d
for d in os.listdir(CACHE_DIR)
if os.path.isfile(os.path.join(CACHE_DIR, d))
and d.endswith(".mp4")
and not d.startswith("preview_")
]
)
newest_segment_time = latest_segment
for file in cache_files:
if self.config.name in file:
basename = os.path.splitext(file)[0]
_, date = basename.rsplit("@", maxsplit=1)
segment_time = datetime.datetime.strptime(
date, CACHE_SEGMENT_FORMAT
).astimezone(datetime.timezone.utc)
if segment_time > newest_segment_time:
newest_segment_time = segment_time
return newest_segment_time
class CameraCaptureRunner(threading.Thread): class CameraCaptureRunner(threading.Thread):
def __init__( def __init__(
@ -779,7 +727,10 @@ def process_frames(
time.sleep(0.1) time.sleep(0.1)
continue continue
if datetime.now().astimezone(timezone.utc) > next_region_update: if (
datetime.datetime.now().astimezone(datetime.timezone.utc)
> next_region_update
):
region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_config.name) region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_config.name)
next_region_update = get_tomorrow_at_time(2) next_region_update = get_tomorrow_at_time(2)