Organize utils

This commit is contained in:
Nick Mowen 2023-07-04 10:53:16 -06:00
parent 6d192e4274
commit 8e224579d7
22 changed files with 650 additions and 612 deletions

View File

@ -7,7 +7,7 @@ from typing import Any, Callable
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.ptz import OnvifCommandEnum, OnvifController from frigate.ptz import OnvifCommandEnum, OnvifController
from frigate.types import CameraMetricsTypes, FeatureMetricsTypes from frigate.types import CameraMetricsTypes, FeatureMetricsTypes
from frigate.util import restart_frigate from frigate.util.services import restart_frigate
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@ -22,13 +22,13 @@ from frigate.ffmpeg_presets import (
parse_preset_output_rtmp, parse_preset_output_rtmp,
) )
from frigate.plus import PlusApi from frigate.plus import PlusApi
from frigate.util import ( from frigate.util.builtin import (
create_mask,
deep_merge, deep_merge,
escape_special_characters, escape_special_characters,
get_ffmpeg_arg_list, get_ffmpeg_arg_list,
load_config_with_no_duplicates, load_config_with_no_duplicates,
) )
from frigate.util.image import create_mask
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@ -11,7 +11,7 @@ from pydantic import BaseModel, Extra, Field
from pydantic.fields import PrivateAttr from pydantic.fields import PrivateAttr
from frigate.plus import PlusApi from frigate.plus import PlusApi
from frigate.util import load_labels from frigate.util.builtin import load_labels
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@ -26,7 +26,8 @@ from frigate.ffmpeg_presets import parse_preset_input
from frigate.log import LogPipe from frigate.log import LogPipe
from frigate.object_detection import load_labels from frigate.object_detection import load_labels
from frigate.types import FeatureMetricsTypes from frigate.types import FeatureMetricsTypes
from frigate.util import get_ffmpeg_arg_list, listen from frigate.util.builtin import get_ffmpeg_arg_list
from frigate.util.services import listen
from frigate.video import start_or_restart_ffmpeg, stop_ffmpeg from frigate.video import start_or_restart_ffmpeg, stop_ffmpeg
try: try:

View File

@ -14,7 +14,7 @@ from faster_fifo import Queue
from frigate.config import CameraConfig, FrigateConfig from frigate.config import CameraConfig, FrigateConfig
from frigate.const import CLIPS_DIR from frigate.const import CLIPS_DIR
from frigate.events.maintainer import EventTypeEnum from frigate.events.maintainer import EventTypeEnum
from frigate.util import draw_box_with_label from frigate.util.image import draw_box_with_label
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@ -11,7 +11,7 @@ from faster_fifo import Queue
from frigate.config import EventsConfig, FrigateConfig from frigate.config import EventsConfig, FrigateConfig
from frigate.models import Event from frigate.models import Event
from frigate.types import CameraMetricsTypes from frigate.types import CameraMetricsTypes
from frigate.util import to_relative_box from frigate.util.builtin import to_relative_box
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@ -6,7 +6,7 @@ from enum import Enum
from typing import Any from typing import Any
from frigate.const import BTBN_PATH from frigate.const import BTBN_PATH
from frigate.util import vainfo_hwaccel from frigate.util.services import vainfo_hwaccel
from frigate.version import VERSION from frigate.version import VERSION
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@ -38,13 +38,8 @@ from frigate.ptz import OnvifController
from frigate.record.export import PlaybackFactorEnum, RecordingExporter from frigate.record.export import PlaybackFactorEnum, RecordingExporter
from frigate.stats import stats_snapshot from frigate.stats import stats_snapshot
from frigate.storage import StorageMaintainer from frigate.storage import StorageMaintainer
from frigate.util import ( from frigate.util.builtin import clean_camera_user_pass, get_tz_modifiers
clean_camera_user_pass, from frigate.util.services import ffprobe_stream, restart_frigate, vainfo_hwaccel
ffprobe_stream,
get_tz_modifiers,
restart_frigate,
vainfo_hwaccel,
)
from frigate.version import VERSION from frigate.version import VERSION
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@ -13,7 +13,7 @@ from typing import Deque, Optional
from faster_fifo import Queue from faster_fifo import Queue
from setproctitle import setproctitle from setproctitle import setproctitle
from frigate.util import clean_camera_user_pass from frigate.util.builtin import clean_camera_user_pass
def listener_configurer() -> None: def listener_configurer() -> None:

View File

