diff --git a/frigate/app.py b/frigate/app.py index 03a7c0e28..9d85f461e 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -1,40 +1,53 @@ +import datetime import logging import multiprocessing as mp -from multiprocessing.queues import Queue -from multiprocessing.synchronize import Event as MpEvent import os +import shutil import signal import sys -from typing import Optional -from types import FrameType - import traceback +from multiprocessing.queues import Queue +from multiprocessing.synchronize import Event as MpEvent +from types import FrameType +from typing import Optional + +import psutil from peewee_migrate import Router from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqliteq import SqliteQueueDatabase -from frigate.audio import capture_audio, process_audio from frigate.comms.dispatcher import Communicator, Dispatcher from frigate.comms.mqtt import MqttClient from frigate.comms.ws import WebSocketClient from frigate.config import FrigateConfig -from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR -from frigate.object_detection import ObjectDetectProcess -from frigate.events import EventCleanup, EventProcessor +from frigate.const import ( + CACHE_DIR, + CLIPS_DIR, + CONFIG_DIR, + DEFAULT_DB_PATH, + EXPORT_DIR, + MODEL_CACHE_DIR, + RECORD_DIR, +) +from frigate.events.cleanup import EventCleanup +from frigate.events.external import ExternalEventProcessor +from frigate.events.maintainer import EventProcessor from frigate.http import create_app from frigate.log import log_process, root_configurer -from frigate.models import Event, Recordings +from frigate.models import Event, Recordings, Timeline +from frigate.object_detection import ObjectDetectProcess from frigate.object_processing import TrackedObjectProcessor from frigate.output import output_frames from frigate.plus import PlusApi -from frigate.record import RecordingCleanup, RecordingMaintainer -from frigate.restream import RestreamApi +from frigate.ptz import OnvifController +from frigate.record.record import manage_recordings from frigate.stats import StatsEmitter, stats_init from frigate.storage import StorageMaintainer +from frigate.timeline import TimelineProcessor +from frigate.types import CameraMetricsTypes, RecordMetricsTypes from frigate.version import VERSION from frigate.video import capture_camera, track_camera from frigate.watchdog import FrigateWatchdog -from frigate.types import CameraMetricsTypes logger = logging.getLogger(__name__) @@ -43,20 +56,28 @@ class FrigateApp: def __init__(self) -> None: self.stop_event: MpEvent = mp.Event() self.detection_queue: Queue = mp.Queue() - self.audio_detection_queue: Queue = mp.Queue() self.detectors: dict[str, ObjectDetectProcess] = {} self.detection_out_events: dict[str, MpEvent] = {} self.detection_shms: list[mp.shared_memory.SharedMemory] = [] self.log_queue: Queue = mp.Queue() self.plus_api = PlusApi() self.camera_metrics: dict[str, CameraMetricsTypes] = {} + self.record_metrics: dict[str, RecordMetricsTypes] = {} + self.processes: dict[str, int] = {} def set_environment_vars(self) -> None: for key, value in self.config.environment_vars.items(): os.environ[key] = value def ensure_dirs(self) -> None: - for d in [RECORD_DIR, CLIPS_DIR, CACHE_DIR]: + for d in [ + CONFIG_DIR, + RECORD_DIR, + CLIPS_DIR, + CACHE_DIR, + MODEL_CACHE_DIR, + EXPORT_DIR, + ]: if not os.path.exists(d) and not os.path.islink(d): logger.info(f"Creating directory: {d}") os.makedirs(d) @@ -69,6 +90,7 @@ class FrigateApp: ) self.log_process.daemon = True self.log_process.start() + self.processes["logger"] = self.log_process.pid or 0 root_configurer(self.log_queue) def init_config(self) -> None: @@ -80,7 +102,7 @@ class FrigateApp: config_file = config_file_yaml user_config = FrigateConfig.parse_file(config_file) - self.config = user_config.runtime_config + self.config = user_config.runtime_config(self.plus_api) for camera_name in self.config.cameras.keys(): # create camera_metrics @@ -106,21 +128,26 @@ class FrigateApp: "read_start": mp.Value("d", 0.0), "ffmpeg_pid": mp.Value("i", 0), "frame_queue": mp.Queue(maxsize=2), - "audio_queue": mp.Queue(maxsize=2), "capture_process": None, - "audio_capture": None, - "audio_process": None, "process": None, } + self.record_metrics[camera_name] = { + "record_enabled": mp.Value( + "i", self.config.cameras[camera_name].record.enabled + ) + } def set_log_levels(self) -> None: logging.getLogger().setLevel(self.config.logger.default.value.upper()) for log, level in self.config.logger.logs.items(): logging.getLogger(log).setLevel(level.value.upper()) - if not "werkzeug" in self.config.logger.logs: + if "werkzeug" not in self.config.logger.logs: logging.getLogger("werkzeug").setLevel("ERROR") + if "ws4py" not in self.config.logger.logs: + logging.getLogger("ws4py").setLevel("ERROR") + def init_queues(self) -> None: # Queues for clip processing self.event_queue: Queue = mp.Queue() @@ -137,9 +164,21 @@ class FrigateApp: # Queue for recordings info self.recordings_info_queue: Queue = mp.Queue() + # Queue for timeline events + self.timeline_queue: Queue = mp.Queue() + def init_database(self) -> None: + def vacuum_db(db: SqliteExtDatabase) -> None: + db.execute_sql("VACUUM;") + + try: + with open(f"{CONFIG_DIR}/.vacuum", "w") as f: + f.write(str(datetime.datetime.now().timestamp())) + except PermissionError: + logger.error("Unable to write to /config to save DB state") + # Migrate DB location - old_db_path = os.path.join(CLIPS_DIR, "frigate.db") + old_db_path = DEFAULT_DB_PATH if not os.path.isfile(self.config.database.path) and os.path.isfile( old_db_path ): @@ -153,14 +192,68 @@ class FrigateApp: router = Router(migrate_db) router.run() + # check if vacuum needs to be run + if os.path.exists(f"{CONFIG_DIR}/.vacuum"): + with open(f"{CONFIG_DIR}/.vacuum") as f: + try: + timestamp = round(float(f.readline())) + except Exception: + timestamp = 0 + + if ( + timestamp + < ( + datetime.datetime.now() - datetime.timedelta(weeks=2) + ).timestamp() + ): + vacuum_db(migrate_db) + else: + vacuum_db(migrate_db) + migrate_db.close() - self.db = SqliteQueueDatabase(self.config.database.path) - models = [Event, Recordings] + def init_go2rtc(self) -> None: + for proc in psutil.process_iter(["pid", "name"]): + if proc.info["name"] == "go2rtc": + logger.info(f"go2rtc process pid: {proc.info['pid']}") + self.processes["go2rtc"] = proc.info["pid"] + + def init_recording_manager(self) -> None: + recording_process = mp.Process( + target=manage_recordings, + name="recording_manager", + args=(self.config, self.recordings_info_queue, self.record_metrics), + ) + recording_process.daemon = True + self.recording_process = recording_process + recording_process.start() + self.processes["recording"] = recording_process.pid or 0 + logger.info(f"Recording process started: {recording_process.pid}") + + def bind_database(self) -> None: + """Bind db to the main process.""" + # NOTE: all db accessing processes need to be created before the db can be bound to the main process + self.db = SqliteQueueDatabase( + self.config.database.path, + pragmas={ + "auto_vacuum": "FULL", # Does not defragment database + "cache_size": -512 * 1000, # 512MB of cache, + "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous + }, + timeout=60, + ) + models = [Event, Recordings, Timeline] self.db.bind(models) def init_stats(self) -> None: - self.stats_tracking = stats_init(self.camera_metrics, self.detectors) + self.stats_tracking = stats_init( + self.config, self.camera_metrics, self.detectors, self.processes + ) + + def init_external_event_processor(self) -> None: + self.external_event_processor = ExternalEventProcessor( + self.config, self.event_queue + ) def init_web_server(self) -> None: self.flask_app = create_app( @@ -169,12 +262,13 @@ class FrigateApp: self.stats_tracking, self.detected_frames_processor, self.storage_maintainer, + self.onvif_controller, + self.external_event_processor, self.plus_api, ) - def init_restream(self) -> None: - self.restream = RestreamApi(self.config) - self.restream.add_cameras() + def init_onvif(self) -> None: + self.onvif_controller = OnvifController(self.config) def init_dispatcher(self) -> None: comms: list[Communicator] = [] @@ -182,12 +276,17 @@ class FrigateApp: if self.config.mqtt.enabled: comms.append(MqttClient(self.config)) - self.ws_client = WebSocketClient(self.config) - comms.append(self.ws_client) - self.dispatcher = Dispatcher(self.config, self.camera_metrics, comms) + comms.append(WebSocketClient(self.config)) + self.dispatcher = Dispatcher( + self.config, + self.onvif_controller, + self.camera_metrics, + self.record_metrics, + comms, + ) def start_detectors(self) -> None: - for name, camera_config in self.config.cameras.items(): + for name in self.config.cameras.keys(): self.detection_out_events[name] = mp.Event() try: @@ -195,7 +294,6 @@ class FrigateApp: [ det.model.height * det.model.width * 3 for (name, det) in self.config.detectors.items() - if det.model.type == "object" ] ) shm_in = mp.shared_memory.SharedMemory( @@ -216,43 +314,10 @@ class FrigateApp: self.detection_shms.append(shm_in) self.detection_shms.append(shm_out) - if any( - ["detect_audio" in input.roles for input in camera_config.ffmpeg.inputs] - ): - self.detection_out_events[f"{name}-audio"] = mp.Event() - try: - shm_in_audio = mp.shared_memory.SharedMemory( - name=f"{name}-audio", - create=True, - size=int( - round( - self.config.audio_model.duration - * self.config.audio_model.sample_rate - ) - ) - * 4, # stored as float32, so 4 bytes per sample - ) - except FileExistsError: - shm_in_audio = mp.shared_memory.SharedMemory(name=f"{name}-audio") - - try: - shm_out_audio = mp.shared_memory.SharedMemory( - name=f"out-{name}-audio", create=True, size=20 * 6 * 4 - ) - except FileExistsError: - shm_out_audio = mp.shared_memory.SharedMemory( - name=f"out-{name}-audio" - ) - - self.detection_shms.append(shm_in_audio) - self.detection_shms.append(shm_out_audio) - for name, detector_config in self.config.detectors.items(): self.detectors[name] = ObjectDetectProcess( name, - self.audio_detection_queue - if detector_config.model.type == "audio" - else self.detection_queue, + self.detection_queue, self.detection_out_events, detector_config, ) @@ -273,7 +338,7 @@ class FrigateApp: def start_video_output_processor(self) -> None: output_processor = mp.Process( target=output_frames, - name=f"output_processor", + name="output_processor", args=( self.config, self.video_output_queue, @@ -284,54 +349,6 @@ class FrigateApp: output_processor.start() logger.info(f"Output process started: {output_processor.pid}") - def start_audio_processors(self) -> None: - # Make sure we have audio detectors - if not any( - [det.model.type == "audio" for det in self.config.detectors.values()] - ): - return - - for name, config in self.config.cameras.items(): - if not any( - ["detect_audio" in inputs.roles for inputs in config.ffmpeg.inputs] - ): - continue - if not config.enabled: - logger.info(f"Audio processor not started for disabled camera {name}") - continue - - audio_capture = mp.Process( - target=capture_audio, - name=f"audio_capture:{name}", - args=( - name, - self.config.audio_model, - self.camera_metrics[name], - ), - ) - audio_capture.daemon = True - self.camera_metrics[name]["audio_capture"] = audio_capture - audio_capture.start() - logger.info(f"Audio capture started for {name}: {audio_capture.pid}") - - audio_process = mp.Process( - target=process_audio, - name=f"audio_process:{name}", - args=( - name, - config, - self.config.audio_model, - self.config.audio_model.merged_labelmap, - self.audio_detection_queue, - self.detection_out_events[f"{name}-audio"], - self.camera_metrics[name], - ), - ) - audio_process.daemon = True - self.camera_metrics[name]["audio_process"] = audio_process - audio_process.start() - logger.info(f"Audio processor started for {name}: {audio_process.pid}") - def start_camera_processors(self) -> None: for name, config in self.config.cameras.items(): if not self.config.cameras[name].enabled: @@ -373,12 +390,19 @@ class FrigateApp: capture_process.start() logger.info(f"Capture process started for {name}: {capture_process.pid}") + def start_timeline_processor(self) -> None: + self.timeline_processor = TimelineProcessor( + self.config, self.timeline_queue, self.stop_event + ) + self.timeline_processor.start() + def start_event_processor(self) -> None: self.event_processor = EventProcessor( self.config, self.camera_metrics, self.event_queue, self.event_processed_queue, + self.timeline_queue, self.stop_event, ) self.event_processor.start() @@ -387,16 +411,6 @@ class FrigateApp: self.event_cleanup = EventCleanup(self.config, self.stop_event) self.event_cleanup.start() - def start_recording_maintainer(self) -> None: - self.recording_maintainer = RecordingMaintainer( - self.config, self.recordings_info_queue, self.stop_event - ) - self.recording_maintainer.start() - - def start_recording_cleanup(self) -> None: - self.recording_cleanup = RecordingCleanup(self.config, self.stop_event) - self.recording_cleanup.start() - def start_storage_maintainer(self) -> None: self.storage_maintainer = StorageMaintainer(self.config, self.stop_event) self.storage_maintainer.start() @@ -414,10 +428,27 @@ class FrigateApp: self.frigate_watchdog = FrigateWatchdog(self.detectors, self.stop_event) self.frigate_watchdog.start() + def check_shm(self) -> None: + available_shm = round(shutil.disk_usage("/dev/shm").total / pow(2, 20), 1) + min_req_shm = 30 + + for _, camera in self.config.cameras.items(): + min_req_shm += round( + (camera.detect.width * camera.detect.height * 1.5 * 9 + 270480) + / 1048576, + 1, + ) + + if available_shm < min_req_shm: + logger.warning( + f"The current SHM size of {available_shm}MB is too small, recommend increasing it to at least {min_req_shm}MB." + ) + def start(self) -> None: self.init_logger() logger.info(f"Starting Frigate ({VERSION})") try: + self.ensure_dirs() try: self.init_config() except Exception as e: @@ -438,32 +469,33 @@ class FrigateApp: self.log_process.terminate() sys.exit(1) self.set_environment_vars() - self.ensure_dirs() self.set_log_levels() self.init_queues() self.init_database() + self.init_onvif() + self.init_recording_manager() + self.init_go2rtc() + self.bind_database() self.init_dispatcher() except Exception as e: print(e) self.log_process.terminate() sys.exit(1) - self.init_restream() self.start_detectors() self.start_video_output_processor() self.start_detected_frames_processor() - self.start_audio_processors() self.start_camera_processors() self.start_camera_capture_processes() self.start_storage_maintainer() self.init_stats() + self.init_external_event_processor() self.init_web_server() + self.start_timeline_processor() self.start_event_processor() self.start_event_cleanup() - self.start_recording_maintainer() - self.start_recording_cleanup() self.start_stats_emitter() self.start_watchdog() - # self.zeroconf = broadcast_zeroconf(self.config.mqtt.client_id) + self.check_shm() def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: self.stop() @@ -479,23 +511,41 @@ class FrigateApp: self.stop() def stop(self) -> None: - logger.info(f"Stopping...") + logger.info("Stopping...") self.stop_event.set() - self.ws_client.stop() - self.detected_frames_processor.join() - self.event_processor.join() - self.event_cleanup.join() - self.recording_maintainer.join() - self.recording_cleanup.join() - self.stats_emitter.join() - self.frigate_watchdog.join() - self.db.stop() - for detector in self.detectors.values(): detector.stop() + # Empty the detection queue and set the events for all requests + while not self.detection_queue.empty(): + connection_id = self.detection_queue.get(timeout=1) + self.detection_out_events[connection_id].set() + self.detection_queue.close() + self.detection_queue.join_thread() + + self.dispatcher.stop() + self.detected_frames_processor.join() + self.event_processor.join() + self.event_cleanup.join() + self.stats_emitter.join() + self.frigate_watchdog.join() + self.db.stop() + while len(self.detection_shms) > 0: shm = self.detection_shms.pop() shm.close() shm.unlink() + + for queue in [ + self.event_queue, + self.event_processed_queue, + self.video_output_queue, + self.detected_frames_queue, + self.recordings_info_queue, + self.log_queue, + ]: + while not queue.empty(): + queue.get_nowait() + queue.close() + queue.join_thread() diff --git a/frigate/config.py b/frigate/config.py index c9893ffe3..9b434ca1e 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -8,24 +8,12 @@ from typing import Dict, List, Optional, Tuple, Union import matplotlib.pyplot as plt import numpy as np -import yaml -from pydantic import BaseModel, Extra, Field, validator, parse_obj_as +from pydantic import BaseModel, Extra, Field, parse_obj_as, validator from pydantic.fields import PrivateAttr -from frigate.const import ( - BASE_DIR, - CACHE_DIR, - REGEX_CAMERA_NAME, - YAML_EXT, -) -from frigate.util import ( - create_mask, - deep_merge, - get_ffmpeg_arg_list, - escape_special_characters, - load_config_with_no_duplicates, - load_labels, -) +from frigate.const import CACHE_DIR, DEFAULT_DB_PATH, REGEX_CAMERA_NAME, YAML_EXT +from frigate.detectors import DetectorConfig, ModelConfig +from frigate.detectors.detector_config import BaseDetectorConfig from frigate.ffmpeg_presets import ( parse_preset_hardware_acceleration_decode, parse_preset_hardware_acceleration_scale, @@ -33,16 +21,14 @@ from frigate.ffmpeg_presets import ( parse_preset_output_record, parse_preset_output_rtmp, ) -from frigate.detectors import ( - PixelFormatEnum, - InputTensorEnum, - DetectorConfig, - ModelConfig, - AudioModelConfig, - ObjectModelConfig, +from frigate.plus import PlusApi +from frigate.util import ( + create_mask, + deep_merge, + escape_special_characters, + get_ffmpeg_arg_list, + load_config_with_no_duplicates, ) -from frigate.version import VERSION - logger = logging.getLogger(__name__) @@ -53,7 +39,7 @@ DEFAULT_TIME_FORMAT = "%m/%d/%Y %H:%M:%S" FRIGATE_ENV_VARS = {k: v for k, v in os.environ.items() if k.startswith("FRIGATE_")} -DEFAULT_TRACKED_OBJECTS = ["person", "Speech"] +DEFAULT_TRACKED_OBJECTS = ["person"] DEFAULT_DETECTORS = {"cpu": {"type": "cpu"}} @@ -62,8 +48,62 @@ class FrigateBaseModel(BaseModel): extra = Extra.forbid +class LiveModeEnum(str, Enum): + jsmpeg = "jsmpeg" + mse = "mse" + webrtc = "webrtc" + + +class TimeFormatEnum(str, Enum): + browser = "browser" + hours12 = "12hour" + hours24 = "24hour" + + +class DateTimeStyleEnum(str, Enum): + full = "full" + long = "long" + medium = "medium" + short = "short" + + class UIConfig(FrigateBaseModel): + live_mode: LiveModeEnum = Field( + default=LiveModeEnum.mse, title="Default Live Mode." + ) + timezone: Optional[str] = Field(title="Override UI timezone.") use_experimental: bool = Field(default=False, title="Experimental UI") + time_format: TimeFormatEnum = Field( + default=TimeFormatEnum.browser, title="Override UI time format." + ) + date_style: DateTimeStyleEnum = Field( + default=DateTimeStyleEnum.short, title="Override UI dateStyle." + ) + time_style: DateTimeStyleEnum = Field( + default=DateTimeStyleEnum.medium, title="Override UI timeStyle." + ) + strftime_fmt: Optional[str] = Field( + default=None, title="Override date and time format using strftime syntax." + ) + + +class StatsConfig(FrigateBaseModel): + amd_gpu_stats: bool = Field(default=True, title="Enable AMD GPU stats.") + intel_gpu_stats: bool = Field(default=True, title="Enable Intel GPU stats.") + network_bandwidth: bool = Field( + default=False, title="Enable network bandwidth for ffmpeg processes." + ) + + +class TelemetryConfig(FrigateBaseModel): + network_interfaces: List[str] = Field( + default=["eth", "enp", "eno", "ens", "wl", "lo"], + title="Enabled network interfaces for bandwidth calculation.", + ) + stats: StatsConfig = Field( + default_factory=StatsConfig, title="System Stats Configuration" + ) + version_check: bool = Field(default=True, title="Enable latest version check.") class MqttConfig(FrigateBaseModel): @@ -87,6 +127,13 @@ class MqttConfig(FrigateBaseModel): return v +class OnvifConfig(FrigateBaseModel): + host: str = Field(default="", title="Onvif Host") + port: int = Field(default=8000, title="Onvif Port") + user: Optional[str] = Field(title="Onvif Username") + password: Optional[str] = Field(title="Onvif Password") + + class RetainModeEnum(str, Enum): all = "all" motion = "motion" @@ -127,27 +174,31 @@ class RecordConfig(FrigateBaseModel): default=60, title="Number of minutes to wait between cleanup runs.", ) - # deprecated - to be removed in a future version - retain_days: Optional[float] = Field(title="Recording retention period in days.") retain: RecordRetainConfig = Field( default_factory=RecordRetainConfig, title="Record retention settings." ) events: EventsConfig = Field( default_factory=EventsConfig, title="Event specific settings." ) + enabled_in_config: Optional[bool] = Field( + title="Keep track of original state of recording." + ) class MotionConfig(FrigateBaseModel): threshold: int = Field( - default=25, + default=40, title="Motion detection threshold (1-255).", ge=1, le=255, ) - improve_contrast: bool = Field(default=False, title="Improve Contrast") - contour_area: Optional[int] = Field(default=30, title="Contour Area") + lightning_threshold: float = Field( + default=0.8, title="Lightning detection threshold (0.3-1.0).", ge=0.3, le=1.0 + ) + improve_contrast: bool = Field(default=True, title="Improve Contrast") + contour_area: Optional[int] = Field(default=15, title="Contour Area") delta_alpha: float = Field(default=0.2, title="Delta Alpha") - frame_alpha: float = Field(default=0.2, title="Frame Alpha") + frame_alpha: float = Field(default=0.02, title="Frame Alpha") frame_height: Optional[int] = Field(default=50, title="Frame Height") mask: Union[str, List[str]] = Field( default="", title="Coordinates polygon for the motion mask." @@ -198,9 +249,8 @@ class StationaryMaxFramesConfig(FrigateBaseModel): class StationaryConfig(FrigateBaseModel): interval: Optional[int] = Field( - default=0, title="Frame interval for checking stationary objects.", - ge=0, + gt=0, ) threshold: Optional[int] = Field( title="Number of frames without a position change for an object to be considered stationary", @@ -226,6 +276,9 @@ class DetectConfig(FrigateBaseModel): default_factory=StationaryConfig, title="Stationary objects config.", ) + annotation_offset: int = Field( + default=0, title="Milliseconds to offset detect annotations by." + ) class FilterConfig(FrigateBaseModel): @@ -288,6 +341,12 @@ class ZoneConfig(BaseModel): coordinates: Union[str, List[str]] = Field( title="Coordinates polygon for the defined zone." ) + inertia: int = Field( + default=3, + title="Number of consecutive frames required for object to be considered present in the zone.", + gt=0, + le=10, + ) objects: List[str] = Field( default_factory=list, title="List of objects that can trigger the zone.", @@ -336,6 +395,7 @@ class BirdseyeModeEnum(str, Enum): class BirdseyeConfig(FrigateBaseModel): enabled: bool = Field(default=True, title="Enable birdseye view.") + restream: bool = Field(default=False, title="Restream birdseye via RTSP.") width: int = Field(default=1280, title="Birdseye width.") height: int = Field(default=720, title="Birdseye height.") quality: int = Field( @@ -352,15 +412,24 @@ class BirdseyeConfig(FrigateBaseModel): # uses BaseModel because some global attributes are not available at the camera level class BirdseyeCameraConfig(BaseModel): enabled: bool = Field(default=True, title="Enable birdseye view for camera.") + order: int = Field(default=0, title="Position of the camera in the birdseye view.") mode: BirdseyeModeEnum = Field( default=BirdseyeModeEnum.objects, title="Tracking mode for camera." ) -FFMPEG_GLOBAL_ARGS_DEFAULT = ["-hide_banner", "-loglevel", "warning"] +# Note: Setting threads to less than 2 caused several issues with recording segments +# https://github.com/blakeblackshear/frigate/issues/5659 +FFMPEG_GLOBAL_ARGS_DEFAULT = ["-hide_banner", "-loglevel", "warning", "-threads", "2"] FFMPEG_INPUT_ARGS_DEFAULT = "preset-rtsp-generic" -DETECT_FFMPEG_OUTPUT_ARGS_DEFAULT = ["-f", "rawvideo", "-pix_fmt", "yuv420p"] -DETECT_AUDIO_FFMPEG_OUTPUT_ARGS_DEFAULT = ["-f", "s16le", "-ar", "16000", "-ac", "1"] +DETECT_FFMPEG_OUTPUT_ARGS_DEFAULT = [ + "-threads", + "2", + "-f", + "rawvideo", + "-pix_fmt", + "yuv420p", +] RTMP_FFMPEG_OUTPUT_ARGS_DEFAULT = "preset-rtmp-generic" RECORD_FFMPEG_OUTPUT_ARGS_DEFAULT = "preset-record-generic" @@ -370,10 +439,6 @@ class FfmpegOutputArgsConfig(FrigateBaseModel): default=DETECT_FFMPEG_OUTPUT_ARGS_DEFAULT, title="Detect role FFmpeg output arguments.", ) - detect_audio: Union[str, List[str]] = Field( - default=DETECT_AUDIO_FFMPEG_OUTPUT_ARGS_DEFAULT, - title="Detect role FFmpeg output arguments.", - ) record: Union[str, List[str]] = Field( default=RECORD_FFMPEG_OUTPUT_ARGS_DEFAULT, title="Record role FFmpeg output arguments.", @@ -402,10 +467,8 @@ class FfmpegConfig(FrigateBaseModel): class CameraRoleEnum(str, Enum): record = "record" - restream = "restream" rtmp = "rtmp" detect = "detect" - detect_audio = "detect_audio" class CameraInput(FrigateBaseModel): @@ -433,7 +496,7 @@ class CameraFfmpegConfig(FfmpegConfig): if len(roles) > len(roles_set): raise ValueError("Each input role may only be used once.") - if not "detect" in roles: + if "detect" not in roles: raise ValueError("The detect role is required.") return v @@ -517,29 +580,15 @@ class RtmpConfig(FrigateBaseModel): enabled: bool = Field(default=False, title="RTMP restreaming enabled.") -class JsmpegStreamConfig(FrigateBaseModel): - height: int = Field(default=720, title="Live camera view height.") - quality: int = Field(default=8, ge=1, le=31, title="Live camera view quality.") +class CameraLiveConfig(FrigateBaseModel): + stream_name: str = Field(default="", title="Name of restream to use as live view.") + height: int = Field(default=720, title="Live camera view height") + quality: int = Field(default=8, ge=1, le=31, title="Live camera view quality") -class RestreamCodecEnum(str, Enum): - copy = "copy" - h264 = "h264" - h265 = "h265" - - -class RestreamConfig(FrigateBaseModel): - enabled: bool = Field(default=True, title="Restreaming enabled.") - video_encoding: RestreamCodecEnum = Field( - default=RestreamCodecEnum.copy, title="Method for encoding the restream." - ) - force_audio: bool = Field( - default=True, title="Force audio compatibility with the browser." - ) - birdseye: bool = Field(default=False, title="Restream the birdseye feed via RTSP.") - jsmpeg: JsmpegStreamConfig = Field( - default_factory=JsmpegStreamConfig, title="Jsmpeg Stream Configuration." - ) +class RestreamConfig(BaseModel): + class Config: + extra = Extra.allow class CameraUiConfig(FrigateBaseModel): @@ -566,8 +615,8 @@ class CameraConfig(FrigateBaseModel): rtmp: RtmpConfig = Field( default_factory=RtmpConfig, title="RTMP restreaming configuration." ) - restream: RestreamConfig = Field( - default_factory=RestreamConfig, title="Restreaming configuration." + live: CameraLiveConfig = Field( + default_factory=CameraLiveConfig, title="Live playback settings." ) snapshots: SnapshotsConfig = Field( default_factory=SnapshotsConfig, title="Snapshot configuration." @@ -582,6 +631,9 @@ class CameraConfig(FrigateBaseModel): detect: DetectConfig = Field( default_factory=DetectConfig, title="Object detection configuration." ) + onvif: OnvifConfig = Field( + default_factory=OnvifConfig, title="Camera Onvif Configuration." + ) ui: CameraUiConfig = Field( default_factory=CameraUiConfig, title="Camera UI Modifications." ) @@ -605,18 +657,14 @@ class CameraConfig(FrigateBaseModel): # add roles to the input if there is only one if len(config["ffmpeg"]["inputs"]) == 1: has_rtmp = "rtmp" in config["ffmpeg"]["inputs"][0].get("roles", []) - has_audio = "detect_audio" in config["ffmpeg"]["inputs"][0].get("roles", []) config["ffmpeg"]["inputs"][0]["roles"] = [ "record", "detect", - "restream", ] if has_rtmp: config["ffmpeg"]["inputs"][0]["roles"].append("rtmp") - if has_audio: - config["ffmpeg"]["inputs"][0]["roles"].append("detect_audio") super().__init__(**config) @@ -657,15 +705,6 @@ class CameraConfig(FrigateBaseModel): ) ffmpeg_output_args = scale_detect_args + ffmpeg_output_args + ["pipe:"] - if "detect_audio" in ffmpeg_input.roles: - detect_args = get_ffmpeg_arg_list(self.ffmpeg.output_args.detect_audio) - - pipe = f"/tmp/{self.name}-audio" - try: - os.mkfifo(pipe) - except FileExistsError: - pass - ffmpeg_output_args = detect_args + ["-y", pipe] + ffmpeg_output_args if "rtmp" in ffmpeg_input.roles and self.rtmp.enabled: rtmp_args = get_ffmpeg_arg_list( parse_preset_output_rtmp(self.ffmpeg.output_args.rtmp) @@ -720,9 +759,7 @@ class CameraConfig(FrigateBaseModel): class DatabaseConfig(FrigateBaseModel): - path: str = Field( - default=os.path.join(BASE_DIR, "frigate.db"), title="Database path." - ) + path: str = Field(default=DEFAULT_DB_PATH, title="Database path.") class LogLevelEnum(str, Enum): @@ -748,30 +785,28 @@ def verify_config_roles(camera_config: CameraConfig) -> None: set([r for i in camera_config.ffmpeg.inputs for r in i.roles]) ) - if camera_config.record.enabled and not "record" in assigned_roles: + if camera_config.record.enabled and "record" not in assigned_roles: raise ValueError( f"Camera {camera_config.name} has record enabled, but record is not assigned to an input." ) - if camera_config.rtmp.enabled and not "rtmp" in assigned_roles: + if camera_config.rtmp.enabled and "rtmp" not in assigned_roles: raise ValueError( f"Camera {camera_config.name} has rtmp enabled, but rtmp is not assigned to an input." ) - if camera_config.restream.enabled and not "restream" in assigned_roles: - raise ValueError( - f"Camera {camera_config.name} has restream enabled, but restream is not assigned to an input." - ) - -def verify_old_retain_config(camera_config: CameraConfig) -> None: - """Leave log if old retain_days is used.""" - if not camera_config.record.retain_days is None: - logger.warning( - "The 'retain_days' config option has been DEPRECATED and will be removed in a future version. Please use the 'days' setting under 'retain'" +def verify_valid_live_stream_name( + frigate_config: FrigateConfig, camera_config: CameraConfig +) -> ValueError | None: + """Verify that a restream exists to use for live view.""" + if ( + camera_config.live.stream_name + not in frigate_config.go2rtc.dict().get("streams", {}).keys() + ): + return ValueError( + f"No restream with name {camera_config.live.stream_name} exists for camera {camera_config.name}." ) - if camera_config.record.retain.days == 0: - camera_config.record.retain.days = camera_config.record.retain_days def verify_recording_retention(camera_config: CameraConfig) -> None: @@ -835,13 +870,13 @@ class FrigateConfig(FrigateBaseModel): default_factory=dict, title="Frigate environment variables." ) ui: UIConfig = Field(default_factory=UIConfig, title="UI configuration.") - audio_model: AudioModelConfig = Field( - default_factory=AudioModelConfig, title="Audio model configuration." + telemetry: TelemetryConfig = Field( + default_factory=TelemetryConfig, title="Telemetry configuration." ) - model: ObjectModelConfig = Field( - default_factory=ObjectModelConfig, title="Detection model configuration." + model: ModelConfig = Field( + default_factory=ModelConfig, title="Detection model configuration." ) - detectors: Dict[str, DetectorConfig] = Field( + detectors: Dict[str, BaseDetectorConfig] = Field( default=DEFAULT_DETECTORS, title="Detector hardware configuration.", ) @@ -857,7 +892,10 @@ class FrigateConfig(FrigateBaseModel): rtmp: RtmpConfig = Field( default_factory=RtmpConfig, title="Global RTMP restreaming configuration." ) - restream: RestreamConfig = Field( + live: CameraLiveConfig = Field( + default_factory=CameraLiveConfig, title="Live playback settings." + ) + go2rtc: RestreamConfig = Field( default_factory=RestreamConfig, title="Global restream configuration." ) birdseye: BirdseyeConfig = Field( @@ -881,13 +919,13 @@ class FrigateConfig(FrigateBaseModel): title="Global timestamp style configuration.", ) - @property - def runtime_config(self) -> FrigateConfig: + def runtime_config(self, plus_api: PlusApi = None) -> FrigateConfig: """Merge camera config with globals.""" config = self.copy(deep=True) - # MQTT password substitution - if config.mqtt.password: + # MQTT user/password substitutions + if config.mqtt.user or config.mqtt.password: + config.mqtt.user = config.mqtt.user.format(**FRIGATE_ENV_VARS) config.mqtt.password = config.mqtt.password.format(**FRIGATE_ENV_VARS) # Global config to propagate down to camera level @@ -897,7 +935,7 @@ class FrigateConfig(FrigateBaseModel): "record": ..., "snapshots": ..., "rtmp": ..., - "restream": ..., + "live": ..., "objects": ..., "motion": ..., "detect": ..., @@ -922,11 +960,25 @@ class FrigateConfig(FrigateBaseModel): stationary_threshold = camera_config.detect.fps * 10 if camera_config.detect.stationary.threshold is None: camera_config.detect.stationary.threshold = stationary_threshold + # default to the stationary_threshold if not defined + if camera_config.detect.stationary.interval is None: + camera_config.detect.stationary.interval = stationary_threshold # FFMPEG input substitution for input in camera_config.ffmpeg.inputs: input.path = input.path.format(**FRIGATE_ENV_VARS) + # ONVIF substitution + if camera_config.onvif.user or camera_config.onvif.password: + camera_config.onvif.user = camera_config.onvif.user.format( + **FRIGATE_ENV_VARS + ) + camera_config.onvif.password = camera_config.onvif.password.format( + **FRIGATE_ENV_VARS + ) + # set config recording value + camera_config.record.enabled_in_config = camera_config.record.enabled + # Add default filters object_keys = camera_config.objects.track if camera_config.objects.filters is None: @@ -970,8 +1022,12 @@ class FrigateConfig(FrigateBaseModel): **camera_config.motion.dict(exclude_unset=True), ) + # Set live view stream if none is set + if not camera_config.live.stream_name: + camera_config.live.stream_name = name + verify_config_roles(camera_config) - verify_old_retain_config(camera_config) + verify_valid_live_stream_name(config, camera_config) verify_recording_retention(camera_config) verify_recording_segments_setup_with_reasonable_time(camera_config) verify_zone_objects_are_tracked(camera_config) @@ -991,28 +1047,44 @@ class FrigateConfig(FrigateBaseModel): for _, camera in config.cameras.items(): enabled_labels.update(camera.objects.track) - config.model.create_colormap(enabled_labels) + config.model.create_colormap(sorted(enabled_labels)) + config.model.check_and_load_plus_model(plus_api) for key, detector in config.detectors.items(): detector_config: DetectorConfig = parse_obj_as(DetectorConfig, detector) if detector_config.model is None: detector_config.model = config.model else: - detector_model = detector_config.model.dict(exclude_unset=True) - # If any keys are set in the detector_model other than type or path, warn - if any(key not in ["type", "path"] for key in detector_model.keys()): + model = detector_config.model + schema = ModelConfig.schema()["properties"] + if ( + model.width != schema["width"]["default"] + or model.height != schema["height"]["default"] + or model.labelmap_path is not None + or model.labelmap is not {} + or model.input_tensor != schema["input_tensor"]["default"] + or model.input_pixel_format + != schema["input_pixel_format"]["default"] + ): logger.warning( - "Customizing more than a detector model type or path is unsupported." + "Customizing more than a detector model path is unsupported." ) - merged_model = deep_merge( - detector_model, - config.model.dict(exclude_unset=True) - if detector_config.model.type == "object" - else config.audio_model.dict(exclude_unset=True), - ) - detector_config.model = parse_obj_as( - ModelConfig, {"type": detector_config.model.type, **merged_model} - ) + merged_model = deep_merge( + detector_config.model.dict(exclude_unset=True), + config.model.dict(exclude_unset=True), + ) + + if "path" not in merged_model: + if detector_config.type == "cpu": + merged_model["path"] = "/cpu_model.tflite" + elif detector_config.type == "edgetpu": + merged_model["path"] = "/edgetpu_model.tflite" + + detector_config.model = ModelConfig.parse_obj(merged_model) + detector_config.model.check_and_load_plus_model( + plus_api, detector_config.type + ) + detector_config.model.compute_model_hash() config.detectors[key] = detector_config return config diff --git a/frigate/detectors/__init__.py b/frigate/detectors/__init__.py index a1fdef4ac..7465ed7c0 100644 --- a/frigate/detectors/__init__.py +++ b/frigate/detectors/__init__.py @@ -1,24 +1,12 @@ import logging -from .detection_api import DetectionApi -from .detector_config import ( - AudioModelConfig, - PixelFormatEnum, - InputTensorEnum, - ModelConfig, - ObjectModelConfig, -) -from .detector_types import ( - DetectorTypeEnum, - api_types, - DetectorConfig, -) - +from .detector_config import InputTensorEnum, ModelConfig, PixelFormatEnum # noqa: F401 +from .detector_types import DetectorConfig, DetectorTypeEnum, api_types # noqa: F401 logger = logging.getLogger(__name__) -def create_detector(detector_config: DetectorConfig): +def create_detector(detector_config): if detector_config.type == DetectorTypeEnum.cpu: logger.warning( "CPU detectors are not recommended and should only be used for testing or for trial purposes." diff --git a/frigate/detectors/detector_config.py b/frigate/detectors/detector_config.py index f65826a57..6a179429a 100644 --- a/frigate/detectors/detector_config.py +++ b/frigate/detectors/detector_config.py @@ -16,6 +16,11 @@ from frigate.util import load_labels logger = logging.getLogger(__name__) +class ProducesEnum(str, Enum): + object = "object" + audio = "audio" + + class PixelFormatEnum(str, Enum): rgb = "rgb" bgr = "bgr" diff --git a/frigate/detectors/plugins/cpu_tfl.py b/frigate/detectors/plugins/cpu_tfl.py index b5a2a0a8c..8a54363e1 100644 --- a/frigate/detectors/plugins/cpu_tfl.py +++ b/frigate/detectors/plugins/cpu_tfl.py @@ -1,11 +1,16 @@ import logging + import numpy as np +from pydantic import Field +from typing_extensions import Literal from frigate.detectors.detection_api import DetectionApi from frigate.detectors.detector_config import BaseDetectorConfig -from typing import Literal -from pydantic import Extra, Field -import tflite_runtime.interpreter as tflite + +try: + from tflite_runtime.interpreter import Interpreter +except ModuleNotFoundError: + from tensorflow.lite.python.interpreter import Interpreter logger = logging.getLogger(__name__) @@ -22,12 +27,8 @@ class CpuTfl(DetectionApi): type_key = DETECTOR_KEY def __init__(self, detector_config: CpuDetectorConfig): - self.is_audio = detector_config.model.type == "audio" - default_model = ( - "/cpu_model.tflite" if not self.is_audio else "/cpu_audio_model.tflite" - ) - self.interpreter = tflite.Interpreter( - model_path=detector_config.model.path or default_model, + self.interpreter = Interpreter( + model_path=detector_config.model.path, num_threads=detector_config.num_threads or 3, ) @@ -40,28 +41,14 @@ class CpuTfl(DetectionApi): self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input) self.interpreter.invoke() - detections = np.zeros((20, 6), np.float32) + boxes = self.interpreter.tensor(self.tensor_output_details[0]["index"])()[0] + class_ids = self.interpreter.tensor(self.tensor_output_details[1]["index"])()[0] + scores = self.interpreter.tensor(self.tensor_output_details[2]["index"])()[0] + count = int( + self.interpreter.tensor(self.tensor_output_details[3]["index"])()[0] + ) - if self.is_audio: - res = self.interpreter.get_tensor(self.tensor_output_details[0]["index"])[0] - non_zero_indices = res > 0 - class_ids = np.argpartition(-res, 20)[:20] - class_ids = class_ids[np.argsort(-res[class_ids])] - class_ids = class_ids[non_zero_indices[class_ids]] - scores = res[class_ids] - boxes = np.full((scores.shape[0], 4), -1, np.float32) - count = len(scores) - else: - boxes = self.interpreter.tensor(self.tensor_output_details[0]["index"])()[0] - class_ids = self.interpreter.tensor( - self.tensor_output_details[1]["index"] - )()[0] - scores = self.interpreter.tensor(self.tensor_output_details[2]["index"])()[ - 0 - ] - count = int( - self.interpreter.tensor(self.tensor_output_details[3]["index"])()[0] - ) + detections = np.zeros((20, 6), np.float32) for i in range(count): if scores[i] < 0.4 or i == 20: diff --git a/frigate/detectors/plugins/edgetpu_tfl.py b/frigate/detectors/plugins/edgetpu_tfl.py index 7f0d7f901..ca03d483b 100644 --- a/frigate/detectors/plugins/edgetpu_tfl.py +++ b/frigate/detectors/plugins/edgetpu_tfl.py @@ -1,12 +1,16 @@ import logging + import numpy as np +from pydantic import Field +from typing_extensions import Literal from frigate.detectors.detection_api import DetectionApi from frigate.detectors.detector_config import BaseDetectorConfig -from typing import Literal -from pydantic import Extra, Field -import tflite_runtime.interpreter as tflite -from tflite_runtime.interpreter import load_delegate + +try: + from tflite_runtime.interpreter import Interpreter, load_delegate +except ModuleNotFoundError: + from tensorflow.lite.python.interpreter import Interpreter, load_delegate logger = logging.getLogger(__name__) @@ -23,7 +27,6 @@ class EdgeTpuTfl(DetectionApi): type_key = DETECTOR_KEY def __init__(self, detector_config: EdgeTpuDetectorConfig): - self.is_audio = detector_config.model.type == "audio" device_config = {"device": "usb"} if detector_config.device is not None: device_config = {"device": detector_config.device} @@ -34,13 +37,8 @@ class EdgeTpuTfl(DetectionApi): logger.info(f"Attempting to load TPU as {device_config['device']}") edge_tpu_delegate = load_delegate("libedgetpu.so.1.0", device_config) logger.info("TPU found") - default_model = ( - "/edgetpu_model.tflite" - if not self.is_audio - else "/edgetpu_audio_model.tflite" - ) - self.interpreter = tflite.Interpreter( - model_path=detector_config.model.path or default_model, + self.interpreter = Interpreter( + model_path=detector_config.model.path, experimental_delegates=[edge_tpu_delegate], ) except ValueError: @@ -58,28 +56,14 @@ class EdgeTpuTfl(DetectionApi): self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input) self.interpreter.invoke() - detections = np.zeros((20, 6), np.float32) + boxes = self.interpreter.tensor(self.tensor_output_details[0]["index"])()[0] + class_ids = self.interpreter.tensor(self.tensor_output_details[1]["index"])()[0] + scores = self.interpreter.tensor(self.tensor_output_details[2]["index"])()[0] + count = int( + self.interpreter.tensor(self.tensor_output_details[3]["index"])()[0] + ) - if self.is_audio: - res = self.interpreter.get_tensor(self.tensor_output_details[0]["index"])[0] - non_zero_indices = res > 0 - class_ids = np.argpartition(-res, 20)[:20] - class_ids = class_ids[np.argsort(-res[class_ids])] - class_ids = class_ids[non_zero_indices[class_ids]] - scores = res[class_ids] - boxes = np.full((scores.shape[0], 4), -1, np.float32) - count = len(scores) - else: - boxes = self.interpreter.tensor(self.tensor_output_details[0]["index"])()[0] - class_ids = self.interpreter.tensor( - self.tensor_output_details[1]["index"] - )()[0] - scores = self.interpreter.tensor(self.tensor_output_details[2]["index"])()[ - 0 - ] - count = int( - self.interpreter.tensor(self.tensor_output_details[3]["index"])()[0] - ) + detections = np.zeros((20, 6), np.float32) for i in range(count): if scores[i] < 0.4 or i == 20: diff --git a/frigate/object_detection.py b/frigate/object_detection.py index 4dccccc59..0a2a7059c 100644 --- a/frigate/object_detection.py +++ b/frigate/object_detection.py @@ -10,9 +10,8 @@ from abc import ABC, abstractmethod import numpy as np from setproctitle import setproctitle -from frigate.config import InputTensorEnum from frigate.detectors import create_detector - +from frigate.detectors.detector_config import InputTensorEnum from frigate.util import EventsPerSecond, SharedMemoryFrameManager, listen, load_labels logger = logging.getLogger(__name__) @@ -44,7 +43,7 @@ class LocalObjectDetector(ObjectDetector): else: self.labels = load_labels(labels) - if detector_config.model.type == "object": + if detector_config: self.input_transform = tensor_transform(detector_config.model.input_tensor) else: self.input_transform = None @@ -88,6 +87,7 @@ def run_detector( stop_event = mp.Event() def receiveSignal(signalNumber, frame): + logger.info("Signal to exit detection process...") stop_event.set() signal.signal(signal.SIGTERM, receiveSignal) @@ -104,27 +104,13 @@ def run_detector( while not stop_event.is_set(): try: - connection_id = detection_queue.get(timeout=5) + connection_id = detection_queue.get(timeout=1) except queue.Empty: continue - if detector_config.model.type == "audio": - input_frame = frame_manager.get( - connection_id, - ( - int( - round( - detector_config.model.duration - * detector_config.model.sample_rate - ) - ), - ), - dtype=np.float32, - ) - else: - input_frame = frame_manager.get( - connection_id, - (1, detector_config.model.height, detector_config.model.width, 3), - ) + input_frame = frame_manager.get( + connection_id, + (1, detector_config.model.height, detector_config.model.width, 3), + ) if input_frame is None: continue @@ -139,6 +125,8 @@ def run_detector( avg_speed.value = (avg_speed.value * 9 + duration) / 10 + logger.info("Exited detection process...") + class ObjectDetectProcess: def __init__( @@ -158,6 +146,9 @@ class ObjectDetectProcess: self.start_or_restart() def stop(self): + # if the process has already exited on its own, just return + if self.detect_process and self.detect_process.exitcode: + return self.detect_process.terminate() logging.info("Waiting for detection process to exit gracefully...") self.detect_process.join(timeout=30) @@ -165,10 +156,11 @@ class ObjectDetectProcess: logging.info("Detection process didnt exit. Force killing...") self.detect_process.kill() self.detect_process.join() + logging.info("Detection process has exited...") def start_or_restart(self): self.detection_start.value = 0.0 - if (not self.detect_process is None) and self.detect_process.is_alive(): + if (self.detect_process is not None) and self.detect_process.is_alive(): self.stop() self.detect_process = mp.Process( target=run_detector, @@ -187,25 +179,19 @@ class ObjectDetectProcess: class RemoteObjectDetector: - def __init__(self, name, labels, detection_queue, event, model_config): + def __init__(self, name, labels, detection_queue, event, model_config, stop_event): self.labels = labels self.name = name self.fps = EventsPerSecond() self.detection_queue = detection_queue self.event = event + self.stop_event = stop_event self.shm = mp.shared_memory.SharedMemory(name=self.name, create=False) - if model_config.type == "audio": - self.np_shm = np.ndarray( - (int(round(model_config.duration * model_config.sample_rate)),), - dtype=np.float32, - buffer=self.shm.buf, - ) - else: - self.np_shm = np.ndarray( - (1, model_config.height, model_config.width, 3), - dtype=np.uint8, - buffer=self.shm.buf, - ) + self.np_shm = np.ndarray( + (1, model_config.height, model_config.width, 3), + dtype=np.uint8, + buffer=self.shm.buf, + ) self.out_shm = mp.shared_memory.SharedMemory( name=f"out-{self.name}", create=False ) @@ -214,11 +200,14 @@ class RemoteObjectDetector: def detect(self, tensor_input, threshold=0.4): detections = [] + if self.stop_event.is_set(): + return detections + # copy input to shared memory self.np_shm[:] = tensor_input[:] self.event.clear() self.detection_queue.put(self.name) - result = self.event.wait(timeout=10.0) + result = self.event.wait(timeout=5.0) # if it timed out if result is None: diff --git a/frigate/types.py b/frigate/types.py index 7d2fd1533..8c3e54654 100644 --- a/frigate/types.py +++ b/frigate/types.py @@ -1,7 +1,7 @@ -from typing import Optional, TypedDict +from multiprocessing.context import Process from multiprocessing.queues import Queue from multiprocessing.sharedctypes import Synchronized -from multiprocessing.context import Process +from typing import Optional, TypedDict from frigate.object_detection import ObjectDetectProcess @@ -9,14 +9,11 @@ from frigate.object_detection import ObjectDetectProcess class CameraMetricsTypes(TypedDict): camera_fps: Synchronized capture_process: Optional[Process] - audio_capture: Optional[Process] - audio_process: Optional[Process] detection_enabled: Synchronized detection_fps: Synchronized detection_frame: Synchronized ffmpeg_pid: Synchronized frame_queue: Queue - audio_queue: Queue motion_enabled: Synchronized improve_contrast_enabled: Synchronized motion_threshold: Synchronized @@ -27,8 +24,14 @@ class CameraMetricsTypes(TypedDict): skipped_fps: Synchronized +class RecordMetricsTypes(TypedDict): + record_enabled: Synchronized + + class StatsTrackingTypes(TypedDict): camera_metrics: dict[str, CameraMetricsTypes] detectors: dict[str, ObjectDetectProcess] started: int latest_frigate_version: str + last_updated: int + processes: dict[str, int] diff --git a/frigate/util.py b/frigate/util.py index d35b7131a..bf0e9172c 100755 --- a/frigate/util.py +++ b/frigate/util.py @@ -1,25 +1,26 @@ import copy import datetime -import logging -import shlex -import subprocess as sp import json +import logging +import os import re +import shlex import signal +import subprocess as sp import traceback import urllib.parse -import yaml - from abc import ABC, abstractmethod from collections import Counter from collections.abc import Mapping from multiprocessing import shared_memory -from typing import Any, AnyStr +from typing import Any, AnyStr, Optional, Tuple import cv2 import numpy as np -import os import psutil +import py3nvml.py3nvml as nvml +import pytz +import yaml from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS @@ -454,7 +455,7 @@ def copy_yuv_to_position( # clear v2 destination_frame[v2[1] : v2[3], v2[0] : v2[2]] = 128 - if not source_frame is None: + if source_frame is not None: # calculate the resized frame, maintaining the aspect ratio source_aspect_ratio = source_frame.shape[1] / (source_frame.shape[0] // 3 * 2) dest_aspect_ratio = destination_shape[1] / destination_shape[0] @@ -571,7 +572,16 @@ def yuv_region_2_bgr(frame, region): raise -def intersection(box_a, box_b): +def intersection(box_a, box_b) -> Optional[list[int]]: + """Return intersection box or None if boxes do not intersect.""" + if ( + box_a[2] < box_b[0] + or box_a[0] > box_b[2] + or box_a[1] > box_b[3] + or box_a[3] < box_b[1] + ): + return None + return ( max(box_a[0], box_b[0]), max(box_a[1], box_b[1]), @@ -588,6 +598,9 @@ def intersection_over_union(box_a, box_b): # determine the (x, y)-coordinates of the intersection rectangle intersect = intersection(box_a, box_b) + if intersect is None: + return 0.0 + # compute the area of intersection rectangle inter_area = max(0, intersect[2] - intersect[0] + 1) * max( 0, intersect[3] - intersect[1] + 1 @@ -721,7 +734,7 @@ def load_labels(path, encoding="utf-8"): def clean_camera_user_pass(line: str) -> str: """Removes user and password from line.""" - if line.startswith("rtsp://"): + if "rtsp://" in line: return re.sub(REGEX_RTSP_CAMERA_USER_PASS, "://*:*@", line) else: return re.sub(REGEX_HTTP_CAMERA_USER_PASS, "user=*&password=*", line) @@ -738,11 +751,132 @@ def escape_special_characters(path: str) -> str: return path +def get_cgroups_version() -> str: + """Determine what version of cgroups is enabled.""" + + cgroup_path = "/sys/fs/cgroup" + + if not os.path.ismount(cgroup_path): + logger.debug(f"{cgroup_path} is not a mount point.") + return "unknown" + + try: + with open("/proc/mounts", "r") as f: + mounts = f.readlines() + + for mount in mounts: + mount_info = mount.split() + if mount_info[1] == cgroup_path: + fs_type = mount_info[2] + if fs_type == "cgroup2fs" or fs_type == "cgroup2": + return "cgroup2" + elif fs_type == "tmpfs": + return "cgroup" + else: + logger.debug( + f"Could not determine cgroups version: unhandled filesystem {fs_type}" + ) + break + except Exception as e: + logger.debug(f"Could not determine cgroups version: {e}") + + return "unknown" + + +def get_docker_memlimit_bytes() -> int: + """Get mem limit in bytes set in docker if present. Returns -1 if no limit detected.""" + + # check running a supported cgroups version + if get_cgroups_version() == "cgroup2": + memlimit_path = "/sys/fs/cgroup/memory.max" + + try: + with open(memlimit_path, "r") as f: + value = f.read().strip() + + if value.isnumeric(): + return int(value) + elif value.lower() == "max": + return -1 + except Exception as e: + logger.debug(f"Unable to get docker memlimit: {e}") + + return -1 + + def get_cpu_stats() -> dict[str, dict]: """Get cpu usages for each process id""" usages = {} - # -n=2 runs to ensure extraneous values are not included - top_command = ["top", "-b", "-n", "2"] + docker_memlimit = get_docker_memlimit_bytes() / 1024 + total_mem = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES") / 1024 + + for process in psutil.process_iter(["pid", "name", "cpu_percent", "cmdline"]): + pid = process.info["pid"] + try: + cpu_percent = process.info["cpu_percent"] + cmdline = process.info["cmdline"] + + with open(f"/proc/{pid}/stat", "r") as f: + stats = f.readline().split() + utime = int(stats[13]) + stime = int(stats[14]) + starttime = int(stats[21]) + + with open("/proc/uptime") as f: + system_uptime_sec = int(float(f.read().split()[0])) + + clk_tck = os.sysconf(os.sysconf_names["SC_CLK_TCK"]) + + process_utime_sec = utime // clk_tck + process_stime_sec = stime // clk_tck + process_starttime_sec = starttime // clk_tck + + process_elapsed_sec = system_uptime_sec - process_starttime_sec + process_usage_sec = process_utime_sec + process_stime_sec + cpu_average_usage = process_usage_sec * 100 // process_elapsed_sec + + with open(f"/proc/{pid}/statm", "r") as f: + mem_stats = f.readline().split() + mem_res = int(mem_stats[1]) * os.sysconf("SC_PAGE_SIZE") / 1024 + + if docker_memlimit > 0: + mem_pct = round((mem_res / docker_memlimit) * 100, 1) + else: + mem_pct = round((mem_res / total_mem) * 100, 1) + + usages[pid] = { + "cpu": str(cpu_percent), + "cpu_average": str(round(cpu_average_usage, 2)), + "mem": f"{mem_pct}", + "cmdline": " ".join(cmdline), + } + except Exception: + continue + + return usages + + +def get_physical_interfaces(interfaces) -> list: + with open("/proc/net/dev", "r") as file: + lines = file.readlines() + + physical_interfaces = [] + for line in lines: + if ":" in line: + interface = line.split(":")[0].strip() + for int in interfaces: + if interface.startswith(int): + physical_interfaces.append(interface) + + return physical_interfaces + + +def get_bandwidth_stats(config) -> dict[str, dict]: + """Get bandwidth usages for each ffmpeg process id""" + usages = {} + top_command = ["nethogs", "-t", "-v0", "-c5", "-d1"] + get_physical_interfaces( + config.telemetry.network_interfaces + ) p = sp.run( top_command, @@ -751,22 +885,23 @@ def get_cpu_stats() -> dict[str, dict]: ) if p.returncode != 0: - logger.error(p.stderr) return usages else: lines = p.stdout.split("\n") - for line in lines: - stats = list(filter(lambda a: a != "", line.strip().split(" "))) + stats = list(filter(lambda a: a != "", line.strip().split("\t"))) try: - usages[stats[0]] = { - "cpu": stats[8], - "mem": stats[9], - } - except: + if re.search( + r"(^ffmpeg|\/go2rtc|frigate\.detector\.[a-z]+)/([0-9]+)/", stats[0] + ): + process = stats[0].split("/") + usages[process[len(process) - 2]] = { + "bandwidth": round(float(stats[1]) + float(stats[2]), 1), + } + except (IndexError, ValueError): continue - return usages + return usages def get_amd_gpu_stats() -> dict[str, str]: @@ -780,7 +915,7 @@ def get_amd_gpu_stats() -> dict[str, str]: ) if p.returncode != 0: - logger.error(p.stderr) + logger.error(f"Unable to poll radeon GPU stats: {p.stderr}") return None else: usages = p.stdout.split(",") @@ -788,9 +923,9 @@ def get_amd_gpu_stats() -> dict[str, str]: for hw in usages: if "gpu" in hw: - results["gpu"] = f"{hw.strip().split(' ')[1].replace('%', '')} %" + results["gpu"] = f"{hw.strip().split(' ')[1].replace('%', '')}%" elif "vram" in hw: - results["mem"] = f"{hw.strip().split(' ')[1].replace('%', '')} %" + results["mem"] = f"{hw.strip().split(' ')[1].replace('%', '')}%" return results @@ -816,7 +951,7 @@ def get_intel_gpu_stats() -> dict[str, str]: # timeout has a non-zero returncode when timeout is reached if p.returncode != 124: - logger.error(p.stderr) + logger.error(f"Unable to poll intel GPU stats: {p.stderr}") return None else: reading = "".join(p.stdout.split()) @@ -824,7 +959,7 @@ def get_intel_gpu_stats() -> dict[str, str]: # render is used for qsv render = [] - for result in re.findall('"Render/3D/0":{[a-z":\d.,%]+}', reading): + for result in re.findall(r'"Render/3D/0":{[a-z":\d.,%]+}', reading): packet = json.loads(result[14:]) single = packet.get("busy", 0.0) render.append(float(single)) @@ -846,37 +981,46 @@ def get_intel_gpu_stats() -> dict[str, str]: else: video_avg = 1 - results["gpu"] = f"{round((video_avg + render_avg) / 2, 2)} %" - results["mem"] = "- %" + results["gpu"] = f"{round((video_avg + render_avg) / 2, 2)}%" + results["mem"] = "-%" return results -def get_nvidia_gpu_stats() -> dict[str, str]: - """Get stats using nvidia-smi.""" - nvidia_smi_command = [ - "nvidia-smi", - "--query-gpu=gpu_name,utilization.gpu,memory.used,memory.total", - "--format=csv", - ] +def try_get_info(f, h, default="N/A"): + try: + v = f(h) + except nvml.NVMLError_NotSupported: + v = default + return v - p = sp.run( - nvidia_smi_command, - encoding="ascii", - capture_output=True, - ) - if p.returncode != 0: - logger.error(p.stderr) - return None - else: - usages = p.stdout.split("\n")[1].strip().split(",") - memory_percent = f"{round(float(usages[2].replace(' MiB', '').strip()) / float(usages[3].replace(' MiB', '').strip()) * 100, 1)} %" - results: dict[str, str] = { - "name": usages[0], - "gpu": usages[1].strip(), - "mem": memory_percent, - } +def get_nvidia_gpu_stats() -> dict[int, dict]: + results = {} + try: + nvml.nvmlInit() + deviceCount = nvml.nvmlDeviceGetCount() + for i in range(deviceCount): + handle = nvml.nvmlDeviceGetHandleByIndex(i) + meminfo = try_get_info(nvml.nvmlDeviceGetMemoryInfo, handle) + util = try_get_info(nvml.nvmlDeviceGetUtilizationRates, handle) + if util != "N/A": + gpu_util = util.gpu + else: + gpu_util = 0 + if meminfo != "N/A": + gpu_mem_util = meminfo.used / meminfo.total * 100 + else: + gpu_mem_util = -1 + + results[i] = { + "name": nvml.nvmlDeviceGetName(handle), + "gpu": gpu_util, + "mem": gpu_mem_util, + } + except Exception: + pass + finally: return results @@ -898,9 +1042,13 @@ def ffprobe_stream(path: str) -> sp.CompletedProcess: return sp.run(ffprobe_cmd, capture_output=True) -def vainfo_hwaccel() -> sp.CompletedProcess: +def vainfo_hwaccel(device_name: Optional[str] = None) -> sp.CompletedProcess: """Run vainfo.""" - ffprobe_cmd = ["vainfo"] + ffprobe_cmd = ( + ["vainfo"] + if not device_name + else ["vainfo", "--display", "drm", "--device", f"/dev/dri/{device_name}"] + ) return sp.run(ffprobe_cmd, capture_output=True) @@ -915,7 +1063,7 @@ class FrameManager(ABC): pass @abstractmethod - def get(self, name): + def get(self, name, timeout_ms=0): pass @abstractmethod @@ -956,13 +1104,13 @@ class SharedMemoryFrameManager(FrameManager): self.shm_store[name] = shm return shm.buf - def get(self, name, shape, dtype=np.uint8): + def get(self, name, shape): if name in self.shm_store: shm = self.shm_store[name] else: shm = shared_memory.SharedMemory(name=name) self.shm_store[name] = shm - return np.ndarray(shape, dtype=dtype, buffer=shm.buf) + return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf) def close(self, name): if name in self.shm_store: @@ -974,3 +1122,90 @@ class SharedMemoryFrameManager(FrameManager): self.shm_store[name].close() self.shm_store[name].unlink() del self.shm_store[name] + + +def get_tz_modifiers(tz_name: str) -> Tuple[str, str]: + seconds_offset = ( + datetime.datetime.now(pytz.timezone(tz_name)).utcoffset().total_seconds() + ) + hours_offset = int(seconds_offset / 60 / 60) + minutes_offset = int(seconds_offset / 60 - hours_offset * 60) + hour_modifier = f"{hours_offset} hour" + minute_modifier = f"{minutes_offset} minute" + return hour_modifier, minute_modifier + + +def to_relative_box( + width: int, height: int, box: Tuple[int, int, int, int] +) -> Tuple[int, int, int, int]: + return ( + box[0] / width, # x + box[1] / height, # y + (box[2] - box[0]) / width, # w + (box[3] - box[1]) / height, # h + ) + + +def get_video_properties(url, get_duration=False): + def calculate_duration(video: Optional[any]) -> float: + duration = None + + if video is not None: + # Get the frames per second (fps) of the video stream + fps = video.get(cv2.CAP_PROP_FPS) + total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT)) + + if fps and total_frames: + duration = total_frames / fps + + # if cv2 failed need to use ffprobe + if duration is None: + ffprobe_cmd = [ + "ffprobe", + "-v", + "error", + "-show_entries", + "format=duration", + "-of", + "default=noprint_wrappers=1:nokey=1", + f"{url}", + ] + p = sp.run(ffprobe_cmd, capture_output=True) + + if p.returncode == 0 and p.stdout.decode(): + duration = float(p.stdout.decode().strip()) + else: + duration = -1 + + return duration + + width = height = 0 + + try: + # Open the video stream + video = cv2.VideoCapture(url) + + # Check if the video stream was opened successfully + if not video.isOpened(): + video = None + except Exception: + video = None + + result = {} + + if get_duration: + result["duration"] = calculate_duration(video) + + if video is not None: + # Get the width of frames in the video stream + width = video.get(cv2.CAP_PROP_FRAME_WIDTH) + + # Get the height of frames in the video stream + height = video.get(cv2.CAP_PROP_FRAME_HEIGHT) + + # Release the video stream + video.release() + + result = {"width": round(width), "height": round(height)} + + return result