This commit is contained in:
Nick Mowen 2023-06-17 09:54:29 -06:00
parent 51e1e5b7a5
commit 021ed5c141
9 changed files with 754 additions and 441 deletions

View File

@ -1,40 +1,53 @@
import datetime
import logging
import multiprocessing as mp
from multiprocessing.queues import Queue
from multiprocessing.synchronize import Event as MpEvent
import os
import shutil
import signal
import sys
from typing import Optional
from types import FrameType
import traceback
from multiprocessing.queues import Queue
from multiprocessing.synchronize import Event as MpEvent
from types import FrameType
from typing import Optional
import psutil
from peewee_migrate import Router
from playhouse.sqlite_ext import SqliteExtDatabase
from playhouse.sqliteq import SqliteQueueDatabase
from frigate.audio import capture_audio, process_audio
from frigate.comms.dispatcher import Communicator, Dispatcher
from frigate.comms.mqtt import MqttClient
from frigate.comms.ws import WebSocketClient
from frigate.config import FrigateConfig
from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR
from frigate.object_detection import ObjectDetectProcess
from frigate.events import EventCleanup, EventProcessor
from frigate.const import (
CACHE_DIR,
CLIPS_DIR,
CONFIG_DIR,
DEFAULT_DB_PATH,
EXPORT_DIR,
MODEL_CACHE_DIR,
RECORD_DIR,
)
from frigate.events.cleanup import EventCleanup
from frigate.events.external import ExternalEventProcessor
from frigate.events.maintainer import EventProcessor
from frigate.http import create_app
from frigate.log import log_process, root_configurer
from frigate.models import Event, Recordings
from frigate.models import Event, Recordings, Timeline
from frigate.object_detection import ObjectDetectProcess
from frigate.object_processing import TrackedObjectProcessor
from frigate.output import output_frames
from frigate.plus import PlusApi
from frigate.record import RecordingCleanup, RecordingMaintainer
from frigate.restream import RestreamApi
from frigate.ptz import OnvifController
from frigate.record.record import manage_recordings
from frigate.stats import StatsEmitter, stats_init
from frigate.storage import StorageMaintainer
from frigate.timeline import TimelineProcessor
from frigate.types import CameraMetricsTypes, RecordMetricsTypes
from frigate.version import VERSION
from frigate.video import capture_camera, track_camera
from frigate.watchdog import FrigateWatchdog
from frigate.types import CameraMetricsTypes
logger = logging.getLogger(__name__)
@ -43,20 +56,28 @@ class FrigateApp:
def __init__(self) -> None:
self.stop_event: MpEvent = mp.Event()
self.detection_queue: Queue = mp.Queue()
self.audio_detection_queue: Queue = mp.Queue()
self.detectors: dict[str, ObjectDetectProcess] = {}
self.detection_out_events: dict[str, MpEvent] = {}
self.detection_shms: list[mp.shared_memory.SharedMemory] = []
self.log_queue: Queue = mp.Queue()
self.plus_api = PlusApi()
self.camera_metrics: dict[str, CameraMetricsTypes] = {}
self.record_metrics: dict[str, RecordMetricsTypes] = {}
self.processes: dict[str, int] = {}
def set_environment_vars(self) -> None:
for key, value in self.config.environment_vars.items():
os.environ[key] = value
def ensure_dirs(self) -> None:
for d in [RECORD_DIR, CLIPS_DIR, CACHE_DIR]:
for d in [
CONFIG_DIR,
RECORD_DIR,
CLIPS_DIR,
CACHE_DIR,
MODEL_CACHE_DIR,
EXPORT_DIR,
]:
if not os.path.exists(d) and not os.path.islink(d):
logger.info(f"Creating directory: {d}")
os.makedirs(d)
@ -69,6 +90,7 @@ class FrigateApp:
)
self.log_process.daemon = True
self.log_process.start()
self.processes["logger"] = self.log_process.pid or 0
root_configurer(self.log_queue)
def init_config(self) -> None:
@ -80,7 +102,7 @@ class FrigateApp:
config_file = config_file_yaml
user_config = FrigateConfig.parse_file(config_file)
self.config = user_config.runtime_config
self.config = user_config.runtime_config(self.plus_api)
for camera_name in self.config.cameras.keys():
# create camera_metrics
@ -106,21 +128,26 @@ class FrigateApp:
"read_start": mp.Value("d", 0.0),
"ffmpeg_pid": mp.Value("i", 0),
"frame_queue": mp.Queue(maxsize=2),
"audio_queue": mp.Queue(maxsize=2),
"capture_process": None,
"audio_capture": None,
"audio_process": None,
"process": None,
}
self.record_metrics[camera_name] = {
"record_enabled": mp.Value(
"i", self.config.cameras[camera_name].record.enabled
)
}
def set_log_levels(self) -> None:
logging.getLogger().setLevel(self.config.logger.default.value.upper())
for log, level in self.config.logger.logs.items():
logging.getLogger(log).setLevel(level.value.upper())
if not "werkzeug" in self.config.logger.logs:
if "werkzeug" not in self.config.logger.logs:
logging.getLogger("werkzeug").setLevel("ERROR")
if "ws4py" not in self.config.logger.logs:
logging.getLogger("ws4py").setLevel("ERROR")
def init_queues(self) -> None:
# Queues for clip processing
self.event_queue: Queue = mp.Queue()
@ -137,9 +164,21 @@ class FrigateApp:
# Queue for recordings info
self.recordings_info_queue: Queue = mp.Queue()
# Queue for timeline events
self.timeline_queue: Queue = mp.Queue()
def init_database(self) -> None:
def vacuum_db(db: SqliteExtDatabase) -> None:
db.execute_sql("VACUUM;")
try:
with open(f"{CONFIG_DIR}/.vacuum", "w") as f:
f.write(str(datetime.datetime.now().timestamp()))
except PermissionError:
logger.error("Unable to write to /config to save DB state")
# Migrate DB location
old_db_path = os.path.join(CLIPS_DIR, "frigate.db")
old_db_path = DEFAULT_DB_PATH
if not os.path.isfile(self.config.database.path) and os.path.isfile(
old_db_path
):
@ -153,14 +192,68 @@ class FrigateApp:
router = Router(migrate_db)
router.run()
# check if vacuum needs to be run
if os.path.exists(f"{CONFIG_DIR}/.vacuum"):
with open(f"{CONFIG_DIR}/.vacuum") as f:
try:
timestamp = round(float(f.readline()))
except Exception:
timestamp = 0
if (
timestamp
< (
datetime.datetime.now() - datetime.timedelta(weeks=2)
).timestamp()
):
vacuum_db(migrate_db)
else:
vacuum_db(migrate_db)
migrate_db.close()
self.db = SqliteQueueDatabase(self.config.database.path)
models = [Event, Recordings]
def init_go2rtc(self) -> None:
for proc in psutil.process_iter(["pid", "name"]):
if proc.info["name"] == "go2rtc":
logger.info(f"go2rtc process pid: {proc.info['pid']}")
self.processes["go2rtc"] = proc.info["pid"]
def init_recording_manager(self) -> None:
recording_process = mp.Process(
target=manage_recordings,
name="recording_manager",
args=(self.config, self.recordings_info_queue, self.record_metrics),
)
recording_process.daemon = True
self.recording_process = recording_process
recording_process.start()
self.processes["recording"] = recording_process.pid or 0
logger.info(f"Recording process started: {recording_process.pid}")
def bind_database(self) -> None:
"""Bind db to the main process."""
# NOTE: all db accessing processes need to be created before the db can be bound to the main process
self.db = SqliteQueueDatabase(
self.config.database.path,
pragmas={
"auto_vacuum": "FULL", # Does not defragment database
"cache_size": -512 * 1000, # 512MB of cache,
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
},
timeout=60,
)
models = [Event, Recordings, Timeline]
self.db.bind(models)
def init_stats(self) -> None:
self.stats_tracking = stats_init(self.camera_metrics, self.detectors)
self.stats_tracking = stats_init(
self.config, self.camera_metrics, self.detectors, self.processes
)
def init_external_event_processor(self) -> None:
self.external_event_processor = ExternalEventProcessor(
self.config, self.event_queue
)
def init_web_server(self) -> None:
self.flask_app = create_app(
@ -169,12 +262,13 @@ class FrigateApp:
self.stats_tracking,
self.detected_frames_processor,
self.storage_maintainer,
self.onvif_controller,
self.external_event_processor,
self.plus_api,
)
def init_restream(self) -> None:
self.restream = RestreamApi(self.config)
self.restream.add_cameras()
def init_onvif(self) -> None:
self.onvif_controller = OnvifController(self.config)
def init_dispatcher(self) -> None:
comms: list[Communicator] = []
@ -182,12 +276,17 @@ class FrigateApp:
if self.config.mqtt.enabled:
comms.append(MqttClient(self.config))
self.ws_client = WebSocketClient(self.config)
comms.append(self.ws_client)
self.dispatcher = Dispatcher(self.config, self.camera_metrics, comms)
comms.append(WebSocketClient(self.config))
self.dispatcher = Dispatcher(
self.config,
self.onvif_controller,
self.camera_metrics,
self.record_metrics,
comms,
)
def start_detectors(self) -> None:
for name, camera_config in self.config.cameras.items():
for name in self.config.cameras.keys():
self.detection_out_events[name] = mp.Event()
try:
@ -195,7 +294,6 @@ class FrigateApp:
[
det.model.height * det.model.width * 3
for (name, det) in self.config.detectors.items()
if det.model.type == "object"
]
)
shm_in = mp.shared_memory.SharedMemory(
@ -216,43 +314,10 @@ class FrigateApp:
self.detection_shms.append(shm_in)
self.detection_shms.append(shm_out)
if any(
["detect_audio" in input.roles for input in camera_config.ffmpeg.inputs]
):
self.detection_out_events[f"{name}-audio"] = mp.Event()
try:
shm_in_audio = mp.shared_memory.SharedMemory(
name=f"{name}-audio",
create=True,
size=int(
round(
self.config.audio_model.duration
* self.config.audio_model.sample_rate
)
)
* 4, # stored as float32, so 4 bytes per sample
)
except FileExistsError:
shm_in_audio = mp.shared_memory.SharedMemory(name=f"{name}-audio")
try:
shm_out_audio = mp.shared_memory.SharedMemory(
name=f"out-{name}-audio", create=True, size=20 * 6 * 4
)
except FileExistsError:
shm_out_audio = mp.shared_memory.SharedMemory(
name=f"out-{name}-audio"
)
self.detection_shms.append(shm_in_audio)
self.detection_shms.append(shm_out_audio)
for name, detector_config in self.config.detectors.items():
self.detectors[name] = ObjectDetectProcess(
name,
self.audio_detection_queue
if detector_config.model.type == "audio"
else self.detection_queue,
self.detection_queue,
self.detection_out_events,
detector_config,
)
@ -273,7 +338,7 @@ class FrigateApp:
def start_video_output_processor(self) -> None:
output_processor = mp.Process(
target=output_frames,
name=f"output_processor",
name="output_processor",
args=(
self.config,
self.video_output_queue,
@ -284,54 +349,6 @@ class FrigateApp:
output_processor.start()
logger.info(f"Output process started: {output_processor.pid}")
def start_audio_processors(self) -> None:
# Make sure we have audio detectors
if not any(
[det.model.type == "audio" for det in self.config.detectors.values()]
):
return
for name, config in self.config.cameras.items():
if not any(
["detect_audio" in inputs.roles for inputs in config.ffmpeg.inputs]
):
continue
if not config.enabled:
logger.info(f"Audio processor not started for disabled camera {name}")
continue
audio_capture = mp.Process(
target=capture_audio,
name=f"audio_capture:{name}",
args=(
name,
self.config.audio_model,
self.camera_metrics[name],
),
)
audio_capture.daemon = True
self.camera_metrics[name]["audio_capture"] = audio_capture
audio_capture.start()
logger.info(f"Audio capture started for {name}: {audio_capture.pid}")
audio_process = mp.Process(
target=process_audio,
name=f"audio_process:{name}",
args=(
name,
config,
self.config.audio_model,
self.config.audio_model.merged_labelmap,
self.audio_detection_queue,
self.detection_out_events[f"{name}-audio"],
self.camera_metrics[name],
),
)
audio_process.daemon = True
self.camera_metrics[name]["audio_process"] = audio_process
audio_process.start()
logger.info(f"Audio processor started for {name}: {audio_process.pid}")
def start_camera_processors(self) -> None:
for name, config in self.config.cameras.items():
if not self.config.cameras[name].enabled:
@ -373,12 +390,19 @@ class FrigateApp:
capture_process.start()
logger.info(f"Capture process started for {name}: {capture_process.pid}")
def start_timeline_processor(self) -> None:
self.timeline_processor = TimelineProcessor(
self.config, self.timeline_queue, self.stop_event
)
self.timeline_processor.start()
def start_event_processor(self) -> None:
self.event_processor = EventProcessor(
self.config,
self.camera_metrics,
self.event_queue,
self.event_processed_queue,
self.timeline_queue,
self.stop_event,
)
self.event_processor.start()
@ -387,16 +411,6 @@ class FrigateApp:
self.event_cleanup = EventCleanup(self.config, self.stop_event)
self.event_cleanup.start()
def start_recording_maintainer(self) -> None:
self.recording_maintainer = RecordingMaintainer(
self.config, self.recordings_info_queue, self.stop_event
)
self.recording_maintainer.start()
def start_recording_cleanup(self) -> None:
self.recording_cleanup = RecordingCleanup(self.config, self.stop_event)
self.recording_cleanup.start()
def start_storage_maintainer(self) -> None:
self.storage_maintainer = StorageMaintainer(self.config, self.stop_event)
self.storage_maintainer.start()
@ -414,10 +428,27 @@ class FrigateApp:
self.frigate_watchdog = FrigateWatchdog(self.detectors, self.stop_event)
self.frigate_watchdog.start()
def check_shm(self) -> None:
available_shm = round(shutil.disk_usage("/dev/shm").total / pow(2, 20), 1)
min_req_shm = 30
for _, camera in self.config.cameras.items():
min_req_shm += round(
(camera.detect.width * camera.detect.height * 1.5 * 9 + 270480)
/ 1048576,
1,
)
if available_shm < min_req_shm:
logger.warning(
f"The current SHM size of {available_shm}MB is too small, recommend increasing it to at least {min_req_shm}MB."
)
def start(self) -> None:
self.init_logger()
logger.info(f"Starting Frigate ({VERSION})")
try:
self.ensure_dirs()
try:
self.init_config()
except Exception as e:
@ -438,32 +469,33 @@ class FrigateApp:
self.log_process.terminate()
sys.exit(1)
self.set_environment_vars()
self.ensure_dirs()
self.set_log_levels()
self.init_queues()
self.init_database()
self.init_onvif()
self.init_recording_manager()
self.init_go2rtc()
self.bind_database()
self.init_dispatcher()
except Exception as e:
print(e)
self.log_process.terminate()
sys.exit(1)
self.init_restream()
self.start_detectors()
self.start_video_output_processor()
self.start_detected_frames_processor()
self.start_audio_processors()
self.start_camera_processors()
self.start_camera_capture_processes()
self.start_storage_maintainer()
self.init_stats()
self.init_external_event_processor()
self.init_web_server()
self.start_timeline_processor()
self.start_event_processor()
self.start_event_cleanup()
self.start_recording_maintainer()
self.start_recording_cleanup()
self.start_stats_emitter()
self.start_watchdog()
# self.zeroconf = broadcast_zeroconf(self.config.mqtt.client_id)
self.check_shm()
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
self.stop()
@ -479,23 +511,41 @@ class FrigateApp:
self.stop()
def stop(self) -> None:
logger.info(f"Stopping...")
logger.info("Stopping...")
self.stop_event.set()
self.ws_client.stop()
self.detected_frames_processor.join()
self.event_processor.join()
self.event_cleanup.join()
self.recording_maintainer.join()
self.recording_cleanup.join()
self.stats_emitter.join()
self.frigate_watchdog.join()
self.db.stop()
for detector in self.detectors.values():
detector.stop()
# Empty the detection queue and set the events for all requests
while not self.detection_queue.empty():
connection_id = self.detection_queue.get(timeout=1)
self.detection_out_events[connection_id].set()
self.detection_queue.close()
self.detection_queue.join_thread()
self.dispatcher.stop()
self.detected_frames_processor.join()
self.event_processor.join()
self.event_cleanup.join()
self.stats_emitter.join()
self.frigate_watchdog.join()
self.db.stop()
while len(self.detection_shms) > 0:
shm = self.detection_shms.pop()
shm.close()
shm.unlink()
for queue in [
self.event_queue,
self.event_processed_queue,
self.video_output_queue,
self.detected_frames_queue,
self.recordings_info_queue,
self.log_queue,
]:
while not queue.empty():
queue.get_nowait()
queue.close()
queue.join_thread()

View File

@ -8,24 +8,12 @@ from typing import Dict, List, Optional, Tuple, Union
import matplotlib.pyplot as plt
import numpy as np
import yaml
from pydantic import BaseModel, Extra, Field, validator, parse_obj_as
from pydantic import BaseModel, Extra, Field, parse_obj_as, validator
from pydantic.fields import PrivateAttr
from frigate.const import (
BASE_DIR,
CACHE_DIR,
REGEX_CAMERA_NAME,
YAML_EXT,
)
from frigate.util import (
create_mask,
deep_merge,
get_ffmpeg_arg_list,
escape_special_characters,
load_config_with_no_duplicates,
load_labels,
)
from frigate.const import CACHE_DIR, DEFAULT_DB_PATH, REGEX_CAMERA_NAME, YAML_EXT
from frigate.detectors import DetectorConfig, ModelConfig
from frigate.detectors.detector_config import BaseDetectorConfig
from frigate.ffmpeg_presets import (
parse_preset_hardware_acceleration_decode,
parse_preset_hardware_acceleration_scale,
@ -33,16 +21,14 @@ from frigate.ffmpeg_presets import (
parse_preset_output_record,
parse_preset_output_rtmp,
)
from frigate.detectors import (
PixelFormatEnum,
InputTensorEnum,
DetectorConfig,
ModelConfig,
AudioModelConfig,
ObjectModelConfig,
from frigate.plus import PlusApi
from frigate.util import (
create_mask,
deep_merge,
escape_special_characters,
get_ffmpeg_arg_list,
load_config_with_no_duplicates,
)
from frigate.version import VERSION
logger = logging.getLogger(__name__)
@ -53,7 +39,7 @@ DEFAULT_TIME_FORMAT = "%m/%d/%Y %H:%M:%S"
FRIGATE_ENV_VARS = {k: v for k, v in os.environ.items() if k.startswith("FRIGATE_")}
DEFAULT_TRACKED_OBJECTS = ["person", "Speech"]
DEFAULT_TRACKED_OBJECTS = ["person"]
DEFAULT_DETECTORS = {"cpu": {"type": "cpu"}}
@ -62,8 +48,62 @@ class FrigateBaseModel(BaseModel):
extra = Extra.forbid
class LiveModeEnum(str, Enum):
jsmpeg = "jsmpeg"
mse = "mse"
webrtc = "webrtc"
class TimeFormatEnum(str, Enum):
browser = "browser"
hours12 = "12hour"
hours24 = "24hour"
class DateTimeStyleEnum(str, Enum):
full = "full"
long = "long"
medium = "medium"
short = "short"
class UIConfig(FrigateBaseModel):
live_mode: LiveModeEnum = Field(
default=LiveModeEnum.mse, title="Default Live Mode."
)
timezone: Optional[str] = Field(title="Override UI timezone.")
use_experimental: bool = Field(default=False, title="Experimental UI")
time_format: TimeFormatEnum = Field(
default=TimeFormatEnum.browser, title="Override UI time format."
)
date_style: DateTimeStyleEnum = Field(
default=DateTimeStyleEnum.short, title="Override UI dateStyle."
)
time_style: DateTimeStyleEnum = Field(
default=DateTimeStyleEnum.medium, title="Override UI timeStyle."
)
strftime_fmt: Optional[str] = Field(
default=None, title="Override date and time format using strftime syntax."
)
class StatsConfig(FrigateBaseModel):
amd_gpu_stats: bool = Field(default=True, title="Enable AMD GPU stats.")
intel_gpu_stats: bool = Field(default=True, title="Enable Intel GPU stats.")
network_bandwidth: bool = Field(
default=False, title="Enable network bandwidth for ffmpeg processes."
)
class TelemetryConfig(FrigateBaseModel):
network_interfaces: List[str] = Field(
default=["eth", "enp", "eno", "ens", "wl", "lo"],
title="Enabled network interfaces for bandwidth calculation.",
)
stats: StatsConfig = Field(
default_factory=StatsConfig, title="System Stats Configuration"
)
version_check: bool = Field(default=True, title="Enable latest version check.")
class MqttConfig(FrigateBaseModel):
@ -87,6 +127,13 @@ class MqttConfig(FrigateBaseModel):
return v
class OnvifConfig(FrigateBaseModel):
host: str = Field(default="", title="Onvif Host")
port: int = Field(default=8000, title="Onvif Port")
user: Optional[str] = Field(title="Onvif Username")
password: Optional[str] = Field(title="Onvif Password")
class RetainModeEnum(str, Enum):
all = "all"
motion = "motion"
@ -127,27 +174,31 @@ class RecordConfig(FrigateBaseModel):
default=60,
title="Number of minutes to wait between cleanup runs.",
)
# deprecated - to be removed in a future version
retain_days: Optional[float] = Field(title="Recording retention period in days.")
retain: RecordRetainConfig = Field(
default_factory=RecordRetainConfig, title="Record retention settings."
)
events: EventsConfig = Field(
default_factory=EventsConfig, title="Event specific settings."
)
enabled_in_config: Optional[bool] = Field(
title="Keep track of original state of recording."
)
class MotionConfig(FrigateBaseModel):
threshold: int = Field(
default=25,
default=40,
title="Motion detection threshold (1-255).",
ge=1,
le=255,
)
improve_contrast: bool = Field(default=False, title="Improve Contrast")
contour_area: Optional[int] = Field(default=30, title="Contour Area")
lightning_threshold: float = Field(
default=0.8, title="Lightning detection threshold (0.3-1.0).", ge=0.3, le=1.0
)
improve_contrast: bool = Field(default=True, title="Improve Contrast")
contour_area: Optional[int] = Field(default=15, title="Contour Area")
delta_alpha: float = Field(default=0.2, title="Delta Alpha")
frame_alpha: float = Field(default=0.2, title="Frame Alpha")
frame_alpha: float = Field(default=0.02, title="Frame Alpha")
frame_height: Optional[int] = Field(default=50, title="Frame Height")
mask: Union[str, List[str]] = Field(
default="", title="Coordinates polygon for the motion mask."
@ -198,9 +249,8 @@ class StationaryMaxFramesConfig(FrigateBaseModel):
class StationaryConfig(FrigateBaseModel):
interval: Optional[int] = Field(
default=0,
title="Frame interval for checking stationary objects.",
ge=0,
gt=0,
)
threshold: Optional[int] = Field(
title="Number of frames without a position change for an object to be considered stationary",
@ -226,6 +276,9 @@ class DetectConfig(FrigateBaseModel):
default_factory=StationaryConfig,
title="Stationary objects config.",
)
annotation_offset: int = Field(
default=0, title="Milliseconds to offset detect annotations by."
)
class FilterConfig(FrigateBaseModel):
@ -288,6 +341,12 @@ class ZoneConfig(BaseModel):
coordinates: Union[str, List[str]] = Field(
title="Coordinates polygon for the defined zone."
)
inertia: int = Field(
default=3,
title="Number of consecutive frames required for object to be considered present in the zone.",
gt=0,
le=10,
)
objects: List[str] = Field(
default_factory=list,
title="List of objects that can trigger the zone.",
@ -336,6 +395,7 @@ class BirdseyeModeEnum(str, Enum):
class BirdseyeConfig(FrigateBaseModel):
enabled: bool = Field(default=True, title="Enable birdseye view.")
restream: bool = Field(default=False, title="Restream birdseye via RTSP.")
width: int = Field(default=1280, title="Birdseye width.")
height: int = Field(default=720, title="Birdseye height.")
quality: int = Field(
@ -352,15 +412,24 @@ class BirdseyeConfig(FrigateBaseModel):
# uses BaseModel because some global attributes are not available at the camera level
class BirdseyeCameraConfig(BaseModel):
enabled: bool = Field(default=True, title="Enable birdseye view for camera.")
order: int = Field(default=0, title="Position of the camera in the birdseye view.")
mode: BirdseyeModeEnum = Field(
default=BirdseyeModeEnum.objects, title="Tracking mode for camera."
)
FFMPEG_GLOBAL_ARGS_DEFAULT = ["-hide_banner", "-loglevel", "warning"]
# Note: Setting threads to less than 2 caused several issues with recording segments
# https://github.com/blakeblackshear/frigate/issues/5659
FFMPEG_GLOBAL_ARGS_DEFAULT = ["-hide_banner", "-loglevel", "warning", "-threads", "2"]
FFMPEG_INPUT_ARGS_DEFAULT = "preset-rtsp-generic"
DETECT_FFMPEG_OUTPUT_ARGS_DEFAULT = ["-f", "rawvideo", "-pix_fmt", "yuv420p"]
DETECT_AUDIO_FFMPEG_OUTPUT_ARGS_DEFAULT = ["-f", "s16le", "-ar", "16000", "-ac", "1"]
DETECT_FFMPEG_OUTPUT_ARGS_DEFAULT = [
"-threads",
"2",
"-f",
"rawvideo",
"-pix_fmt",
"yuv420p",
]
RTMP_FFMPEG_OUTPUT_ARGS_DEFAULT = "preset-rtmp-generic"
RECORD_FFMPEG_OUTPUT_ARGS_DEFAULT = "preset-record-generic"
@ -370,10 +439,6 @@ class FfmpegOutputArgsConfig(FrigateBaseModel):
default=DETECT_FFMPEG_OUTPUT_ARGS_DEFAULT,
title="Detect role FFmpeg output arguments.",
)
detect_audio: Union[str, List[str]] = Field(
default=DETECT_AUDIO_FFMPEG_OUTPUT_ARGS_DEFAULT,
title="Detect role FFmpeg output arguments.",
)
record: Union[str, List[str]] = Field(
default=RECORD_FFMPEG_OUTPUT_ARGS_DEFAULT,
title="Record role FFmpeg output arguments.",
@ -402,10 +467,8 @@ class FfmpegConfig(FrigateBaseModel):
class CameraRoleEnum(str, Enum):
record = "record"
restream = "restream"
rtmp = "rtmp"
detect = "detect"
detect_audio = "detect_audio"
class CameraInput(FrigateBaseModel):
@ -433,7 +496,7 @@ class CameraFfmpegConfig(FfmpegConfig):
if len(roles) > len(roles_set):
raise ValueError("Each input role may only be used once.")
if not "detect" in roles:
if "detect" not in roles:
raise ValueError("The detect role is required.")
return v
@ -517,29 +580,15 @@ class RtmpConfig(FrigateBaseModel):
enabled: bool = Field(default=False, title="RTMP restreaming enabled.")
class JsmpegStreamConfig(FrigateBaseModel):
height: int = Field(default=720, title="Live camera view height.")
quality: int = Field(default=8, ge=1, le=31, title="Live camera view quality.")
class CameraLiveConfig(FrigateBaseModel):
stream_name: str = Field(default="", title="Name of restream to use as live view.")
height: int = Field(default=720, title="Live camera view height")
quality: int = Field(default=8, ge=1, le=31, title="Live camera view quality")
class RestreamCodecEnum(str, Enum):
copy = "copy"
h264 = "h264"
h265 = "h265"
class RestreamConfig(FrigateBaseModel):
enabled: bool = Field(default=True, title="Restreaming enabled.")
video_encoding: RestreamCodecEnum = Field(
default=RestreamCodecEnum.copy, title="Method for encoding the restream."
)
force_audio: bool = Field(
default=True, title="Force audio compatibility with the browser."
)
birdseye: bool = Field(default=False, title="Restream the birdseye feed via RTSP.")
jsmpeg: JsmpegStreamConfig = Field(
default_factory=JsmpegStreamConfig, title="Jsmpeg Stream Configuration."
)
class RestreamConfig(BaseModel):
class Config:
extra = Extra.allow
class CameraUiConfig(FrigateBaseModel):
@ -566,8 +615,8 @@ class CameraConfig(FrigateBaseModel):
rtmp: RtmpConfig = Field(
default_factory=RtmpConfig, title="RTMP restreaming configuration."
)
restream: RestreamConfig = Field(
default_factory=RestreamConfig, title="Restreaming configuration."
live: CameraLiveConfig = Field(
default_factory=CameraLiveConfig, title="Live playback settings."
)
snapshots: SnapshotsConfig = Field(
default_factory=SnapshotsConfig, title="Snapshot configuration."
@ -582,6 +631,9 @@ class CameraConfig(FrigateBaseModel):
detect: DetectConfig = Field(
default_factory=DetectConfig, title="Object detection configuration."
)
onvif: OnvifConfig = Field(
default_factory=OnvifConfig, title="Camera Onvif Configuration."
)
ui: CameraUiConfig = Field(
default_factory=CameraUiConfig, title="Camera UI Modifications."
)
@ -605,18 +657,14 @@ class CameraConfig(FrigateBaseModel):
# add roles to the input if there is only one
if len(config["ffmpeg"]["inputs"]) == 1:
has_rtmp = "rtmp" in config["ffmpeg"]["inputs"][0].get("roles", [])
has_audio = "detect_audio" in config["ffmpeg"]["inputs"][0].get("roles", [])
config["ffmpeg"]["inputs"][0]["roles"] = [
"record",
"detect",
"restream",
]
if has_rtmp:
config["ffmpeg"]["inputs"][0]["roles"].append("rtmp")
if has_audio:
config["ffmpeg"]["inputs"][0]["roles"].append("detect_audio")
super().__init__(**config)
@ -657,15 +705,6 @@ class CameraConfig(FrigateBaseModel):
)
ffmpeg_output_args = scale_detect_args + ffmpeg_output_args + ["pipe:"]
if "detect_audio" in ffmpeg_input.roles:
detect_args = get_ffmpeg_arg_list(self.ffmpeg.output_args.detect_audio)
pipe = f"/tmp/{self.name}-audio"
try:
os.mkfifo(pipe)
except FileExistsError:
pass
ffmpeg_output_args = detect_args + ["-y", pipe] + ffmpeg_output_args
if "rtmp" in ffmpeg_input.roles and self.rtmp.enabled:
rtmp_args = get_ffmpeg_arg_list(
parse_preset_output_rtmp(self.ffmpeg.output_args.rtmp)
@ -720,9 +759,7 @@ class CameraConfig(FrigateBaseModel):
class DatabaseConfig(FrigateBaseModel):
path: str = Field(
default=os.path.join(BASE_DIR, "frigate.db"), title="Database path."
)
path: str = Field(default=DEFAULT_DB_PATH, title="Database path.")
class LogLevelEnum(str, Enum):
@ -748,30 +785,28 @@ def verify_config_roles(camera_config: CameraConfig) -> None:
set([r for i in camera_config.ffmpeg.inputs for r in i.roles])
)
if camera_config.record.enabled and not "record" in assigned_roles:
if camera_config.record.enabled and "record" not in assigned_roles:
raise ValueError(
f"Camera {camera_config.name} has record enabled, but record is not assigned to an input."
)
if camera_config.rtmp.enabled and not "rtmp" in assigned_roles:
if camera_config.rtmp.enabled and "rtmp" not in assigned_roles:
raise ValueError(
f"Camera {camera_config.name} has rtmp enabled, but rtmp is not assigned to an input."
)
if camera_config.restream.enabled and not "restream" in assigned_roles:
raise ValueError(
f"Camera {camera_config.name} has restream enabled, but restream is not assigned to an input."
)
def verify_old_retain_config(camera_config: CameraConfig) -> None:
"""Leave log if old retain_days is used."""
if not camera_config.record.retain_days is None:
logger.warning(
"The 'retain_days' config option has been DEPRECATED and will be removed in a future version. Please use the 'days' setting under 'retain'"
def verify_valid_live_stream_name(
frigate_config: FrigateConfig, camera_config: CameraConfig
) -> ValueError | None:
"""Verify that a restream exists to use for live view."""
if (
camera_config.live.stream_name
not in frigate_config.go2rtc.dict().get("streams", {}).keys()
):
return ValueError(
f"No restream with name {camera_config.live.stream_name} exists for camera {camera_config.name}."
)
if camera_config.record.retain.days == 0:
camera_config.record.retain.days = camera_config.record.retain_days
def verify_recording_retention(camera_config: CameraConfig) -> None:
@ -835,13 +870,13 @@ class FrigateConfig(FrigateBaseModel):
default_factory=dict, title="Frigate environment variables."
)
ui: UIConfig = Field(default_factory=UIConfig, title="UI configuration.")
audio_model: AudioModelConfig = Field(
default_factory=AudioModelConfig, title="Audio model configuration."
telemetry: TelemetryConfig = Field(
default_factory=TelemetryConfig, title="Telemetry configuration."
)
model: ObjectModelConfig = Field(
default_factory=ObjectModelConfig, title="Detection model configuration."
model: ModelConfig = Field(
default_factory=ModelConfig, title="Detection model configuration."
)
detectors: Dict[str, DetectorConfig] = Field(
detectors: Dict[str, BaseDetectorConfig] = Field(
default=DEFAULT_DETECTORS,
title="Detector hardware configuration.",
)
@ -857,7 +892,10 @@ class FrigateConfig(FrigateBaseModel):
rtmp: RtmpConfig = Field(
default_factory=RtmpConfig, title="Global RTMP restreaming configuration."
)
restream: RestreamConfig = Field(
live: CameraLiveConfig = Field(
default_factory=CameraLiveConfig, title="Live playback settings."
)
go2rtc: RestreamConfig = Field(
default_factory=RestreamConfig, title="Global restream configuration."
)
birdseye: BirdseyeConfig = Field(
@ -881,13 +919,13 @@ class FrigateConfig(FrigateBaseModel):
title="Global timestamp style configuration.",
)
@property
def runtime_config(self) -> FrigateConfig:
def runtime_config(self, plus_api: PlusApi = None) -> FrigateConfig:
"""Merge camera config with globals."""
config = self.copy(deep=True)
# MQTT password substitution
if config.mqtt.password:
# MQTT user/password substitutions
if config.mqtt.user or config.mqtt.password:
config.mqtt.user = config.mqtt.user.format(**FRIGATE_ENV_VARS)
config.mqtt.password = config.mqtt.password.format(**FRIGATE_ENV_VARS)
# Global config to propagate down to camera level
@ -897,7 +935,7 @@ class FrigateConfig(FrigateBaseModel):
"record": ...,
"snapshots": ...,
"rtmp": ...,
"restream": ...,
"live": ...,
"objects": ...,
"motion": ...,
"detect": ...,
@ -922,11 +960,25 @@ class FrigateConfig(FrigateBaseModel):
stationary_threshold = camera_config.detect.fps * 10
if camera_config.detect.stationary.threshold is None:
camera_config.detect.stationary.threshold = stationary_threshold
# default to the stationary_threshold if not defined
if camera_config.detect.stationary.interval is None:
camera_config.detect.stationary.interval = stationary_threshold
# FFMPEG input substitution
for input in camera_config.ffmpeg.inputs:
input.path = input.path.format(**FRIGATE_ENV_VARS)
# ONVIF substitution
if camera_config.onvif.user or camera_config.onvif.password:
camera_config.onvif.user = camera_config.onvif.user.format(
**FRIGATE_ENV_VARS
)
camera_config.onvif.password = camera_config.onvif.password.format(
**FRIGATE_ENV_VARS
)
# set config recording value
camera_config.record.enabled_in_config = camera_config.record.enabled
# Add default filters
object_keys = camera_config.objects.track
if camera_config.objects.filters is None:
@ -970,8 +1022,12 @@ class FrigateConfig(FrigateBaseModel):
**camera_config.motion.dict(exclude_unset=True),
)
# Set live view stream if none is set
if not camera_config.live.stream_name:
camera_config.live.stream_name = name
verify_config_roles(camera_config)
verify_old_retain_config(camera_config)
verify_valid_live_stream_name(config, camera_config)
verify_recording_retention(camera_config)
verify_recording_segments_setup_with_reasonable_time(camera_config)
verify_zone_objects_are_tracked(camera_config)
@ -991,28 +1047,44 @@ class FrigateConfig(FrigateBaseModel):
for _, camera in config.cameras.items():
enabled_labels.update(camera.objects.track)
config.model.create_colormap(enabled_labels)
config.model.create_colormap(sorted(enabled_labels))
config.model.check_and_load_plus_model(plus_api)
for key, detector in config.detectors.items():
detector_config: DetectorConfig = parse_obj_as(DetectorConfig, detector)
if detector_config.model is None:
detector_config.model = config.model
else:
detector_model = detector_config.model.dict(exclude_unset=True)
# If any keys are set in the detector_model other than type or path, warn
if any(key not in ["type", "path"] for key in detector_model.keys()):
model = detector_config.model
schema = ModelConfig.schema()["properties"]
if (
model.width != schema["width"]["default"]
or model.height != schema["height"]["default"]
or model.labelmap_path is not None
or model.labelmap is not {}
or model.input_tensor != schema["input_tensor"]["default"]
or model.input_pixel_format
!= schema["input_pixel_format"]["default"]
):
logger.warning(
"Customizing more than a detector model type or path is unsupported."
"Customizing more than a detector model path is unsupported."
)
merged_model = deep_merge(
detector_model,
config.model.dict(exclude_unset=True)
if detector_config.model.type == "object"
else config.audio_model.dict(exclude_unset=True),
detector_config.model.dict(exclude_unset=True),
config.model.dict(exclude_unset=True),
)
detector_config.model = parse_obj_as(
ModelConfig, {"type": detector_config.model.type, **merged_model}
if "path" not in merged_model:
if detector_config.type == "cpu":
merged_model["path"] = "/cpu_model.tflite"
elif detector_config.type == "edgetpu":
merged_model["path"] = "/edgetpu_model.tflite"
detector_config.model = ModelConfig.parse_obj(merged_model)
detector_config.model.check_and_load_plus_model(
plus_api, detector_config.type
)
detector_config.model.compute_model_hash()
config.detectors[key] = detector_config
return config

View File

@ -1,24 +1,12 @@
import logging
from .detection_api import DetectionApi
from .detector_config import (
AudioModelConfig,
PixelFormatEnum,
InputTensorEnum,
ModelConfig,
ObjectModelConfig,
)
from .detector_types import (
DetectorTypeEnum,
api_types,
DetectorConfig,
)
from .detector_config import InputTensorEnum, ModelConfig, PixelFormatEnum # noqa: F401
from .detector_types import DetectorConfig, DetectorTypeEnum, api_types # noqa: F401
logger = logging.getLogger(__name__)
def create_detector(detector_config: DetectorConfig):
def create_detector(detector_config):
if detector_config.type == DetectorTypeEnum.cpu:
logger.warning(
"CPU detectors are not recommended and should only be used for testing or for trial purposes."

View File

@ -16,6 +16,11 @@ from frigate.util import load_labels
logger = logging.getLogger(__name__)
class ProducesEnum(str, Enum):
object = "object"
audio = "audio"
class PixelFormatEnum(str, Enum):
rgb = "rgb"
bgr = "bgr"

View File

@ -1,11 +1,16 @@
import logging
import numpy as np
from pydantic import Field
from typing_extensions import Literal
from frigate.detectors.detection_api import DetectionApi
from frigate.detectors.detector_config import BaseDetectorConfig
from typing import Literal
from pydantic import Extra, Field
import tflite_runtime.interpreter as tflite
try:
from tflite_runtime.interpreter import Interpreter
except ModuleNotFoundError:
from tensorflow.lite.python.interpreter import Interpreter
logger = logging.getLogger(__name__)
@ -22,12 +27,8 @@ class CpuTfl(DetectionApi):
type_key = DETECTOR_KEY
def __init__(self, detector_config: CpuDetectorConfig):
self.is_audio = detector_config.model.type == "audio"
default_model = (
"/cpu_model.tflite" if not self.is_audio else "/cpu_audio_model.tflite"
)
self.interpreter = tflite.Interpreter(
model_path=detector_config.model.path or default_model,
self.interpreter = Interpreter(
model_path=detector_config.model.path,
num_threads=detector_config.num_threads or 3,
)
@ -40,29 +41,15 @@ class CpuTfl(DetectionApi):
self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input)
self.interpreter.invoke()
detections = np.zeros((20, 6), np.float32)
if self.is_audio:
res = self.interpreter.get_tensor(self.tensor_output_details[0]["index"])[0]
non_zero_indices = res > 0
class_ids = np.argpartition(-res, 20)[:20]
class_ids = class_ids[np.argsort(-res[class_ids])]
class_ids = class_ids[non_zero_indices[class_ids]]
scores = res[class_ids]
boxes = np.full((scores.shape[0], 4), -1, np.float32)
count = len(scores)
else:
boxes = self.interpreter.tensor(self.tensor_output_details[0]["index"])()[0]
class_ids = self.interpreter.tensor(
self.tensor_output_details[1]["index"]
)()[0]
scores = self.interpreter.tensor(self.tensor_output_details[2]["index"])()[
0
]
class_ids = self.interpreter.tensor(self.tensor_output_details[1]["index"])()[0]
scores = self.interpreter.tensor(self.tensor_output_details[2]["index"])()[0]
count = int(
self.interpreter.tensor(self.tensor_output_details[3]["index"])()[0]
)
detections = np.zeros((20, 6), np.float32)
for i in range(count):
if scores[i] < 0.4 or i == 20:
break

View File

@ -1,12 +1,16 @@
import logging
import numpy as np
from pydantic import Field
from typing_extensions import Literal
from frigate.detectors.detection_api import DetectionApi
from frigate.detectors.detector_config import BaseDetectorConfig
from typing import Literal
from pydantic import Extra, Field
import tflite_runtime.interpreter as tflite
from tflite_runtime.interpreter import load_delegate
try:
from tflite_runtime.interpreter import Interpreter, load_delegate
except ModuleNotFoundError:
from tensorflow.lite.python.interpreter import Interpreter, load_delegate
logger = logging.getLogger(__name__)
@ -23,7 +27,6 @@ class EdgeTpuTfl(DetectionApi):
type_key = DETECTOR_KEY
def __init__(self, detector_config: EdgeTpuDetectorConfig):
self.is_audio = detector_config.model.type == "audio"
device_config = {"device": "usb"}
if detector_config.device is not None:
device_config = {"device": detector_config.device}
@ -34,13 +37,8 @@ class EdgeTpuTfl(DetectionApi):
logger.info(f"Attempting to load TPU as {device_config['device']}")
edge_tpu_delegate = load_delegate("libedgetpu.so.1.0", device_config)
logger.info("TPU found")
default_model = (
"/edgetpu_model.tflite"
if not self.is_audio
else "/edgetpu_audio_model.tflite"
)
self.interpreter = tflite.Interpreter(
model_path=detector_config.model.path or default_model,
self.interpreter = Interpreter(
model_path=detector_config.model.path,
experimental_delegates=[edge_tpu_delegate],
)
except ValueError:
@ -58,29 +56,15 @@ class EdgeTpuTfl(DetectionApi):
self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input)
self.interpreter.invoke()
detections = np.zeros((20, 6), np.float32)
if self.is_audio:
res = self.interpreter.get_tensor(self.tensor_output_details[0]["index"])[0]
non_zero_indices = res > 0
class_ids = np.argpartition(-res, 20)[:20]
class_ids = class_ids[np.argsort(-res[class_ids])]
class_ids = class_ids[non_zero_indices[class_ids]]
scores = res[class_ids]
boxes = np.full((scores.shape[0], 4), -1, np.float32)
count = len(scores)
else:
boxes = self.interpreter.tensor(self.tensor_output_details[0]["index"])()[0]
class_ids = self.interpreter.tensor(
self.tensor_output_details[1]["index"]
)()[0]
scores = self.interpreter.tensor(self.tensor_output_details[2]["index"])()[
0
]
class_ids = self.interpreter.tensor(self.tensor_output_details[1]["index"])()[0]
scores = self.interpreter.tensor(self.tensor_output_details[2]["index"])()[0]
count = int(
self.interpreter.tensor(self.tensor_output_details[3]["index"])()[0]
)
detections = np.zeros((20, 6), np.float32)
for i in range(count):
if scores[i] < 0.4 or i == 20:
break

View File

@ -10,9 +10,8 @@ from abc import ABC, abstractmethod
import numpy as np
from setproctitle import setproctitle
from frigate.config import InputTensorEnum
from frigate.detectors import create_detector
from frigate.detectors.detector_config import InputTensorEnum
from frigate.util import EventsPerSecond, SharedMemoryFrameManager, listen, load_labels
logger = logging.getLogger(__name__)
@ -44,7 +43,7 @@ class LocalObjectDetector(ObjectDetector):
else:
self.labels = load_labels(labels)
if detector_config.model.type == "object":
if detector_config:
self.input_transform = tensor_transform(detector_config.model.input_tensor)
else:
self.input_transform = None
@ -88,6 +87,7 @@ def run_detector(
stop_event = mp.Event()
def receiveSignal(signalNumber, frame):
logger.info("Signal to exit detection process...")
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
@ -104,23 +104,9 @@ def run_detector(
while not stop_event.is_set():
try:
connection_id = detection_queue.get(timeout=5)
connection_id = detection_queue.get(timeout=1)
except queue.Empty:
continue
if detector_config.model.type == "audio":
input_frame = frame_manager.get(
connection_id,
(
int(
round(
detector_config.model.duration
* detector_config.model.sample_rate
)
),
),
dtype=np.float32,
)
else:
input_frame = frame_manager.get(
connection_id,
(1, detector_config.model.height, detector_config.model.width, 3),
@ -139,6 +125,8 @@ def run_detector(
avg_speed.value = (avg_speed.value * 9 + duration) / 10
logger.info("Exited detection process...")
class ObjectDetectProcess:
def __init__(
@ -158,6 +146,9 @@ class ObjectDetectProcess:
self.start_or_restart()
def stop(self):
# if the process has already exited on its own, just return
if self.detect_process and self.detect_process.exitcode:
return
self.detect_process.terminate()
logging.info("Waiting for detection process to exit gracefully...")
self.detect_process.join(timeout=30)
@ -165,10 +156,11 @@ class ObjectDetectProcess:
logging.info("Detection process didnt exit. Force killing...")
self.detect_process.kill()
self.detect_process.join()
logging.info("Detection process has exited...")
def start_or_restart(self):
self.detection_start.value = 0.0
if (not self.detect_process is None) and self.detect_process.is_alive():
if (self.detect_process is not None) and self.detect_process.is_alive():
self.stop()
self.detect_process = mp.Process(
target=run_detector,
@ -187,20 +179,14 @@ class ObjectDetectProcess:
class RemoteObjectDetector:
def __init__(self, name, labels, detection_queue, event, model_config):
def __init__(self, name, labels, detection_queue, event, model_config, stop_event):
self.labels = labels
self.name = name
self.fps = EventsPerSecond()
self.detection_queue = detection_queue
self.event = event
self.stop_event = stop_event
self.shm = mp.shared_memory.SharedMemory(name=self.name, create=False)
if model_config.type == "audio":
self.np_shm = np.ndarray(
(int(round(model_config.duration * model_config.sample_rate)),),
dtype=np.float32,
buffer=self.shm.buf,
)
else:
self.np_shm = np.ndarray(
(1, model_config.height, model_config.width, 3),
dtype=np.uint8,
@ -214,11 +200,14 @@ class RemoteObjectDetector:
def detect(self, tensor_input, threshold=0.4):
detections = []
if self.stop_event.is_set():
return detections
# copy input to shared memory
self.np_shm[:] = tensor_input[:]
self.event.clear()
self.detection_queue.put(self.name)
result = self.event.wait(timeout=10.0)
result = self.event.wait(timeout=5.0)
# if it timed out
if result is None:

View File

@ -1,7 +1,7 @@
from typing import Optional, TypedDict
from multiprocessing.context import Process
from multiprocessing.queues import Queue
from multiprocessing.sharedctypes import Synchronized
from multiprocessing.context import Process
from typing import Optional, TypedDict
from frigate.object_detection import ObjectDetectProcess
@ -9,14 +9,11 @@ from frigate.object_detection import ObjectDetectProcess
class CameraMetricsTypes(TypedDict):
camera_fps: Synchronized
capture_process: Optional[Process]
audio_capture: Optional[Process]
audio_process: Optional[Process]
detection_enabled: Synchronized
detection_fps: Synchronized
detection_frame: Synchronized
ffmpeg_pid: Synchronized
frame_queue: Queue
audio_queue: Queue
motion_enabled: Synchronized
improve_contrast_enabled: Synchronized
motion_threshold: Synchronized
@ -27,8 +24,14 @@ class CameraMetricsTypes(TypedDict):
skipped_fps: Synchronized
class RecordMetricsTypes(TypedDict):
record_enabled: Synchronized
class StatsTrackingTypes(TypedDict):
camera_metrics: dict[str, CameraMetricsTypes]
detectors: dict[str, ObjectDetectProcess]
started: int
latest_frigate_version: str
last_updated: int
processes: dict[str, int]

View File

@ -1,25 +1,26 @@
import copy
import datetime
import logging
import shlex
import subprocess as sp
import json
import logging
import os
import re
import shlex
import signal
import subprocess as sp
import traceback
import urllib.parse
import yaml
from abc import ABC, abstractmethod
from collections import Counter
from collections.abc import Mapping
from multiprocessing import shared_memory
from typing import Any, AnyStr
from typing import Any, AnyStr, Optional, Tuple
import cv2
import numpy as np
import os
import psutil
import py3nvml.py3nvml as nvml
import pytz
import yaml
from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS
@ -454,7 +455,7 @@ def copy_yuv_to_position(
# clear v2
destination_frame[v2[1] : v2[3], v2[0] : v2[2]] = 128
if not source_frame is None:
if source_frame is not None:
# calculate the resized frame, maintaining the aspect ratio
source_aspect_ratio = source_frame.shape[1] / (source_frame.shape[0] // 3 * 2)
dest_aspect_ratio = destination_shape[1] / destination_shape[0]
@ -571,7 +572,16 @@ def yuv_region_2_bgr(frame, region):
raise
def intersection(box_a, box_b):
def intersection(box_a, box_b) -> Optional[list[int]]:
"""Return intersection box or None if boxes do not intersect."""
if (
box_a[2] < box_b[0]
or box_a[0] > box_b[2]
or box_a[1] > box_b[3]
or box_a[3] < box_b[1]
):
return None
return (
max(box_a[0], box_b[0]),
max(box_a[1], box_b[1]),
@ -588,6 +598,9 @@ def intersection_over_union(box_a, box_b):
# determine the (x, y)-coordinates of the intersection rectangle
intersect = intersection(box_a, box_b)
if intersect is None:
return 0.0
# compute the area of intersection rectangle
inter_area = max(0, intersect[2] - intersect[0] + 1) * max(
0, intersect[3] - intersect[1] + 1
@ -721,7 +734,7 @@ def load_labels(path, encoding="utf-8"):
def clean_camera_user_pass(line: str) -> str:
"""Removes user and password from line."""
if line.startswith("rtsp://"):
if "rtsp://" in line:
return re.sub(REGEX_RTSP_CAMERA_USER_PASS, "://*:*@", line)
else:
return re.sub(REGEX_HTTP_CAMERA_USER_PASS, "user=*&password=*", line)
@ -738,11 +751,132 @@ def escape_special_characters(path: str) -> str:
return path
def get_cgroups_version() -> str:
"""Determine what version of cgroups is enabled."""
cgroup_path = "/sys/fs/cgroup"
if not os.path.ismount(cgroup_path):
logger.debug(f"{cgroup_path} is not a mount point.")
return "unknown"
try:
with open("/proc/mounts", "r") as f:
mounts = f.readlines()
for mount in mounts:
mount_info = mount.split()
if mount_info[1] == cgroup_path:
fs_type = mount_info[2]
if fs_type == "cgroup2fs" or fs_type == "cgroup2":
return "cgroup2"
elif fs_type == "tmpfs":
return "cgroup"
else:
logger.debug(
f"Could not determine cgroups version: unhandled filesystem {fs_type}"
)
break
except Exception as e:
logger.debug(f"Could not determine cgroups version: {e}")
return "unknown"
def get_docker_memlimit_bytes() -> int:
"""Get mem limit in bytes set in docker if present. Returns -1 if no limit detected."""
# check running a supported cgroups version
if get_cgroups_version() == "cgroup2":
memlimit_path = "/sys/fs/cgroup/memory.max"
try:
with open(memlimit_path, "r") as f:
value = f.read().strip()
if value.isnumeric():
return int(value)
elif value.lower() == "max":
return -1
except Exception as e:
logger.debug(f"Unable to get docker memlimit: {e}")
return -1
def get_cpu_stats() -> dict[str, dict]:
"""Get cpu usages for each process id"""
usages = {}
# -n=2 runs to ensure extraneous values are not included
top_command = ["top", "-b", "-n", "2"]
docker_memlimit = get_docker_memlimit_bytes() / 1024
total_mem = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES") / 1024
for process in psutil.process_iter(["pid", "name", "cpu_percent", "cmdline"]):
pid = process.info["pid"]
try:
cpu_percent = process.info["cpu_percent"]
cmdline = process.info["cmdline"]
with open(f"/proc/{pid}/stat", "r") as f:
stats = f.readline().split()
utime = int(stats[13])
stime = int(stats[14])
starttime = int(stats[21])
with open("/proc/uptime") as f:
system_uptime_sec = int(float(f.read().split()[0]))
clk_tck = os.sysconf(os.sysconf_names["SC_CLK_TCK"])
process_utime_sec = utime // clk_tck
process_stime_sec = stime // clk_tck
process_starttime_sec = starttime // clk_tck
process_elapsed_sec = system_uptime_sec - process_starttime_sec
process_usage_sec = process_utime_sec + process_stime_sec
cpu_average_usage = process_usage_sec * 100 // process_elapsed_sec
with open(f"/proc/{pid}/statm", "r") as f:
mem_stats = f.readline().split()
mem_res = int(mem_stats[1]) * os.sysconf("SC_PAGE_SIZE") / 1024
if docker_memlimit > 0:
mem_pct = round((mem_res / docker_memlimit) * 100, 1)
else:
mem_pct = round((mem_res / total_mem) * 100, 1)
usages[pid] = {
"cpu": str(cpu_percent),
"cpu_average": str(round(cpu_average_usage, 2)),
"mem": f"{mem_pct}",
"cmdline": " ".join(cmdline),
}
except Exception:
continue
return usages
def get_physical_interfaces(interfaces) -> list:
with open("/proc/net/dev", "r") as file:
lines = file.readlines()
physical_interfaces = []
for line in lines:
if ":" in line:
interface = line.split(":")[0].strip()
for int in interfaces:
if interface.startswith(int):
physical_interfaces.append(interface)
return physical_interfaces
def get_bandwidth_stats(config) -> dict[str, dict]:
"""Get bandwidth usages for each ffmpeg process id"""
usages = {}
top_command = ["nethogs", "-t", "-v0", "-c5", "-d1"] + get_physical_interfaces(
config.telemetry.network_interfaces
)
p = sp.run(
top_command,
@ -751,19 +885,20 @@ def get_cpu_stats() -> dict[str, dict]:
)
if p.returncode != 0:
logger.error(p.stderr)
return usages
else:
lines = p.stdout.split("\n")
for line in lines:
stats = list(filter(lambda a: a != "", line.strip().split(" ")))
stats = list(filter(lambda a: a != "", line.strip().split("\t")))
try:
usages[stats[0]] = {
"cpu": stats[8],
"mem": stats[9],
if re.search(
r"(^ffmpeg|\/go2rtc|frigate\.detector\.[a-z]+)/([0-9]+)/", stats[0]
):
process = stats[0].split("/")
usages[process[len(process) - 2]] = {
"bandwidth": round(float(stats[1]) + float(stats[2]), 1),
}
except:
except (IndexError, ValueError):
continue
return usages
@ -780,7 +915,7 @@ def get_amd_gpu_stats() -> dict[str, str]:
)
if p.returncode != 0:
logger.error(p.stderr)
logger.error(f"Unable to poll radeon GPU stats: {p.stderr}")
return None
else:
usages = p.stdout.split(",")
@ -788,9 +923,9 @@ def get_amd_gpu_stats() -> dict[str, str]:
for hw in usages:
if "gpu" in hw:
results["gpu"] = f"{hw.strip().split(' ')[1].replace('%', '')} %"
results["gpu"] = f"{hw.strip().split(' ')[1].replace('%', '')}%"
elif "vram" in hw:
results["mem"] = f"{hw.strip().split(' ')[1].replace('%', '')} %"
results["mem"] = f"{hw.strip().split(' ')[1].replace('%', '')}%"
return results
@ -816,7 +951,7 @@ def get_intel_gpu_stats() -> dict[str, str]:
# timeout has a non-zero returncode when timeout is reached
if p.returncode != 124:
logger.error(p.stderr)
logger.error(f"Unable to poll intel GPU stats: {p.stderr}")
return None
else:
reading = "".join(p.stdout.split())
@ -824,7 +959,7 @@ def get_intel_gpu_stats() -> dict[str, str]:
# render is used for qsv
render = []
for result in re.findall('"Render/3D/0":{[a-z":\d.,%]+}', reading):
for result in re.findall(r'"Render/3D/0":{[a-z":\d.,%]+}', reading):
packet = json.loads(result[14:])
single = packet.get("busy", 0.0)
render.append(float(single))
@ -846,37 +981,46 @@ def get_intel_gpu_stats() -> dict[str, str]:
else:
video_avg = 1
results["gpu"] = f"{round((video_avg + render_avg) / 2, 2)} %"
results["mem"] = "- %"
results["gpu"] = f"{round((video_avg + render_avg) / 2, 2)}%"
results["mem"] = "-%"
return results
def get_nvidia_gpu_stats() -> dict[str, str]:
"""Get stats using nvidia-smi."""
nvidia_smi_command = [
"nvidia-smi",
"--query-gpu=gpu_name,utilization.gpu,memory.used,memory.total",
"--format=csv",
]
def try_get_info(f, h, default="N/A"):
try:
v = f(h)
except nvml.NVMLError_NotSupported:
v = default
return v
p = sp.run(
nvidia_smi_command,
encoding="ascii",
capture_output=True,
)
if p.returncode != 0:
logger.error(p.stderr)
return None
def get_nvidia_gpu_stats() -> dict[int, dict]:
results = {}
try:
nvml.nvmlInit()
deviceCount = nvml.nvmlDeviceGetCount()
for i in range(deviceCount):
handle = nvml.nvmlDeviceGetHandleByIndex(i)
meminfo = try_get_info(nvml.nvmlDeviceGetMemoryInfo, handle)
util = try_get_info(nvml.nvmlDeviceGetUtilizationRates, handle)
if util != "N/A":
gpu_util = util.gpu
else:
usages = p.stdout.split("\n")[1].strip().split(",")
memory_percent = f"{round(float(usages[2].replace(' MiB', '').strip()) / float(usages[3].replace(' MiB', '').strip()) * 100, 1)} %"
results: dict[str, str] = {
"name": usages[0],
"gpu": usages[1].strip(),
"mem": memory_percent,
}
gpu_util = 0
if meminfo != "N/A":
gpu_mem_util = meminfo.used / meminfo.total * 100
else:
gpu_mem_util = -1
results[i] = {
"name": nvml.nvmlDeviceGetName(handle),
"gpu": gpu_util,
"mem": gpu_mem_util,
}
except Exception:
pass
finally:
return results
@ -898,9 +1042,13 @@ def ffprobe_stream(path: str) -> sp.CompletedProcess:
return sp.run(ffprobe_cmd, capture_output=True)
def vainfo_hwaccel() -> sp.CompletedProcess:
def vainfo_hwaccel(device_name: Optional[str] = None) -> sp.CompletedProcess:
"""Run vainfo."""
ffprobe_cmd = ["vainfo"]
ffprobe_cmd = (
["vainfo"]
if not device_name
else ["vainfo", "--display", "drm", "--device", f"/dev/dri/{device_name}"]
)
return sp.run(ffprobe_cmd, capture_output=True)
@ -915,7 +1063,7 @@ class FrameManager(ABC):
pass
@abstractmethod
def get(self, name):
def get(self, name, timeout_ms=0):
pass
@abstractmethod
@ -956,13 +1104,13 @@ class SharedMemoryFrameManager(FrameManager):
self.shm_store[name] = shm
return shm.buf
def get(self, name, shape, dtype=np.uint8):
def get(self, name, shape):
if name in self.shm_store:
shm = self.shm_store[name]
else:
shm = shared_memory.SharedMemory(name=name)
self.shm_store[name] = shm
return np.ndarray(shape, dtype=dtype, buffer=shm.buf)
return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)
def close(self, name):
if name in self.shm_store:
@ -974,3 +1122,90 @@ class SharedMemoryFrameManager(FrameManager):
self.shm_store[name].close()
self.shm_store[name].unlink()
del self.shm_store[name]
def get_tz_modifiers(tz_name: str) -> Tuple[str, str]:
seconds_offset = (
datetime.datetime.now(pytz.timezone(tz_name)).utcoffset().total_seconds()
)
hours_offset = int(seconds_offset / 60 / 60)
minutes_offset = int(seconds_offset / 60 - hours_offset * 60)
hour_modifier = f"{hours_offset} hour"
minute_modifier = f"{minutes_offset} minute"
return hour_modifier, minute_modifier
def to_relative_box(
width: int, height: int, box: Tuple[int, int, int, int]
) -> Tuple[int, int, int, int]:
return (
box[0] / width, # x
box[1] / height, # y
(box[2] - box[0]) / width, # w
(box[3] - box[1]) / height, # h
)
def get_video_properties(url, get_duration=False):
def calculate_duration(video: Optional[any]) -> float:
duration = None
if video is not None:
# Get the frames per second (fps) of the video stream
fps = video.get(cv2.CAP_PROP_FPS)
total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
if fps and total_frames:
duration = total_frames / fps
# if cv2 failed need to use ffprobe
if duration is None:
ffprobe_cmd = [
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
f"{url}",
]
p = sp.run(ffprobe_cmd, capture_output=True)
if p.returncode == 0 and p.stdout.decode():
duration = float(p.stdout.decode().strip())
else:
duration = -1
return duration
width = height = 0
try:
# Open the video stream
video = cv2.VideoCapture(url)
# Check if the video stream was opened successfully
if not video.isOpened():
video = None
except Exception:
video = None
result = {}
if get_duration:
result["duration"] = calculate_duration(video)
if video is not None:
# Get the width of frames in the video stream
width = video.get(cv2.CAP_PROP_FRAME_WIDTH)
# Get the height of frames in the video stream
height = video.get(cv2.CAP_PROP_FRAME_HEIGHT)
# Release the video stream
video.release()
result = {"width": round(width), "height": round(height)}
return result