@ -12,7 +12,9 @@ from setproctitle import setproctitle
from frigate.detectors import create_detector from frigate.detectors import create_detector
from frigate.detectors.detector_config import InputTensorEnum from frigate.detectors.detector_config import InputTensorEnum
from frigate.util import EventsPerSecond, SharedMemoryFrameManager, listen, load_labels from frigate.util.builtin import EventsPerSecond, load_labels
from frigate.util.image import SharedMemoryFrameManager
from frigate.util.services import listen
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@ -22,7 +22,7 @@ from frigate.config import (
) )
from frigate.const import CLIPS_DIR from frigate.const import CLIPS_DIR
from frigate.events.maintainer import EventTypeEnum from frigate.events.maintainer import EventTypeEnum
from frigate.util import ( from frigate.util.image import (
SharedMemoryFrameManager, SharedMemoryFrameManager,
area, area,
calculate_region, calculate_region,

View File

@ -24,7 +24,11 @@ from ws4py.websocket import WebSocket
from frigate.config import BirdseyeModeEnum, FrigateConfig from frigate.config import BirdseyeModeEnum, FrigateConfig
from frigate.const import BASE_DIR, BIRDSEYE_PIPE from frigate.const import BASE_DIR, BIRDSEYE_PIPE
from frigate.util import SharedMemoryFrameManager, copy_yuv_to_position, get_yuv_crop from frigate.util.image import (
SharedMemoryFrameManager,
copy_yuv_to_position,
get_yuv_crop,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -385,9 +389,20 @@ class BirdsEyeFrameManager:
def get_standard_aspect_ratio(camera_width, camera_height) -> tuple[int, int]: def get_standard_aspect_ratio(camera_width, camera_height) -> tuple[int, int]:
"""Ensure that only standard aspect ratios are used.""" """Ensure that only standard aspect ratios are used."""
known_aspects = [(16, 9), (9, 16), (32, 9), (12, 9), (9, 12)] # aspects are scaled to be same ratio known_aspects = [
known_aspects_ratios = list(map(lambda aspect: aspect[0] / aspect[1], known_aspects)) (16, 9),
closest = min(known_aspects_ratios, key=lambda x:abs(x-(camera_width / camera_height))) (9, 16),
(32, 9),
(12, 9),
(9, 12),
] # aspects are scaled to be same ratio
known_aspects_ratios = list(
map(lambda aspect: aspect[0] / aspect[1], known_aspects)
)
closest = min(
known_aspects_ratios,
key=lambda x: abs(x - (camera_width / camera_height)),
)
return known_aspects[known_aspects_ratios.index(closest)] return known_aspects[known_aspects_ratios.index(closest)]
def map_layout(row_height: int): def map_layout(row_height: int):
@ -447,7 +462,9 @@ class BirdsEyeFrameManager:
for camera in cameras_to_add: for camera in cameras_to_add:
camera_dims = self.cameras[camera]["dimensions"].copy() camera_dims = self.cameras[camera]["dimensions"].copy()
camera_gcd = math.gcd(camera_dims[0], camera_dims[1]) camera_gcd = math.gcd(camera_dims[0], camera_dims[1])
camera_aspect_x, camera_aspect_y = get_standard_aspect_ratio(camera_dims[0] / camera_gcd, camera_dims[1] / camera_gcd) camera_aspect_x, camera_aspect_y = get_standard_aspect_ratio(
camera_dims[0] / camera_gcd, camera_dims[1] / camera_gcd
)
if camera_dims[1] > camera_dims[0]: if camera_dims[1] > camera_dims[0]:
portrait = True portrait = True

View File

@ -21,7 +21,8 @@ from frigate.config import FrigateConfig, RetainModeEnum
from frigate.const import CACHE_DIR, MAX_SEGMENT_DURATION, RECORD_DIR from frigate.const import CACHE_DIR, MAX_SEGMENT_DURATION, RECORD_DIR
from frigate.models import Event, Recordings from frigate.models import Event, Recordings
from frigate.types import FeatureMetricsTypes from frigate.types import FeatureMetricsTypes
from frigate.util import area, get_video_properties from frigate.util.image import area
from frigate.util.services import get_video_properties
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@ -15,7 +15,7 @@ from frigate.models import Event, Recordings, RecordingsToDelete, Timeline
from frigate.record.cleanup import RecordingCleanup from frigate.record.cleanup import RecordingCleanup
from frigate.record.maintainer import RecordingMaintainer from frigate.record.maintainer import RecordingMaintainer
from frigate.types import FeatureMetricsTypes from frigate.types import FeatureMetricsTypes
from frigate.util import listen from frigate.util.services import listen
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@ -17,7 +17,7 @@ from frigate.config import FrigateConfig
from frigate.const import CACHE_DIR, CLIPS_DIR, DRIVER_AMD, DRIVER_ENV_VAR, RECORD_DIR from frigate.const import CACHE_DIR, CLIPS_DIR, DRIVER_AMD, DRIVER_ENV_VAR, RECORD_DIR
from frigate.object_detection import ObjectDetectProcess from frigate.object_detection import ObjectDetectProcess
from frigate.types import CameraMetricsTypes, StatsTrackingTypes from frigate.types import CameraMetricsTypes, StatsTrackingTypes
from frigate.util import ( from frigate.util.services import (
get_amd_gpu_stats, get_amd_gpu_stats,
get_bandwidth_stats, get_bandwidth_stats,
get_cpu_stats, get_cpu_stats,

View File

@ -10,7 +10,7 @@ from faster_fifo import Queue
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.events.maintainer import EventTypeEnum from frigate.events.maintainer import EventTypeEnum
from frigate.models import Timeline from frigate.models import Timeline
from frigate.util import to_relative_box from frigate.util.builtin import to_relative_box
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@ -7,7 +7,7 @@ from norfair.drawing.drawer import Drawer
from frigate.config import DetectConfig from frigate.config import DetectConfig
from frigate.track import ObjectTracker from frigate.track import ObjectTracker
from frigate.util import intersection_over_union from frigate.util.image import intersection_over_union
# Normalizes distance from estimate relative to object size # Normalizes distance from estimate relative to object size

177
frigate/util/builtin.py Normal file
View File

@ -0,0 +1,177 @@
"""Utilities for builtin types manipulation."""
import copy
import datetime
import logging
import pytz
import re
import shlex
import urllib.parse
from collections import Counter
from collections.abc import Mapping
from typing import Any, Tuple
import yaml
from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS
logger = logging.getLogger(__name__)
class EventsPerSecond:
def __init__(self, max_events=1000, last_n_seconds=10):
self._start = None
self._max_events = max_events
self._last_n_seconds = last_n_seconds
self._timestamps = []
def start(self):
self._start = datetime.datetime.now().timestamp()
def update(self):
now = datetime.datetime.now().timestamp()
if self._start is None:
self._start = now
self._timestamps.append(now)
# truncate the list when it goes 100 over the max_size
if len(self._timestamps) > self._max_events + 100:
self._timestamps = self._timestamps[(1 - self._max_events) :]
self.expire_timestamps(now)
def eps(self):
now = datetime.datetime.now().timestamp()
if self._start is None:
self._start = now
# compute the (approximate) events in the last n seconds
self.expire_timestamps(now)
seconds = min(now - self._start, self._last_n_seconds)
# avoid divide by zero
if seconds == 0:
seconds = 1
return len(self._timestamps) / seconds
# remove aged out timestamps
def expire_timestamps(self, now):
threshold = now - self._last_n_seconds
while self._timestamps and self._timestamps[0] < threshold:
del self._timestamps[0]
def deep_merge(dct1: dict, dct2: dict, override=False, merge_lists=False) -> dict:
"""
:param dct1: First dict to merge
:param dct2: Second dict to merge
:param override: if same key exists in both dictionaries, should override? otherwise ignore. (default=True)
:return: The merge dictionary
"""
merged = copy.deepcopy(dct1)
for k, v2 in dct2.items():
if k in merged:
v1 = merged[k]
if isinstance(v1, dict) and isinstance(v2, Mapping):
merged[k] = deep_merge(v1, v2, override)
elif isinstance(v1, list) and isinstance(v2, list):
if merge_lists:
merged[k] = v1 + v2
else:
if override:
merged[k] = copy.deepcopy(v2)
else:
merged[k] = copy.deepcopy(v2)
return merged
def load_config_with_no_duplicates(raw_config) -> dict:
"""Get config ensuring duplicate keys are not allowed."""
# https://stackoverflow.com/a/71751051
class PreserveDuplicatesLoader(yaml.loader.Loader):
pass
def map_constructor(loader, node, deep=False):
keys = [loader.construct_object(node, deep=deep) for node, _ in node.value]
vals = [loader.construct_object(node, deep=deep) for _, node in node.value]
key_count = Counter(keys)
data = {}
for key, val in zip(keys, vals):
if key_count[key] > 1:
raise ValueError(
f"Config input {key} is defined multiple times for the same field, this is not allowed."
)
else:
data[key] = val
return data
PreserveDuplicatesLoader.add_constructor(
yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, map_constructor
)
return yaml.load(raw_config, PreserveDuplicatesLoader)
def clean_camera_user_pass(line: str) -> str:
"""Removes user and password from line."""
if "rtsp://" in line:
return re.sub(REGEX_RTSP_CAMERA_USER_PASS, "://*:*@", line)
else:
return re.sub(REGEX_HTTP_CAMERA_USER_PASS, "user=*&password=*", line)
def escape_special_characters(path: str) -> str:
"""Cleans reserved characters to encodings for ffmpeg."""
try:
found = re.search(REGEX_RTSP_CAMERA_USER_PASS, path).group(0)[3:-1]
pw = found[(found.index(":") + 1) :]
return path.replace(pw, urllib.parse.quote_plus(pw))
except AttributeError:
# path does not have user:pass
return path
def get_ffmpeg_arg_list(arg: Any) -> list:
"""Use arg if list or convert to list format."""
return arg if isinstance(arg, list) else shlex.split(arg)
def load_labels(path, encoding="utf-8"):
"""Loads labels from file (with or without index numbers).
Args:
path: path to label file.
encoding: label file encoding.
Returns:
Dictionary mapping indices to labels.
"""
with open(path, "r", encoding=encoding) as f:
labels = {index: "unknown" for index in range(91)}
lines = f.readlines()
if not lines:
return {}
if lines[0].split(" ", maxsplit=1)[0].isdigit():
pairs = [line.split(" ", maxsplit=1) for line in lines]
labels.update({int(index): label.strip() for index, label in pairs})
else:
labels.update({index: line.strip() for index, line in enumerate(lines)})
return labels
def get_tz_modifiers(tz_name: str) -> Tuple[str, str]:
seconds_offset = (
datetime.datetime.now(pytz.timezone(tz_name)).utcoffset().total_seconds()
)
hours_offset = int(seconds_offset / 60 / 60)
minutes_offset = int(seconds_offset / 60 - hours_offset * 60)
hour_modifier = f"{hours_offset} hour"
minute_modifier = f"{minutes_offset} minute"
return hour_modifier, minute_modifier
def to_relative_box(
width: int, height: int, box: Tuple[int, int, int, int]
) -> Tuple[int, int, int, int]:
return (
box[0] / width, # x
box[1] / height, # y
(box[2] - box[0]) / width, # w
(box[3] - box[1]) / height, # h
)

601
frigate/util.py → frigate/util/image.py Executable file → Normal file
View File

@ -1,83 +1,18 @@
import copy """Utilities for creating and manipulating image frames."""
import datetime import datetime
import json
import logging import logging
import os
import re
import shlex
import signal
import subprocess as sp
import traceback
import urllib.parse
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from collections import Counter
from collections.abc import Mapping
from multiprocessing import shared_memory from multiprocessing import shared_memory
from typing import Any, AnyStr, Optional, Tuple from typing import AnyStr, Optional
import cv2 import cv2
import numpy as np import numpy as np
import psutil
import py3nvml.py3nvml as nvml
import pytz
import yaml
from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def deep_merge(dct1: dict, dct2: dict, override=False, merge_lists=False) -> dict:
"""
:param dct1: First dict to merge
:param dct2: Second dict to merge
:param override: if same key exists in both dictionaries, should override? otherwise ignore. (default=True)
:return: The merge dictionary
"""
merged = copy.deepcopy(dct1)
for k, v2 in dct2.items():
if k in merged:
v1 = merged[k]
if isinstance(v1, dict) and isinstance(v2, Mapping):
merged[k] = deep_merge(v1, v2, override)
elif isinstance(v1, list) and isinstance(v2, list):
if merge_lists:
merged[k] = v1 + v2
else:
if override:
merged[k] = copy.deepcopy(v2)
else:
merged[k] = copy.deepcopy(v2)
return merged
def load_config_with_no_duplicates(raw_config) -> dict:
"""Get config ensuring duplicate keys are not allowed."""
# https://stackoverflow.com/a/71751051
class PreserveDuplicatesLoader(yaml.loader.Loader):
pass
def map_constructor(loader, node, deep=False):
keys = [loader.construct_object(node, deep=deep) for node, _ in node.value]
vals = [loader.construct_object(node, deep=deep) for _, node in node.value]
key_count = Counter(keys)
data = {}
for key, val in zip(keys, vals):
if key_count[key] > 1:
raise ValueError(
f"Config input {key} is defined multiple times for the same field, this is not allowed."
)
else:
data[key] = val
return data
PreserveDuplicatesLoader.add_constructor(
yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, map_constructor
)
return yaml.load(raw_config, PreserveDuplicatesLoader)
def draw_timestamp( def draw_timestamp(
frame, frame,
timestamp, timestamp,
@ -638,433 +573,6 @@ def clipped(obj, frame_shape):
else: else:
return False return False
def restart_frigate():
proc = psutil.Process(1)
# if this is running via s6, sigterm pid 1
if proc.name() == "s6-svscan":
proc.terminate()
# otherwise, just try and exit frigate
else:
os.kill(os.getpid(), signal.SIGTERM)
class EventsPerSecond:
def __init__(self, max_events=1000, last_n_seconds=10):
self._start = None
self._max_events = max_events
self._last_n_seconds = last_n_seconds
self._timestamps = []
def start(self):
self._start = datetime.datetime.now().timestamp()
def update(self):
now = datetime.datetime.now().timestamp()
if self._start is None:
self._start = now
self._timestamps.append(now)
# truncate the list when it goes 100 over the max_size
if len(self._timestamps) > self._max_events + 100:
self._timestamps = self._timestamps[(1 - self._max_events) :]
self.expire_timestamps(now)
def eps(self):
now = datetime.datetime.now().timestamp()
if self._start is None:
self._start = now
# compute the (approximate) events in the last n seconds
self.expire_timestamps(now)
seconds = min(now - self._start, self._last_n_seconds)
# avoid divide by zero
if seconds == 0:
seconds = 1
return len(self._timestamps) / seconds
# remove aged out timestamps
def expire_timestamps(self, now):
threshold = now - self._last_n_seconds
while self._timestamps and self._timestamps[0] < threshold:
del self._timestamps[0]
def print_stack(sig, frame):
traceback.print_stack(frame)
def listen():
signal.signal(signal.SIGUSR1, print_stack)
def create_mask(frame_shape, mask):
mask_img = np.zeros(frame_shape, np.uint8)
mask_img[:] = 255
if isinstance(mask, list):
for m in mask:
add_mask(m, mask_img)
elif isinstance(mask, str):
add_mask(mask, mask_img)
return mask_img
def add_mask(mask, mask_img):
points = mask.split(",")
contour = np.array(
[[int(points[i]), int(points[i + 1])] for i in range(0, len(points), 2)]
)
cv2.fillPoly(mask_img, pts=[contour], color=(0))
def load_labels(path, encoding="utf-8"):
"""Loads labels from file (with or without index numbers).
Args:
path: path to label file.
encoding: label file encoding.
Returns:
Dictionary mapping indices to labels.
"""
with open(path, "r", encoding=encoding) as f:
labels = {index: "unknown" for index in range(91)}
lines = f.readlines()
if not lines:
return {}
if lines[0].split(" ", maxsplit=1)[0].isdigit():
pairs = [line.split(" ", maxsplit=1) for line in lines]
labels.update({int(index): label.strip() for index, label in pairs})
else:
labels.update({index: line.strip() for index, line in enumerate(lines)})
return labels
def clean_camera_user_pass(line: str) -> str:
"""Removes user and password from line."""
if "rtsp://" in line:
return re.sub(REGEX_RTSP_CAMERA_USER_PASS, "://*:*@", line)
else:
return re.sub(REGEX_HTTP_CAMERA_USER_PASS, "user=*&password=*", line)
def escape_special_characters(path: str) -> str:
"""Cleans reserved characters to encodings for ffmpeg."""
try:
found = re.search(REGEX_RTSP_CAMERA_USER_PASS, path).group(0)[3:-1]
pw = found[(found.index(":") + 1) :]
return path.replace(pw, urllib.parse.quote_plus(pw))
except AttributeError:
# path does not have user:pass
return path
def get_cgroups_version() -> str:
"""Determine what version of cgroups is enabled."""
cgroup_path = "/sys/fs/cgroup"
if not os.path.ismount(cgroup_path):
logger.debug(f"{cgroup_path} is not a mount point.")
return "unknown"
try:
with open("/proc/mounts", "r") as f:
mounts = f.readlines()
for mount in mounts:
mount_info = mount.split()
if mount_info[1] == cgroup_path:
fs_type = mount_info[2]
if fs_type == "cgroup2fs" or fs_type == "cgroup2":
return "cgroup2"
elif fs_type == "tmpfs":
return "cgroup"
else:
logger.debug(
f"Could not determine cgroups version: unhandled filesystem {fs_type}"
)
break
except Exception as e:
logger.debug(f"Could not determine cgroups version: {e}")
return "unknown"
def get_docker_memlimit_bytes() -> int:
"""Get mem limit in bytes set in docker if present. Returns -1 if no limit detected."""
# check running a supported cgroups version
if get_cgroups_version() == "cgroup2":
memlimit_path = "/sys/fs/cgroup/memory.max"
try:
with open(memlimit_path, "r") as f:
value = f.read().strip()
if value.isnumeric():
return int(value)
elif value.lower() == "max":
return -1
except Exception as e:
logger.debug(f"Unable to get docker memlimit: {e}")
return -1
def get_cpu_stats() -> dict[str, dict]:
"""Get cpu usages for each process id"""
usages = {}
docker_memlimit = get_docker_memlimit_bytes() / 1024
total_mem = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES") / 1024
for process in psutil.process_iter(["pid", "name", "cpu_percent", "cmdline"]):
pid = process.info["pid"]
try:
cpu_percent = process.info["cpu_percent"]
cmdline = process.info["cmdline"]
with open(f"/proc/{pid}/stat", "r") as f:
stats = f.readline().split()
utime = int(stats[13])
stime = int(stats[14])
starttime = int(stats[21])
with open("/proc/uptime") as f:
system_uptime_sec = int(float(f.read().split()[0]))
clk_tck = os.sysconf(os.sysconf_names["SC_CLK_TCK"])
process_utime_sec = utime // clk_tck
process_stime_sec = stime // clk_tck
process_starttime_sec = starttime // clk_tck
process_elapsed_sec = system_uptime_sec - process_starttime_sec
process_usage_sec = process_utime_sec + process_stime_sec
cpu_average_usage = process_usage_sec * 100 // process_elapsed_sec
with open(f"/proc/{pid}/statm", "r") as f:
mem_stats = f.readline().split()
mem_res = int(mem_stats[1]) * os.sysconf("SC_PAGE_SIZE") / 1024
if docker_memlimit > 0:
mem_pct = round((mem_res / docker_memlimit) * 100, 1)
else:
mem_pct = round((mem_res / total_mem) * 100, 1)
usages[pid] = {
"cpu": str(cpu_percent),
"cpu_average": str(round(cpu_average_usage, 2)),
"mem": f"{mem_pct}",
"cmdline": " ".join(cmdline),
}
except Exception:
continue
return usages
def get_physical_interfaces(interfaces) -> list:
with open("/proc/net/dev", "r") as file:
lines = file.readlines()
physical_interfaces = []
for line in lines:
if ":" in line:
interface = line.split(":")[0].strip()
for int in interfaces:
if interface.startswith(int):
physical_interfaces.append(interface)
return physical_interfaces
def get_bandwidth_stats(config) -> dict[str, dict]:
"""Get bandwidth usages for each ffmpeg process id"""
usages = {}
top_command = ["nethogs", "-t", "-v0", "-c5", "-d1"] + get_physical_interfaces(
config.telemetry.network_interfaces
)
p = sp.run(
top_command,
encoding="ascii",
capture_output=True,
)
if p.returncode != 0:
return usages
else:
lines = p.stdout.split("\n")
for line in lines:
stats = list(filter(lambda a: a != "", line.strip().split("\t")))
try:
if re.search(
r"(^ffmpeg|\/go2rtc|frigate\.detector\.[a-z]+)/([0-9]+)/", stats[0]
):
process = stats[0].split("/")
usages[process[len(process) - 2]] = {
"bandwidth": round(float(stats[1]) + float(stats[2]), 1),
}
except (IndexError, ValueError):
continue
return usages
def get_amd_gpu_stats() -> dict[str, str]:
"""Get stats using radeontop."""
radeontop_command = ["radeontop", "-d", "-", "-l", "1"]
p = sp.run(
radeontop_command,
encoding="ascii",
capture_output=True,
)
if p.returncode != 0:
logger.error(f"Unable to poll radeon GPU stats: {p.stderr}")
return None
else:
usages = p.stdout.split(",")
results: dict[str, str] = {}
for hw in usages:
if "gpu" in hw:
results["gpu"] = f"{hw.strip().split(' ')[1].replace('%', '')}%"
elif "vram" in hw:
results["mem"] = f"{hw.strip().split(' ')[1].replace('%', '')}%"
return results
def get_intel_gpu_stats() -> dict[str, str]:
"""Get stats using intel_gpu_top."""
intel_gpu_top_command = [
"timeout",
"0.5s",
"intel_gpu_top",
"-J",
"-o",
"-",
"-s",
"1",
]
p = sp.run(
intel_gpu_top_command,
encoding="ascii",
capture_output=True,
)
# timeout has a non-zero returncode when timeout is reached
if p.returncode != 124:
logger.error(f"Unable to poll intel GPU stats: {p.stderr}")
return None
else:
reading = "".join(p.stdout.split())
results: dict[str, str] = {}
# render is used for qsv
render = []
for result in re.findall(r'"Render/3D/0":{[a-z":\d.,%]+}', reading):
packet = json.loads(result[14:])
single = packet.get("busy", 0.0)
render.append(float(single))
if render:
render_avg = sum(render) / len(render)
else:
render_avg = 1
# video is used for vaapi
video = []
for result in re.findall('"Video/\d":{[a-z":\d.,%]+}', reading):
packet = json.loads(result[10:])
single = packet.get("busy", 0.0)
video.append(float(single))
if video:
video_avg = sum(video) / len(video)
else:
video_avg = 1
results["gpu"] = f"{round((video_avg + render_avg) / 2, 2)}%"
results["mem"] = "-%"
return results
def try_get_info(f, h, default="N/A"):
try:
v = f(h)
except nvml.NVMLError_NotSupported:
v = default
return v
def get_nvidia_gpu_stats() -> dict[int, dict]:
results = {}
try:
nvml.nvmlInit()
deviceCount = nvml.nvmlDeviceGetCount()
for i in range(deviceCount):
handle = nvml.nvmlDeviceGetHandleByIndex(i)
meminfo = try_get_info(nvml.nvmlDeviceGetMemoryInfo, handle)
util = try_get_info(nvml.nvmlDeviceGetUtilizationRates, handle)
if util != "N/A":
gpu_util = util.gpu
else:
gpu_util = 0
if meminfo != "N/A":
gpu_mem_util = meminfo.used / meminfo.total * 100
else:
gpu_mem_util = -1
results[i] = {
"name": nvml.nvmlDeviceGetName(handle),
"gpu": gpu_util,
"mem": gpu_mem_util,
}
except Exception:
pass
finally:
return results
def ffprobe_stream(path: str) -> sp.CompletedProcess:
"""Run ffprobe on stream."""
clean_path = escape_special_characters(path)
ffprobe_cmd = [
"ffprobe",
"-timeout",
"1000000",
"-print_format",
"json",
"-show_entries",
"stream=codec_long_name,width,height,bit_rate,duration,display_aspect_ratio,avg_frame_rate",
"-loglevel",
"quiet",
clean_path,
]
return sp.run(ffprobe_cmd, capture_output=True)
def vainfo_hwaccel(device_name: Optional[str] = None) -> sp.CompletedProcess:
"""Run vainfo."""
ffprobe_cmd = (
["vainfo"]
if not device_name
else ["vainfo", "--display", "drm", "--device", f"/dev/dri/{device_name}"]
)
return sp.run(ffprobe_cmd, capture_output=True)
def get_ffmpeg_arg_list(arg: Any) -> list:
"""Use arg if list or convert to list format."""
return arg if isinstance(arg, list) else shlex.split(arg)
class FrameManager(ABC): class FrameManager(ABC):
@abstractmethod @abstractmethod
def create(self, name, size) -> AnyStr: def create(self, name, size) -> AnyStr:
@ -1131,90 +639,23 @@ class SharedMemoryFrameManager(FrameManager):
self.shm_store[name].unlink() self.shm_store[name].unlink()
del self.shm_store[name] del self.shm_store[name]
def create_mask(frame_shape, mask):
mask_img = np.zeros(frame_shape, np.uint8)
mask_img[:] = 255
def get_tz_modifiers(tz_name: str) -> Tuple[str, str]: if isinstance(mask, list):
seconds_offset = ( for m in mask:
datetime.datetime.now(pytz.timezone(tz_name)).utcoffset().total_seconds() add_mask(m, mask_img)
elif isinstance(mask, str):
add_mask(mask, mask_img)
return mask_img
def add_mask(mask, mask_img):
points = mask.split(",")
contour = np.array(
[[int(points[i]), int(points[i + 1])] for i in range(0, len(points), 2)]
) )
hours_offset = int(seconds_offset / 60 / 60) cv2.fillPoly(mask_img, pts=[contour], color=(0))
minutes_offset = int(seconds_offset / 60 - hours_offset * 60)
hour_modifier = f"{hours_offset} hour"
minute_modifier = f"{minutes_offset} minute"
return hour_modifier, minute_modifier
def to_relative_box(
width: int, height: int, box: Tuple[int, int, int, int]
) -> Tuple[int, int, int, int]:
return (
box[0] / width, # x
box[1] / height, # y
(box[2] - box[0]) / width, # w
(box[3] - box[1]) / height, # h
)
def get_video_properties(url, get_duration=False):
def calculate_duration(video: Optional[any]) -> float:
duration = None
if video is not None:
# Get the frames per second (fps) of the video stream
fps = video.get(cv2.CAP_PROP_FPS)
total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
if fps and total_frames:
duration = total_frames / fps
# if cv2 failed need to use ffprobe
if duration is None:
ffprobe_cmd = [
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
f"{url}",
]
p = sp.run(ffprobe_cmd, capture_output=True)
if p.returncode == 0 and p.stdout.decode():
duration = float(p.stdout.decode().strip())
else:
duration = -1
return duration
width = height = 0
try:
# Open the video stream
video = cv2.VideoCapture(url)
# Check if the video stream was opened successfully
if not video.isOpened():
video = None
except Exception:
video = None
result = {}
if get_duration:
result["duration"] = calculate_duration(video)
if video is not None:
# Get the width of frames in the video stream
width = video.get(cv2.CAP_PROP_FRAME_WIDTH)
# Get the height of frames in the video stream
height = video.get(cv2.CAP_PROP_FRAME_HEIGHT)
# Release the video stream
video.release()
result["width"] = round(width)
result["height"] = round(height)
return result

404
frigate/util/services.py Normal file
View File

@ -0,0 +1,404 @@
"""Utilities for services."""
import json
import logging
import os
import re
import signal
import subprocess as sp
import traceback
from typing import Optional
import cv2
import psutil
import py3nvml.py3nvml as nvml
from frigate.util.builtin import escape_special_characters
logger = logging.getLogger(__name__)
def restart_frigate():
proc = psutil.Process(1)
# if this is running via s6, sigterm pid 1
if proc.name() == "s6-svscan":
proc.terminate()
# otherwise, just try and exit frigate
else:
os.kill(os.getpid(), signal.SIGTERM)
def print_stack(sig, frame):
traceback.print_stack(frame)
def listen():
signal.signal(signal.SIGUSR1, print_stack)
def get_cgroups_version() -> str:
"""Determine what version of cgroups is enabled."""
cgroup_path = "/sys/fs/cgroup"
if not os.path.ismount(cgroup_path):
logger.debug(f"{cgroup_path} is not a mount point.")
return "unknown"
try:
with open("/proc/mounts", "r") as f:
mounts = f.readlines()
for mount in mounts:
mount_info = mount.split()
if mount_info[1] == cgroup_path:
fs_type = mount_info[2]
if fs_type == "cgroup2fs" or fs_type == "cgroup2":
return "cgroup2"
elif fs_type == "tmpfs":
return "cgroup"
else:
logger.debug(
f"Could not determine cgroups version: unhandled filesystem {fs_type}"
)
break
except Exception as e:
logger.debug(f"Could not determine cgroups version: {e}")
return "unknown"
def get_docker_memlimit_bytes() -> int:
"""Get mem limit in bytes set in docker if present. Returns -1 if no limit detected."""
# check running a supported cgroups version
if get_cgroups_version() == "cgroup2":
memlimit_path = "/sys/fs/cgroup/memory.max"
try:
with open(memlimit_path, "r") as f:
value = f.read().strip()
if value.isnumeric():
return int(value)
elif value.lower() == "max":
return -1
except Exception as e:
logger.debug(f"Unable to get docker memlimit: {e}")
return -1
def get_cpu_stats() -> dict[str, dict]:
"""Get cpu usages for each process id"""
usages = {}
docker_memlimit = get_docker_memlimit_bytes() / 1024
total_mem = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES") / 1024
for process in psutil.process_iter(["pid", "name", "cpu_percent", "cmdline"]):
pid = process.info["pid"]
try:
cpu_percent = process.info["cpu_percent"]
cmdline = process.info["cmdline"]
with open(f"/proc/{pid}/stat", "r") as f:
stats = f.readline().split()
utime = int(stats[13])
stime = int(stats[14])
starttime = int(stats[21])
with open("/proc/uptime") as f:
system_uptime_sec = int(float(f.read().split()[0]))
clk_tck = os.sysconf(os.sysconf_names["SC_CLK_TCK"])
process_utime_sec = utime // clk_tck
process_stime_sec = stime // clk_tck
process_starttime_sec = starttime // clk_tck
process_elapsed_sec = system_uptime_sec - process_starttime_sec
process_usage_sec = process_utime_sec + process_stime_sec
cpu_average_usage = process_usage_sec * 100 // process_elapsed_sec
with open(f"/proc/{pid}/statm", "r") as f:
mem_stats = f.readline().split()
mem_res = int(mem_stats[1]) * os.sysconf("SC_PAGE_SIZE") / 1024
if docker_memlimit > 0:
mem_pct = round((mem_res / docker_memlimit) * 100, 1)
else:
mem_pct = round((mem_res / total_mem) * 100, 1)
usages[pid] = {
"cpu": str(cpu_percent),
"cpu_average": str(round(cpu_average_usage, 2)),
"mem": f"{mem_pct}",
"cmdline": " ".join(cmdline),
}
except Exception:
continue
return usages
def get_physical_interfaces(interfaces) -> list:
with open("/proc/net/dev", "r") as file:
lines = file.readlines()
physical_interfaces = []
for line in lines:
if ":" in line:
interface = line.split(":")[0].strip()
for int in interfaces:
if interface.startswith(int):
physical_interfaces.append(interface)
return physical_interfaces
def get_bandwidth_stats(config) -> dict[str, dict]:
"""Get bandwidth usages for each ffmpeg process id"""
usages = {}
top_command = ["nethogs", "-t", "-v0", "-c5", "-d1"] + get_physical_interfaces(
config.telemetry.network_interfaces
)
p = sp.run(
top_command,
encoding="ascii",
capture_output=True,
)
if p.returncode != 0:
return usages
else:
lines = p.stdout.split("\n")
for line in lines:
stats = list(filter(lambda a: a != "", line.strip().split("\t")))
try:
if re.search(
r"(^ffmpeg|\/go2rtc|frigate\.detector\.[a-z]+)/([0-9]+)/", stats[0]
):
process = stats[0].split("/")
usages[process[len(process) - 2]] = {
"bandwidth": round(float(stats[1]) + float(stats[2]), 1),
}
except (IndexError, ValueError):
continue
return usages
def get_amd_gpu_stats() -> dict[str, str]:
"""Get stats using radeontop."""
radeontop_command = ["radeontop", "-d", "-", "-l", "1"]
p = sp.run(
radeontop_command,
encoding="ascii",
capture_output=True,
)
if p.returncode != 0:
logger.error(f"Unable to poll radeon GPU stats: {p.stderr}")
return None
else:
usages = p.stdout.split(",")
results: dict[str, str] = {}
for hw in usages:
if "gpu" in hw:
results["gpu"] = f"{hw.strip().split(' ')[1].replace('%', '')}%"
elif "vram" in hw:
results["mem"] = f"{hw.strip().split(' ')[1].replace('%', '')}%"
return results
def get_intel_gpu_stats() -> dict[str, str]:
"""Get stats using intel_gpu_top."""
intel_gpu_top_command = [
"timeout",
"0.5s",
"intel_gpu_top",
"-J",
"-o",
"-",
"-s",
"1",
]
p = sp.run(
intel_gpu_top_command,
encoding="ascii",
capture_output=True,
)
# timeout has a non-zero returncode when timeout is reached
if p.returncode != 124:
logger.error(f"Unable to poll intel GPU stats: {p.stderr}")
return None
else:
reading = "".join(p.stdout.split())
results: dict[str, str] = {}
# render is used for qsv
render = []
for result in re.findall(r'"Render/3D/0":{[a-z":\d.,%]+}', reading):
packet = json.loads(result[14:])
single = packet.get("busy", 0.0)
render.append(float(single))
if render:
render_avg = sum(render) / len(render)
else:
render_avg = 1
# video is used for vaapi
video = []
for result in re.findall('"Video/\d":{[a-z":\d.,%]+}', reading):
packet = json.loads(result[10:])
single = packet.get("busy", 0.0)
video.append(float(single))
if video:
video_avg = sum(video) / len(video)
else:
video_avg = 1
results["gpu"] = f"{round((video_avg + render_avg) / 2, 2)}%"
results["mem"] = "-%"
return results
def try_get_info(f, h, default="N/A"):
try:
v = f(h)
except nvml.NVMLError_NotSupported:
v = default
return v
def get_nvidia_gpu_stats() -> dict[int, dict]:
results = {}
try:
nvml.nvmlInit()
deviceCount = nvml.nvmlDeviceGetCount()
for i in range(deviceCount):
handle = nvml.nvmlDeviceGetHandleByIndex(i)
meminfo = try_get_info(nvml.nvmlDeviceGetMemoryInfo, handle)
util = try_get_info(nvml.nvmlDeviceGetUtilizationRates, handle)
if util != "N/A":
gpu_util = util.gpu
else:
gpu_util = 0
if meminfo != "N/A":
gpu_mem_util = meminfo.used / meminfo.total * 100
else:
gpu_mem_util = -1
results[i] = {
"name": nvml.nvmlDeviceGetName(handle),
"gpu": gpu_util,
"mem": gpu_mem_util,
}
except Exception:
pass
finally:
return results
def ffprobe_stream(path: str) -> sp.CompletedProcess:
"""Run ffprobe on stream."""
clean_path = escape_special_characters(path)
ffprobe_cmd = [
"ffprobe",
"-timeout",
"1000000",
"-print_format",
"json",
"-show_entries",
"stream=codec_long_name,width,height,bit_rate,duration,display_aspect_ratio,avg_frame_rate",
"-loglevel",
"quiet",
clean_path,
]
return sp.run(ffprobe_cmd, capture_output=True)
def vainfo_hwaccel(device_name: Optional[str] = None) -> sp.CompletedProcess:
"""Run vainfo."""
ffprobe_cmd = (
["vainfo"]
if not device_name
else ["vainfo", "--display", "drm", "--device", f"/dev/dri/{device_name}"]
)
return sp.run(ffprobe_cmd, capture_output=True)
def get_video_properties(url, get_duration=False):
def calculate_duration(video: Optional[any]) -> float:
duration = None
if video is not None:
# Get the frames per second (fps) of the video stream
fps = video.get(cv2.CAP_PROP_FPS)
total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
if fps and total_frames:
duration = total_frames / fps
# if cv2 failed need to use ffprobe
if duration is None:
ffprobe_cmd = [
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
f"{url}",
]
p = sp.run(ffprobe_cmd, capture_output=True)
if p.returncode == 0 and p.stdout.decode():
duration = float(p.stdout.decode().strip())
else:
duration = -1
return duration
width = height = 0
try:
# Open the video stream
video = cv2.VideoCapture(url)
# Check if the video stream was opened successfully
if not video.isOpened():
video = None
except Exception:
video = None
result = {}
if get_duration:
result["duration"] = calculate_duration(video)
if video is not None:
# Get the width of frames in the video stream
width = video.get(cv2.CAP_PROP_FRAME_WIDTH)
# Get the height of frames in the video stream
height = video.get(cv2.CAP_PROP_FRAME_HEIGHT)
# Release the video stream
video.release()
result["width"] = round(width)
result["height"] = round(height)
return result

View File

@ -23,8 +23,8 @@ from frigate.motion.improved_motion import ImprovedMotionDetector
from frigate.object_detection import RemoteObjectDetector from frigate.object_detection import RemoteObjectDetector
from frigate.track import ObjectTracker from frigate.track import ObjectTracker
from frigate.track.norfair_tracker import NorfairTracker from frigate.track.norfair_tracker import NorfairTracker
from frigate.util import ( from frigate.util.builtin import EventsPerSecond
EventsPerSecond, from frigate.util.image import (
FrameManager, FrameManager,
SharedMemoryFrameManager, SharedMemoryFrameManager,
area, area,
@ -32,11 +32,11 @@ from frigate.util import (
draw_box_with_label, draw_box_with_label,
intersection, intersection,
intersection_over_union, intersection_over_union,
listen,
yuv_region_2_bgr, yuv_region_2_bgr,
yuv_region_2_rgb, yuv_region_2_rgb,
yuv_region_2_yuv, yuv_region_2_yuv,
) )
from frigate.util.services import listen
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@ -5,7 +5,7 @@ import time
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from frigate.object_detection import ObjectDetectProcess from frigate.object_detection import ObjectDetectProcess
from frigate.util import restart_frigate from frigate.util.services import restart_frigate
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)