Mirror of https://github.com/blakeblackshear/frigate.git, synced 2026-02-01 00:35:25 +03:00

Compare commits: c207009d8a...12f8c3feac (2 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | 12f8c3feac |  |
|  | b6552987b0 |  |
frigate/api/media.py

```diff
@@ -822,9 +822,9 @@ async def vod_ts(camera_name: str, start_ts: float, end_ts: float):
     dependencies=[Depends(require_camera_access)],
     description="Returns an HLS playlist for the specified date-time on the specified camera. Append /master.m3u8 or /index.m3u8 for HLS playback.",
 )
-def vod_hour_no_timezone(year_month: str, day: int, hour: int, camera_name: str):
+async def vod_hour_no_timezone(year_month: str, day: int, hour: int, camera_name: str):
     """VOD for specific hour. Uses the default timezone (UTC)."""
-    return vod_hour(
+    return await vod_hour(
         year_month, day, hour, camera_name, get_localzone_name().replace("/", ",")
     )
 
@@ -834,7 +834,9 @@ def vod_hour_no_timezone(year_month: str, day: int, hour: int, camera_name: str)
     dependencies=[Depends(require_camera_access)],
     description="Returns an HLS playlist for the specified date-time (with timezone) on the specified camera. Append /master.m3u8 or /index.m3u8 for HLS playback.",
 )
-def vod_hour(year_month: str, day: int, hour: int, camera_name: str, tz_name: str):
+async def vod_hour(
+    year_month: str, day: int, hour: int, camera_name: str, tz_name: str
+):
     parts = year_month.split("-")
     start_date = (
         datetime(int(parts[0]), int(parts[1]), day, hour, tzinfo=timezone.utc)
@@ -844,7 +846,7 @@ def vod_hour(year_month: str, day: int, hour: int, camera_name: str, tz_name: st
     start_ts = start_date.timestamp()
     end_ts = end_date.timestamp()
 
-    return vod_ts(camera_name, start_ts, end_ts)
+    return await vod_ts(camera_name, start_ts, end_ts)
 
 
 @router.get(
@@ -875,7 +877,7 @@ async def vod_event(
         if event.end_time is None
         else (event.end_time + padding)
     )
-    vod_response = vod_ts(event.camera, event.start_time - padding, end_ts)
+    vod_response = await vod_ts(event.camera, event.start_time - padding, end_ts)
 
     # If the recordings are not found and the event started more than 5 minutes ago, set has_clip to false
     if (
@@ -1248,7 +1250,7 @@ def event_snapshot_clean(request: Request, event_id: str, download: bool = False
 
 
 @router.get("/events/{event_id}/clip.mp4")
-def event_clip(
+async def event_clip(
     request: Request,
     event_id: str,
     padding: int = Query(0, description="Padding to apply to clip."),
@@ -1270,7 +1272,9 @@ def event_clip(
         if event.end_time is None
         else event.end_time + padding
     )
-    return recording_clip(request, event.camera, event.start_time - padding, end_ts)
+    return await recording_clip(
+        request, event.camera, event.start_time - padding, end_ts
+    )
 
 
 @router.get("/events/{event_id}/preview.gif")
@@ -1698,7 +1702,7 @@ def preview_thumbnail(file_name: str):
     "/{camera_name}/{label}/thumbnail.jpg",
     dependencies=[Depends(require_camera_access)],
 )
-def label_thumbnail(request: Request, camera_name: str, label: str):
+async def label_thumbnail(request: Request, camera_name: str, label: str):
     label = unquote(label)
     event_query = Event.select(fn.MAX(Event.id)).where(Event.camera == camera_name)
     if label != "any":
@@ -1707,7 +1711,7 @@ def label_thumbnail(request: Request, camera_name: str, label: str):
     try:
         event_id = event_query.scalar()
 
-        return event_thumbnail(request, event_id, Extension.jpg, 60)
+        return await event_thumbnail(request, event_id, Extension.jpg, 60)
     except DoesNotExist:
         frame = np.zeros((175, 175, 3), np.uint8)
         ret, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 70])
@@ -1722,7 +1726,7 @@ def label_thumbnail(request: Request, camera_name: str, label: str):
 @router.get(
     "/{camera_name}/{label}/clip.mp4", dependencies=[Depends(require_camera_access)]
 )
-def label_clip(request: Request, camera_name: str, label: str):
+async def label_clip(request: Request, camera_name: str, label: str):
     label = unquote(label)
     event_query = Event.select(fn.MAX(Event.id)).where(
         Event.camera == camera_name, Event.has_clip == True
@@ -1733,7 +1737,7 @@ def label_clip(request: Request, camera_name: str, label: str):
     try:
         event = event_query.get()
 
-        return event_clip(request, event.id)
+        return await event_clip(request, event.id)
     except DoesNotExist:
         return JSONResponse(
             content={"success": False, "message": "Event not found"}, status_code=404
@@ -1743,7 +1747,7 @@ def label_clip(request: Request, camera_name: str, label: str):
 @router.get(
     "/{camera_name}/{label}/snapshot.jpg", dependencies=[Depends(require_camera_access)]
 )
-def label_snapshot(request: Request, camera_name: str, label: str):
+async def label_snapshot(request: Request, camera_name: str, label: str):
    """Returns the snapshot image from the latest event for the given camera and label combo"""
     label = unquote(label)
     if label == "any":
@@ -1764,7 +1768,7 @@ def label_snapshot(request: Request, camera_name: str, label: str):
 
     try:
         event: Event = event_query.get()
-        return event_snapshot(request, event.id, MediaEventsSnapshotQueryParams())
+        return await event_snapshot(request, event.id, MediaEventsSnapshotQueryParams())
     except DoesNotExist:
         frame = np.zeros((720, 1280, 3), np.uint8)
         _, jpg = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 70])
```
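Every endpoint in this file that delegates to another handler picks up an `await` along with the `async def` conversion; calling an `async def` function without awaiting it returns a coroutine object rather than a `Response`. A minimal sketch of the pattern with hypothetical endpoints (not Frigate's actual routes):

```python
from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()


@app.get("/items/{item_id}")
async def get_item(item_id: int) -> JSONResponse:
    # stand-in for a handler that awaits a database or ffprobe call
    return JSONResponse(content={"id": item_id})


@app.get("/latest-item")
async def get_latest_item() -> JSONResponse:
    # delegating to another async handler requires await;
    # `return get_item(1)` would leak an unawaited coroutine
    return await get_item(1)
```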
frigate/comms/recordings_updater.py

```diff
@@ -2,6 +2,7 @@
 
 import logging
 from enum import Enum
+from typing import Any
 
 from .zmq_proxy import Publisher, Subscriber
 
@@ -10,18 +11,21 @@ logger = logging.getLogger(__name__)
 
 class RecordingsDataTypeEnum(str, Enum):
     all = ""
-    recordings_available_through = "recordings_available_through"
+    saved = "saved"  # segment has been saved to db
+    latest = "latest"  # segment is in cache
+    valid = "valid"  # segment is valid
+    invalid = "invalid"  # segment is invalid
 
 
-class RecordingsDataPublisher(Publisher[tuple[str, float]]):
+class RecordingsDataPublisher(Publisher[Any]):
     """Publishes latest recording data."""
 
     topic_base = "recordings/"
 
-    def __init__(self, topic: RecordingsDataTypeEnum) -> None:
-        super().__init__(topic.value)
+    def __init__(self) -> None:
+        super().__init__()
 
-    def publish(self, payload: tuple[str, float], sub_topic: str = "") -> None:
+    def publish(self, payload: Any, sub_topic: str = "") -> None:
         super().publish(payload, sub_topic)
 
 
@@ -32,3 +36,11 @@ class RecordingsDataSubscriber(Subscriber):
 
     def __init__(self, topic: RecordingsDataTypeEnum) -> None:
         super().__init__(topic.value)
+
+    def _return_object(
+        self, topic: str, payload: tuple | None
+    ) -> tuple[str, Any] | tuple[None, None]:
+        if payload is None:
+            return (None, None)
+
+        return (topic, payload)
```
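With the topic moved out of the constructor, a single `RecordingsDataPublisher` can emit any of the segment states by passing the enum value as `sub_topic`, and subscribers now get `(topic, payload)` back so they can route on the topic suffix. A minimal usage sketch, assuming the `zmq_proxy` proxy is running; the payload values are illustrative only:

```python
from frigate.comms.recordings_updater import (
    RecordingsDataPublisher,
    RecordingsDataSubscriber,
    RecordingsDataTypeEnum,
)

publisher = RecordingsDataPublisher()
# one publisher now serves every segment state; the state is the sub-topic
publisher.publish(
    ("front_door", 1717000000.0, "/tmp/cache/front_door@20240529120000.mp4"),
    RecordingsDataTypeEnum.valid.value,
)

# subscribing to `all` ("") receives every recordings/* sub-topic
subscriber = RecordingsDataSubscriber(RecordingsDataTypeEnum.all)
topic, payload = subscriber.check_for_update()
if topic and topic.endswith(RecordingsDataTypeEnum.valid.value):
    camera, segment_time, cache_path = payload
```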
frigate/detectors/plugins/synap.py

```diff
@@ -2,10 +2,6 @@ import logging
 import os
 
 import numpy as np
-from synap import Network
-from synap.postprocessor import Detector
-from synap.preprocessor import Preprocessor
-from synap.types import Layout, Shape
 from typing_extensions import Literal
 
 from frigate.detectors.detection_api import DetectionApi
@@ -15,6 +11,16 @@ from frigate.detectors.detector_config import (
     ModelTypeEnum,
 )
 
+try:
+    from synap import Network
+    from synap.postprocessor import Detector
+    from synap.preprocessor import Preprocessor
+    from synap.types import Layout, Shape
+
+    SYNAP_SUPPORT = True
+except ImportError:
+    SYNAP_SUPPORT = False
+
 logger = logging.getLogger(__name__)
 
 DETECTOR_KEY = "synaptics"
@@ -28,15 +34,21 @@ class SynapDetector(DetectionApi):
     type_key = DETECTOR_KEY
 
     def __init__(self, detector_config: SynapDetectorConfig):
+        if not SYNAP_SUPPORT:
+            logger.error(
+                "Error importing Synaptics SDK modules. You must use the -synaptics Docker image variant for Synaptics detector support."
+            )
+            return
+
         try:
             _, ext = os.path.splitext(detector_config.model.path)
             if ext and ext != ".synap":
-                raise ValueError("Model path config for Synap1680 is wrong.")
+                raise ValueError("Model path config for Synap1680 is incorrect.")
 
             synap_network = Network(detector_config.model.path)
             logger.info(f"Synap NPU loaded model: {detector_config.model.path}")
         except ValueError as ve:
-            logger.error(f"Config to Synap1680 was Failed: {ve}")
+            logger.error(f"Synap1680 setup has failed: {ve}")
             raise
         except Exception as e:
             logger.error(f"Failed to init Synap NPU: {e}")
```
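Moving the hardware-SDK imports behind `try`/`except ImportError` with a module-level flag keeps the plugin importable on images that do not ship the SDK; the detector then degrades with a clear log message instead of crashing the whole process at import time. A generic, runnable sketch of the pattern (the `somesdk` module is hypothetical):

```python
import logging

logger = logging.getLogger(__name__)

try:
    import somesdk  # hypothetical vendor SDK, only present on some images

    SDK_SUPPORT = True
except ImportError:
    SDK_SUPPORT = False


class SdkBackedDetector:
    def __init__(self) -> None:
        if not SDK_SUPPORT:
            # fail soft: the module imports everywhere, and the error only
            # surfaces when someone actually configures this detector
            logger.error("somesdk is not installed; detector is disabled.")
            return
        self.network = somesdk.Network()
```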
frigate/embeddings/maintainer.py

```diff
@@ -144,7 +144,7 @@ class EmbeddingMaintainer(threading.Thread):
             EventMetadataTypeEnum.regenerate_description
         )
         self.recordings_subscriber = RecordingsDataSubscriber(
-            RecordingsDataTypeEnum.recordings_available_through
+            RecordingsDataTypeEnum.saved
         )
         self.review_subscriber = ReviewDataSubscriber("")
         self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video.value)
@@ -525,20 +525,28 @@ class EmbeddingMaintainer(threading.Thread):
     def _process_recordings_updates(self) -> None:
         """Process recordings updates."""
         while True:
-            recordings_data = self.recordings_subscriber.check_for_update()
+            update = self.recordings_subscriber.check_for_update()
 
-            if recordings_data == None:
+            if not update:
                 break
 
-            camera, recordings_available_through_timestamp = recordings_data
+            (raw_topic, payload) = update
 
-            self.recordings_available_through[camera] = (
-                recordings_available_through_timestamp
-            )
+            if not raw_topic or not payload:
+                break
 
-            logger.debug(
-                f"{camera} now has recordings available through {recordings_available_through_timestamp}"
-            )
+            topic = str(raw_topic)
+
+            if topic.endswith(RecordingsDataTypeEnum.saved.value):
+                camera, recordings_available_through_timestamp, _ = payload
+
+                self.recordings_available_through[camera] = (
+                    recordings_available_through_timestamp
+                )
+
+                logger.debug(
+                    f"{camera} now has recordings available through {recordings_available_through_timestamp}"
+                )
 
     def _process_review_updates(self) -> None:
         """Process review updates."""
```
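Note the unpacking: the recordings payload now carries three elements, `(camera, timestamp, cache_path)`, so subscribers that only need the first two discard the third. Illustrative values only:

```python
# illustrative payload matching the new 3-tuple shape
payload = ("front_door", 1717000000.0, "/tmp/cache/front_door@20240529120000.mp4")

# consumers that only track availability discard the cache path
camera, available_through, _ = payload
print(f"{camera} has recordings available through {available_through}")
```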
frigate/record/maintainer.py

```diff
@@ -80,9 +80,7 @@ class RecordingMaintainer(threading.Thread):
             [CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.record],
         )
         self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all.value)
-        self.recordings_publisher = RecordingsDataPublisher(
-            RecordingsDataTypeEnum.recordings_available_through
-        )
+        self.recordings_publisher = RecordingsDataPublisher()
 
         self.stop_event = stop_event
         self.object_recordings_info: dict[str, list] = defaultdict(list)
@@ -98,6 +96,41 @@ class RecordingMaintainer(threading.Thread):
             and not d.startswith("preview_")
         ]
 
+        # publish newest cached segment per camera (including in use files)
+        newest_cache_segments: dict[str, dict[str, Any]] = {}
+        for cache in cache_files:
+            cache_path = os.path.join(CACHE_DIR, cache)
+            basename = os.path.splitext(cache)[0]
+            camera, date = basename.rsplit("@", maxsplit=1)
+            start_time = datetime.datetime.strptime(
+                date, CACHE_SEGMENT_FORMAT
+            ).astimezone(datetime.timezone.utc)
+            if (
+                camera not in newest_cache_segments
+                or start_time > newest_cache_segments[camera]["start_time"]
+            ):
+                newest_cache_segments[camera] = {
+                    "start_time": start_time,
+                    "cache_path": cache_path,
+                }
+
+        for camera, newest in newest_cache_segments.items():
+            self.recordings_publisher.publish(
+                (
+                    camera,
+                    newest["start_time"].timestamp(),
+                    newest["cache_path"],
+                ),
+                RecordingsDataTypeEnum.latest.value,
+            )
+        # publish None for cameras with no cache files (but only if we know the camera exists)
+        for camera_name in self.config.cameras:
+            if camera_name not in newest_cache_segments:
+                self.recordings_publisher.publish(
+                    (camera_name, None, None),
+                    RecordingsDataTypeEnum.latest.value,
+                )
+
         files_in_use = []
         for process in psutil.process_iter():
             try:
```
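The newest-segment scan leans entirely on the cache naming scheme `<camera>@<timestamp>.mp4`; `rsplit("@", maxsplit=1)` splits on the last `@`, so camera names that themselves contain `@` still parse. A standalone sketch, assuming `CACHE_SEGMENT_FORMAT` is a `strptime` pattern of roughly this shape (the real constant lives in `frigate.const`):

```python
import datetime
import os

# assumption: Frigate's CACHE_SEGMENT_FORMAT is a strptime pattern like this
CACHE_SEGMENT_FORMAT = "%Y%m%d%H%M%S"

filename = "front_door@20240529120000.mp4"
basename = os.path.splitext(filename)[0]

# everything before the last "@" is the camera name
camera, date = basename.rsplit("@", maxsplit=1)
start_time = datetime.datetime.strptime(date, CACHE_SEGMENT_FORMAT).astimezone(
    datetime.timezone.utc
)
print(camera, start_time.timestamp())
```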
```diff
@@ -111,7 +144,7 @@ class RecordingMaintainer(threading.Thread):
             except psutil.Error:
                 continue
 
-        # group recordings by camera
+        # group recordings by camera (skip in-use for validation/moving)
         grouped_recordings: defaultdict[str, list[dict[str, Any]]] = defaultdict(list)
         for cache in cache_files:
             # Skip files currently in use
@@ -233,7 +266,9 @@ class RecordingMaintainer(threading.Thread):
                     recordings[0]["start_time"].timestamp()
                     if self.config.cameras[camera].record.enabled
                     else None,
-                )
+                    None,
+                ),
+                RecordingsDataTypeEnum.saved.value,
             )
 
         recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks)
@@ -250,7 +285,7 @@ class RecordingMaintainer(threading.Thread):
 
     async def validate_and_move_segment(
         self, camera: str, reviews: list[ReviewSegment], recording: dict[str, Any]
-    ) -> None:
+    ) -> Optional[Recordings]:
         cache_path: str = recording["cache_path"]
         start_time: datetime.datetime = recording["start_time"]
         record_config = self.config.cameras[camera].record
@@ -261,7 +296,7 @@ class RecordingMaintainer(threading.Thread):
             or not self.config.cameras[camera].record.enabled
         ):
             self.drop_segment(cache_path)
-            return
+            return None
 
         if cache_path in self.end_time_cache:
             end_time, duration = self.end_time_cache[cache_path]
@@ -270,10 +305,18 @@ class RecordingMaintainer(threading.Thread):
                 self.config.ffmpeg, cache_path, get_duration=True
             )
 
-            if segment_info["duration"]:
-                duration = float(segment_info["duration"])
-            else:
-                duration = -1
+            if not segment_info.get("has_valid_video", False):
+                logger.warning(
+                    f"Invalid or missing video stream in segment {cache_path}. Discarding."
+                )
+                self.recordings_publisher.publish(
+                    (camera, start_time.timestamp(), cache_path),
+                    RecordingsDataTypeEnum.invalid.value,
+                )
+                self.drop_segment(cache_path)
+                return None
+
+            duration = float(segment_info.get("duration", -1))
 
         # ensure duration is within expected length
         if 0 < duration < MAX_SEGMENT_DURATION:
@@ -284,8 +327,18 @@ class RecordingMaintainer(threading.Thread):
                 logger.warning(f"Failed to probe corrupt segment {cache_path}")
 
             logger.warning(f"Discarding a corrupt recording segment: {cache_path}")
-            Path(cache_path).unlink(missing_ok=True)
-            return
+            self.recordings_publisher.publish(
+                (camera, start_time.timestamp(), cache_path),
+                RecordingsDataTypeEnum.invalid.value,
+            )
+            self.drop_segment(cache_path)
+            return None
+
+        # this segment has a valid duration and has video data, so publish an update
+        self.recordings_publisher.publish(
+            (camera, start_time.timestamp(), cache_path),
+            RecordingsDataTypeEnum.valid.value,
+        )
 
         record_config = self.config.cameras[camera].record
         highest = None
```
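`validate_and_move_segment` now returns `Optional[Recordings]`, which is what lets the caller collect results with `asyncio.gather` and bulk-insert only the segments that survived validation. The gather-then-filter shape, sketched with a hypothetical `validate` stand-in:

```python
import asyncio
from typing import Optional


async def validate(segment: str) -> Optional[str]:
    # stand-in for validate_and_move_segment: None means "dropped"
    return segment if segment.endswith(".mp4") else None


async def main() -> None:
    tasks = [validate(s) for s in ["a.mp4", "b.tmp", "c.mp4"]]
    results: list[Optional[str]] = await asyncio.gather(*tasks)
    to_insert = [r for r in results if r is not None]
    print(to_insert)  # ['a.mp4', 'c.mp4']


asyncio.run(main())
```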
frigate/util/services.py

```diff
@@ -603,87 +603,87 @@ def auto_detect_hwaccel() -> str:
 async def get_video_properties(
     ffmpeg, url: str, get_duration: bool = False
 ) -> dict[str, Any]:
-    async def calculate_duration(video: Optional[Any]) -> float:
-        duration = None
-
-        if video is not None:
-            # Get the frames per second (fps) of the video stream
-            fps = video.get(cv2.CAP_PROP_FPS)
-            total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
-
-            if fps and total_frames:
-                duration = total_frames / fps
-
-        # if cv2 failed need to use ffprobe
-        if duration is None:
-            p = await asyncio.create_subprocess_exec(
-                ffmpeg.ffprobe_path,
-                "-v",
-                "error",
-                "-show_entries",
-                "format=duration",
-                "-of",
-                "default=noprint_wrappers=1:nokey=1",
-                f"{url}",
-                stdout=asyncio.subprocess.PIPE,
-                stderr=asyncio.subprocess.PIPE,
-            )
-            await p.wait()
-
-            if p.returncode == 0:
-                result = (await p.stdout.read()).decode()
-            else:
-                result = None
-
-            if result:
-                try:
-                    duration = float(result.strip())
-                except ValueError:
-                    duration = -1
-            else:
-                duration = -1
-
-        return duration
-
-    width = height = 0
-    try:
-        # Open the video stream using OpenCV
-        video = cv2.VideoCapture(url)
-
-        # Check if the video stream was opened successfully
-        if not video.isOpened():
-            video = None
-    except Exception:
-        video = None
-
-    result = {}
-
+    async def probe_with_ffprobe(
+        url: str,
+    ) -> tuple[bool, int, int, Optional[str], float]:
+        """Fallback using ffprobe: returns (valid, width, height, codec, duration)."""
+        cmd = [
+            ffmpeg.ffprobe_path,
+            "-v",
+            "quiet",
+            "-print_format",
+            "json",
+            "-show_format",
+            "-show_streams",
+            url,
+        ]
+        try:
+            proc = await asyncio.create_subprocess_exec(
+                *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
+            )
+            stdout, _ = await proc.communicate()
+            if proc.returncode != 0:
+                return False, 0, 0, None, -1
+
+            data = json.loads(stdout.decode())
+            video_streams = [
+                s for s in data.get("streams", []) if s.get("codec_type") == "video"
+            ]
+            if not video_streams:
+                return False, 0, 0, None, -1
+
+            v = video_streams[0]
+            width = int(v.get("width", 0))
+            height = int(v.get("height", 0))
+            codec = v.get("codec_name")
+
+            duration_str = data.get("format", {}).get("duration")
+            duration = float(duration_str) if duration_str else -1.0
+
+            return True, width, height, codec, duration
+        except (json.JSONDecodeError, ValueError, KeyError, asyncio.SubprocessError):
+            return False, 0, 0, None, -1
+
+    def probe_with_cv2(url: str) -> tuple[bool, int, int, Optional[str], float]:
+        """Primary attempt using cv2: returns (valid, width, height, fourcc, duration)."""
+        cap = cv2.VideoCapture(url)
+        if not cap.isOpened():
+            cap.release()
+            return False, 0, 0, None, -1
+
+        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
+        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
+        valid = width > 0 and height > 0
+        fourcc = None
+        duration = -1.0
+
+        if valid:
+            fourcc_int = int(cap.get(cv2.CAP_PROP_FOURCC))
+            fourcc = fourcc_int.to_bytes(4, "little").decode("latin-1").strip()
+
+            if get_duration:
+                fps = cap.get(cv2.CAP_PROP_FPS)
+                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
+                if fps > 0 and total_frames > 0:
+                    duration = total_frames / fps
+
+        cap.release()
+        return valid, width, height, fourcc, duration
+
+    # try cv2 first
+    has_video, width, height, fourcc, duration = probe_with_cv2(url)
+
+    # fallback to ffprobe if needed
+    if not has_video or (get_duration and duration < 0):
+        has_video, width, height, fourcc, duration = await probe_with_ffprobe(url)
+
+    result: dict[str, Any] = {"has_valid_video": has_video}
+    if has_video:
+        result.update({"width": width, "height": height})
+        if fourcc:
+            result["fourcc"] = fourcc
     if get_duration:
-        result["duration"] = await calculate_duration(video)
-
-    if video is not None:
-        # Get the width of frames in the video stream
-        width = video.get(cv2.CAP_PROP_FRAME_WIDTH)
-
-        # Get the height of frames in the video stream
-        height = video.get(cv2.CAP_PROP_FRAME_HEIGHT)
-
-        # Get the stream encoding
-        fourcc_int = int(video.get(cv2.CAP_PROP_FOURCC))
-        fourcc = (
-            chr((fourcc_int >> 0) & 255)
-            + chr((fourcc_int >> 8) & 255)
-            + chr((fourcc_int >> 16) & 255)
-            + chr((fourcc_int >> 24) & 255)
-        )
-
-        # Release the video stream
-        video.release()
-
-        result["width"] = round(width)
-        result["height"] = round(height)
-        result["fourcc"] = fourcc
+        result["duration"] = duration
 
     return result
```
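The probe now always reports `has_valid_video`, so callers can tell "no decodable video stream" apart from "stream present but duration unknown". A hedged usage sketch, assuming `get_video_properties` is imported from this module, `ffprobe` is on the PATH, and the config object only needs to expose `ffprobe_path` (faked here with a `SimpleNamespace`):

```python
import asyncio
from types import SimpleNamespace

# assumption: the function lives in frigate.util.services
from frigate.util.services import get_video_properties

# hypothetical stand-in for Frigate's ffmpeg config object
ffmpeg = SimpleNamespace(ffprobe_path="ffprobe")


async def check(path: str) -> None:
    info = await get_video_properties(ffmpeg, path, get_duration=True)
    if not info["has_valid_video"]:
        print(f"{path}: no decodable video stream")
    elif info["duration"] < 0:
        print(f"{path}: video present but duration unknown")
    else:
        print(f"{path}: {info['width']}x{info['height']}, {info['duration']:.1f}s")


asyncio.run(check("/tmp/example.mp4"))
```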
frigate/video.py

```diff
@@ -1,10 +1,9 @@
-import datetime
 import logging
-import os
 import queue
 import subprocess as sp
 import threading
 import time
+from datetime import datetime, timedelta, timezone
 from multiprocessing import Queue, Value
 from multiprocessing.synchronize import Event as MpEvent
 from typing import Any
@@ -13,6 +12,10 @@ import cv2
 
 from frigate.camera import CameraMetrics, PTZMetrics
 from frigate.comms.inter_process import InterProcessRequestor
+from frigate.comms.recordings_updater import (
+    RecordingsDataSubscriber,
+    RecordingsDataTypeEnum,
+)
 from frigate.config import CameraConfig, DetectConfig, ModelConfig
 from frigate.config.camera.camera import CameraTypeEnum
 from frigate.config.camera.updater import (
@@ -20,8 +23,6 @@ from frigate.config.camera.updater import (
     CameraConfigUpdateSubscriber,
 )
 from frigate.const import (
-    CACHE_DIR,
-    CACHE_SEGMENT_FORMAT,
     PROCESS_PRIORITY_HIGH,
     REQUEST_REGION_GRID,
 )
```
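The import change is mechanical but easy to get wrong: after `from datetime import datetime`, the old `datetime.datetime.now()` spelling raises `AttributeError`, which is why every call site below is touched. The equivalence, spelled out:

```python
# style used before the change
import datetime as dt_module

before = dt_module.datetime.now().astimezone(dt_module.timezone.utc)

# style used after the change
from datetime import datetime, timedelta, timezone

after = datetime.now().astimezone(timezone.utc)

# both produce timezone-aware datetimes; arithmetic is identical
assert (after + timedelta(seconds=120)).tzinfo is timezone.utc
```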
```diff
@@ -129,7 +130,7 @@ def capture_frames(
 
         fps.value = frame_rate.eps()
         skipped_fps.value = skipped_eps.eps()
-        current_frame.value = datetime.datetime.now().timestamp()
+        current_frame.value = datetime.now().timestamp()
         frame_name = f"{config.name}_frame{frame_index}"
         frame_buffer = frame_manager.write(frame_name)
         try:
@@ -199,6 +200,11 @@ class CameraWatchdog(threading.Thread):
         self.requestor = InterProcessRequestor()
         self.was_enabled = self.config.enabled
 
+        self.segment_subscriber = RecordingsDataSubscriber(RecordingsDataTypeEnum.all)
+        self.latest_valid_segment_time: float = 0
+        self.latest_invalid_segment_time: float = 0
+        self.latest_cache_segment_time: float = 0
+
     def _update_enabled_state(self) -> bool:
         """Fetch the latest config and update enabled state."""
         self.config_subscriber.check_for_updates()
@@ -243,6 +249,11 @@ class CameraWatchdog(threading.Thread):
                 if enabled:
                     self.logger.debug(f"Enabling camera {self.config.name}")
                     self.start_all_ffmpeg()
+
+                    # reset all timestamps
+                    self.latest_valid_segment_time = 0
+                    self.latest_invalid_segment_time = 0
+                    self.latest_cache_segment_time = 0
                 else:
                     self.logger.debug(f"Disabling camera {self.config.name}")
                     self.stop_all_ffmpeg()
@@ -260,7 +271,37 @@ class CameraWatchdog(threading.Thread):
             if not enabled:
                 continue
 
-            now = datetime.datetime.now().timestamp()
+            while True:
+                update = self.segment_subscriber.check_for_update(timeout=0)
+
+                if update == (None, None):
+                    break
+
+                raw_topic, payload = update
+                if raw_topic and payload:
+                    topic = str(raw_topic)
+                    camera, segment_time, _ = payload
+
+                    if camera != self.config.name:
+                        continue
+
+                    if topic.endswith(RecordingsDataTypeEnum.valid.value):
+                        self.logger.debug(
+                            f"Latest valid recording segment time on {camera}: {segment_time}"
+                        )
+                        self.latest_valid_segment_time = segment_time
+                    elif topic.endswith(RecordingsDataTypeEnum.invalid.value):
+                        self.logger.warning(
+                            f"Invalid recording segment detected for {camera} at {segment_time}"
+                        )
+                        self.latest_invalid_segment_time = segment_time
+                    elif topic.endswith(RecordingsDataTypeEnum.latest.value):
+                        if segment_time is not None:
+                            self.latest_cache_segment_time = segment_time
+                        else:
+                            self.latest_cache_segment_time = 0
+
+            now = datetime.now().timestamp()
 
             if not self.capture_thread.is_alive():
                 self.requestor.send_data(f"{self.config.name}/status/detect", "offline")
```
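The new loop drains every pending segment event with `timeout=0` before the liveness checks, so the watchdog never blocks its own loop and always acts on the newest state. The same drain-until-empty shape in a generic, runnable form:

```python
import queue

events: "queue.Queue[tuple[str, float]]" = queue.Queue()


def drain(q: "queue.Queue[tuple[str, float]]") -> dict[str, float]:
    # consume everything that is ready right now; never block
    latest: dict[str, float] = {}
    while True:
        try:
            topic, ts = q.get_nowait()  # analogous to check_for_update(timeout=0)
        except queue.Empty:
            break
        latest[topic] = ts  # later events overwrite earlier ones
    return latest


events.put(("valid", 1.0))
events.put(("valid", 2.0))
print(drain(events))  # {'valid': 2.0}
```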
```diff
@@ -298,18 +339,55 @@ class CameraWatchdog(threading.Thread):
                 poll = p["process"].poll()
 
                 if self.config.record.enabled and "record" in p["roles"]:
-                    latest_segment_time = self.get_latest_segment_datetime(
-                        p.get(
-                            "latest_segment_time",
-                            datetime.datetime.now().astimezone(datetime.timezone.utc),
-                        )
-                    )
+                    now_utc = datetime.now().astimezone(timezone.utc)
 
-                    if datetime.datetime.now().astimezone(datetime.timezone.utc) > (
-                        latest_segment_time + datetime.timedelta(seconds=120)
-                    ):
+                    latest_cache_dt = (
+                        datetime.fromtimestamp(
+                            self.latest_cache_segment_time, tz=timezone.utc
+                        )
+                        if self.latest_cache_segment_time > 0
+                        else now_utc - timedelta(seconds=1)
+                    )
+
+                    latest_valid_dt = (
+                        datetime.fromtimestamp(
+                            self.latest_valid_segment_time, tz=timezone.utc
+                        )
+                        if self.latest_valid_segment_time > 0
+                        else now_utc - timedelta(seconds=1)
+                    )
+
+                    latest_invalid_dt = (
+                        datetime.fromtimestamp(
+                            self.latest_invalid_segment_time, tz=timezone.utc
+                        )
+                        if self.latest_invalid_segment_time > 0
+                        else now_utc - timedelta(seconds=1)
+                    )
+
+                    # ensure segments are still being created and that they have valid video data
+                    cache_stale = now_utc > (latest_cache_dt + timedelta(seconds=120))
+                    valid_stale = now_utc > (latest_valid_dt + timedelta(seconds=120))
+                    invalid_stale_condition = (
+                        self.latest_invalid_segment_time > 0
+                        and now_utc > (latest_invalid_dt + timedelta(seconds=120))
+                        and self.latest_valid_segment_time
+                        <= self.latest_invalid_segment_time
+                    )
+                    invalid_stale = invalid_stale_condition
+
+                    if cache_stale or valid_stale or invalid_stale:
+                        if cache_stale:
+                            reason = "No new recording segments were created"
+                        elif valid_stale:
+                            reason = "No new valid recording segments were created"
+                        else:  # invalid_stale
+                            reason = (
+                                "No valid segments created since last invalid segment"
+                            )
+
                         self.logger.error(
-                            f"No new recording segments were created for {self.config.name} in the last 120s. restarting the ffmpeg record process..."
+                            f"{reason} for {self.config.name} in the last 120s. Restarting the ffmpeg record process..."
                         )
                         p["process"] = start_or_restart_ffmpeg(
                             p["cmd"],
```
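Of the three restart triggers, `invalid_stale` is the subtle one: it only fires when the newest invalid segment is itself older than 120s and no valid segment has landed since it. A worked sketch of the predicate on raw epoch seconds:

```python
def invalid_stale(now: float, latest_valid: float, latest_invalid: float) -> bool:
    # mirrors the watchdog condition, using epoch seconds for clarity
    return (
        latest_invalid > 0
        and now > latest_invalid + 120
        and latest_valid <= latest_invalid
    )


now = 1_000_000.0
print(invalid_stale(now, latest_valid=now - 300, latest_invalid=now - 150))  # True
print(invalid_stale(now, latest_valid=now - 60, latest_invalid=now - 150))  # False: valid arrived after invalid
```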
```diff
@@ -328,7 +406,7 @@ class CameraWatchdog(threading.Thread):
                         self.requestor.send_data(
                             f"{self.config.name}/status/record", "online"
                         )
-                    p["latest_segment_time"] = latest_segment_time
+                    p["latest_segment_time"] = self.latest_cache_segment_time
 
                 if poll is None:
                     continue
@@ -346,6 +424,7 @@ class CameraWatchdog(threading.Thread):
         self.stop_all_ffmpeg()
         self.logpipe.close()
         self.config_subscriber.stop()
+        self.segment_subscriber.stop()
 
     def start_ffmpeg_detect(self):
         ffmpeg_cmd = [
@@ -405,33 +484,6 @@ class CameraWatchdog(threading.Thread):
             p["logpipe"].close()
         self.ffmpeg_other_processes.clear()
 
-    def get_latest_segment_datetime(
-        self, latest_segment: datetime.datetime
-    ) -> datetime.datetime:
-        """Checks if ffmpeg is still writing recording segments to cache."""
-        cache_files = sorted(
-            [
-                d
-                for d in os.listdir(CACHE_DIR)
-                if os.path.isfile(os.path.join(CACHE_DIR, d))
-                and d.endswith(".mp4")
-                and not d.startswith("preview_")
-            ]
-        )
-        newest_segment_time = latest_segment
-
-        for file in cache_files:
-            if self.config.name in file:
-                basename = os.path.splitext(file)[0]
-                _, date = basename.rsplit("@", maxsplit=1)
-                segment_time = datetime.datetime.strptime(
-                    date, CACHE_SEGMENT_FORMAT
-                ).astimezone(datetime.timezone.utc)
-                if segment_time > newest_segment_time:
-                    newest_segment_time = segment_time
-
-        return newest_segment_time
-
 
 class CameraCaptureRunner(threading.Thread):
     def __init__(
@@ -727,10 +779,7 @@ def process_frames(
             time.sleep(0.1)
             continue
 
-        if (
-            datetime.datetime.now().astimezone(datetime.timezone.utc)
-            > next_region_update
-        ):
+        if datetime.now().astimezone(timezone.utc) > next_region_update:
             region_grid = requestor.send_data(REQUEST_REGION_GRID, camera_config.name)
             next_region_update = get_tomorrow_at_time(2)
 
```