from __future__ import annotations import json import logging import os from typing import Any, Dict, Optional import numpy as np from pydantic import ( BaseModel, ConfigDict, Field, TypeAdapter, ValidationInfo, field_validator, model_validator, ) from ruamel.yaml import YAML from typing_extensions import Self from frigate.const import REGEX_JSON from frigate.detectors import DetectorConfig, ModelConfig from frigate.detectors.detector_config import BaseDetectorConfig from frigate.plus import PlusApi from frigate.util.builtin import ( deep_merge, get_ffmpeg_arg_list, load_labels, ) from frigate.util.config import ( CURRENT_CONFIG_VERSION, StreamInfoRetriever, convert_area_to_pixels, find_config_file, get_relative_coordinates, migrate_frigate_config, ) from frigate.util.image import create_mask from frigate.util.services import auto_detect_hwaccel from .auth import AuthConfig from .base import FrigateBaseModel from .camera import CameraConfig, CameraLiveConfig from .camera.audio import AudioConfig, AudioFilterConfig from .camera.birdseye import BirdseyeConfig from .camera.detect import DetectConfig from .camera.ffmpeg import FfmpegConfig from .camera.genai import GenAIConfig, GenAIRoleEnum from .camera.mask import ObjectMaskConfig from .camera.motion import MotionConfig from .camera.notification import NotificationConfig from .camera.objects import FilterConfig, ObjectConfig from .camera.record import RecordConfig from .camera.review import ReviewConfig from .camera.snapshots import SnapshotsConfig from .camera.timestamp import TimestampStyleConfig from .camera_group import CameraGroupConfig from .classification import ( AudioTranscriptionConfig, ClassificationConfig, FaceRecognitionConfig, LicensePlateRecognitionConfig, SemanticSearchConfig, SemanticSearchModelEnum, ) from .database import DatabaseConfig from .env import EnvVars from .logger import LoggerConfig from .mqtt import MqttConfig from .network import NetworkingConfig from .profile import ProfileDefinitionConfig from .proxy import ProxyConfig from .telemetry import TelemetryConfig from .tls import TlsConfig from .ui import UIConfig __all__ = ["FrigateConfig"] logger = logging.getLogger(__name__) yaml = YAML() DEFAULT_CONFIG = f""" mqtt: enabled: False cameras: {{}} # No cameras defined, UI wizard should be used version: {CURRENT_CONFIG_VERSION} """ DEFAULT_DETECTORS = {"cpu": {"type": "cpu"}} DEFAULT_DETECT_DIMENSIONS = {"width": 1280, "height": 720} # stream info handler stream_info_retriever = StreamInfoRetriever() class RuntimeMotionConfig(MotionConfig): """Runtime version of MotionConfig with rasterized masks.""" rasterized_mask: np.ndarray = Field(default=None, exclude=True) def __init__(self, **config): frame_shape = config.get("frame_shape", (1, 1)) # Store original mask dict for serialization original_mask = config.get("mask", {}) if isinstance(original_mask, dict): # Process the new dict format - update raw_coordinates for each mask processed_mask = {} for mask_id, mask_config in original_mask.items(): if isinstance(mask_config, dict): coords = mask_config.get("coordinates", "") relative_coords = get_relative_coordinates(coords, frame_shape) mask_config_copy = mask_config.copy() mask_config_copy["raw_coordinates"] = ( relative_coords if relative_coords else coords ) mask_config_copy["coordinates"] = ( relative_coords if relative_coords else coords ) processed_mask[mask_id] = mask_config_copy else: processed_mask[mask_id] = mask_config config["mask"] = processed_mask config["raw_mask"] = processed_mask super().__init__(**config) # Rasterize only enabled masks enabled_coords = [] for mask_config in self.mask.values(): if mask_config.enabled and mask_config.coordinates: coords = mask_config.coordinates if isinstance(coords, list): enabled_coords.extend(coords) else: enabled_coords.append(coords) if enabled_coords: self.rasterized_mask = create_mask(frame_shape, enabled_coords) else: empty_mask = np.zeros(frame_shape, np.uint8) empty_mask[:] = 255 self.rasterized_mask = empty_mask model_config = ConfigDict(arbitrary_types_allowed=True, extra="ignore") class RuntimeFilterConfig(FilterConfig): """Runtime version of FilterConfig with rasterized masks.""" rasterized_mask: Optional[np.ndarray] = Field(default=None, exclude=True) def __init__(self, **config): frame_shape = config.get("frame_shape", (1, 1)) # Store original mask dict for serialization original_mask = config.get("mask", {}) if isinstance(original_mask, dict): # Process the new dict format - update raw_coordinates for each mask processed_mask = {} for mask_id, mask_config in original_mask.items(): # Handle both dict and ObjectMaskConfig formats if hasattr(mask_config, "model_dump"): # It's an ObjectMaskConfig object mask_dict = mask_config.model_dump() coords = mask_dict.get("coordinates", "") relative_coords = get_relative_coordinates(coords, frame_shape) mask_dict["raw_coordinates"] = ( relative_coords if relative_coords else coords ) mask_dict["coordinates"] = ( relative_coords if relative_coords else coords ) processed_mask[mask_id] = mask_dict elif isinstance(mask_config, dict): coords = mask_config.get("coordinates", "") relative_coords = get_relative_coordinates(coords, frame_shape) mask_config_copy = mask_config.copy() mask_config_copy["raw_coordinates"] = ( relative_coords if relative_coords else coords ) mask_config_copy["coordinates"] = ( relative_coords if relative_coords else coords ) processed_mask[mask_id] = mask_config_copy else: processed_mask[mask_id] = mask_config config["mask"] = processed_mask config["raw_mask"] = processed_mask # Convert min_area and max_area to pixels if they're percentages if "min_area" in config: config["min_area"] = convert_area_to_pixels(config["min_area"], frame_shape) if "max_area" in config: config["max_area"] = convert_area_to_pixels(config["max_area"], frame_shape) super().__init__(**config) # Rasterize only enabled masks enabled_coords = [] for mask_config in self.mask.values(): if mask_config.enabled and mask_config.coordinates: coords = mask_config.coordinates if isinstance(coords, list): enabled_coords.extend(coords) else: enabled_coords.append(coords) if enabled_coords: self.rasterized_mask = create_mask(frame_shape, enabled_coords) else: self.rasterized_mask = None model_config = ConfigDict(arbitrary_types_allowed=True, extra="ignore") class RestreamConfig(BaseModel): model_config = ConfigDict(extra="allow") def verify_config_roles(camera_config: CameraConfig) -> None: """Verify that roles are setup in the config correctly.""" assigned_roles = list( set([r for i in camera_config.ffmpeg.inputs for r in i.roles]) ) if camera_config.record.enabled and "record" not in assigned_roles: raise ValueError( f"Camera {camera_config.name} has record enabled, but record is not assigned to an input." ) if camera_config.audio.enabled and "audio" not in assigned_roles: raise ValueError( f"Camera {camera_config.name} has audio events enabled, but audio is not assigned to an input." ) def verify_valid_live_stream_names( frigate_config: FrigateConfig, camera_config: CameraConfig ) -> ValueError | None: """Verify that a restream exists to use for live view.""" for _, stream_name in camera_config.live.streams.items(): if ( stream_name not in frigate_config.go2rtc.model_dump().get("streams", {}).keys() ): return ValueError( f"No restream with name {stream_name} exists for camera {camera_config.name}." ) def verify_recording_segments_setup_with_reasonable_time( camera_config: CameraConfig, ) -> None: """Verify that recording segments are setup and segment time is not greater than 60.""" record_args: list[str] = get_ffmpeg_arg_list( camera_config.ffmpeg.output_args.record ) if record_args[0].startswith("preset"): return try: seg_arg_index = record_args.index("-segment_time") except ValueError: raise ValueError( f"Camera {camera_config.name} has no segment_time in \ recording output args, segment args are required for record." ) if int(record_args[seg_arg_index + 1]) > 60: raise ValueError( f"Camera {camera_config.name} has invalid segment_time output arg, \ segment_time must be 60 or less." ) def verify_zone_objects_are_tracked(camera_config: CameraConfig) -> None: """Verify that user has not entered zone objects that are not in the tracking config.""" for zone_name, zone in camera_config.zones.items(): for obj in zone.objects: if obj not in camera_config.objects.track: raise ValueError( f"Zone {zone_name} is configured to track {obj} but that object type is not added to objects -> track." ) def verify_required_zones_exist(camera_config: CameraConfig) -> None: for det_zone in camera_config.review.detections.required_zones: if det_zone not in camera_config.zones.keys(): raise ValueError( f"Camera {camera_config.name} has a required zone for detections {det_zone} that is not defined." ) for det_zone in camera_config.review.alerts.required_zones: if det_zone not in camera_config.zones.keys(): raise ValueError( f"Camera {camera_config.name} has a required zone for alerts {det_zone} that is not defined." ) def verify_autotrack_zones(camera_config: CameraConfig) -> ValueError | None: """Verify that required_zones are specified when autotracking is enabled.""" if ( camera_config.onvif.autotracking.enabled and not camera_config.onvif.autotracking.required_zones ): raise ValueError( f"Camera {camera_config.name} has autotracking enabled, required_zones must be set to at least one of the camera's zones." ) def verify_motion_and_detect(camera_config: CameraConfig) -> ValueError | None: """Verify that motion detection is not disabled and object detection is enabled.""" if camera_config.detect.enabled and not camera_config.motion.enabled: raise ValueError( f"Camera {camera_config.name} has motion detection disabled and object detection enabled but object detection requires motion detection." ) def verify_objects_track( camera_config: CameraConfig, enabled_objects: list[str] ) -> None: """Verify that a user has not specified an object to track that is not in the labelmap.""" valid_objects = [ obj for obj in camera_config.objects.track if obj in enabled_objects ] if len(valid_objects) != len(camera_config.objects.track): invalid_objects = set(camera_config.objects.track) - set(valid_objects) logger.warning( f"{camera_config.name} is configured to track {list(invalid_objects)} objects, which are not supported by the current model." ) camera_config.objects.track = valid_objects def verify_lpr_and_face( frigate_config: FrigateConfig, camera_config: CameraConfig ) -> ValueError | None: """Verify that lpr and face are enabled at the global level if enabled at the camera level.""" if camera_config.lpr.enabled and not frigate_config.lpr.enabled: raise ValueError( f"Camera {camera_config.name} has lpr enabled but lpr is disabled at the global level of the config. You must enable lpr at the global level." ) if ( camera_config.face_recognition.enabled and not frigate_config.face_recognition.enabled ): raise ValueError( f"Camera {camera_config.name} has face_recognition enabled but face_recognition is disabled at the global level of the config. You must enable face_recognition at the global level." ) class FrigateConfig(FrigateBaseModel): version: Optional[str] = Field( default=None, title="Current config version", description="Numeric or string version of the active configuration to help detect migrations or format changes.", ) safe_mode: bool = Field( default=False, title="Safe mode", description="When enabled, start Frigate in safe mode with reduced features for troubleshooting.", ) # Fields that install global state should be defined first, so that their validators run first. environment_vars: EnvVars = Field( default_factory=dict, title="Environment variables", description="Key/value pairs of environment variables to set for the Frigate process in Home Assistant OS. Non-HAOS users must use Docker environment variable configuration instead.", ) logger: LoggerConfig = Field( default_factory=LoggerConfig, title="Logging", description="Controls default log verbosity and per-component log level overrides.", validate_default=True, ) # Global config auth: AuthConfig = Field( default_factory=AuthConfig, title="Authentication", description="Authentication and session-related settings including cookie and rate limit options.", ) database: DatabaseConfig = Field( default_factory=DatabaseConfig, title="Database", description="Settings for the SQLite database used by Frigate to store tracked object and recording metadata.", ) go2rtc: RestreamConfig = Field( default_factory=RestreamConfig, title="go2rtc", description="Settings for the integrated go2rtc restreaming service used for live stream relaying and translation.", ) mqtt: MqttConfig = Field( title="MQTT", description="Settings for connecting and publishing telemetry, snapshots, and event details to an MQTT broker.", ) notifications: NotificationConfig = Field( default_factory=NotificationConfig, title="Notifications", description="Settings to enable and control notifications for all cameras; can be overridden per-camera.", ) networking: NetworkingConfig = Field( default_factory=NetworkingConfig, title="Networking", description="Network-related settings such as IPv6 enablement for Frigate endpoints.", ) proxy: ProxyConfig = Field( default_factory=ProxyConfig, title="Proxy", description="Settings for integrating Frigate behind a reverse proxy that passes authenticated user headers.", ) telemetry: TelemetryConfig = Field( default_factory=TelemetryConfig, title="Telemetry", description="System telemetry and stats options including GPU and network bandwidth monitoring.", ) tls: TlsConfig = Field( default_factory=TlsConfig, title="TLS", description="TLS settings for Frigate's web endpoints (port 8971).", ) ui: UIConfig = Field( default_factory=UIConfig, title="UI", description="User interface preferences such as timezone, time/date formatting, and units.", ) # Detector config detectors: Dict[str, BaseDetectorConfig] = Field( default=DEFAULT_DETECTORS, title="Detector hardware", description="Configuration for object detectors (CPU, GPU, ONNX backends) and any detector-specific model settings.", ) model: ModelConfig = Field( default_factory=ModelConfig, title="Detection model", description="Settings to configure a custom object detection model and its input shape.", ) # GenAI config (named provider configs: name -> GenAIConfig) genai: Dict[str, GenAIConfig] = Field( default_factory=dict, title="Generative AI configuration", description="Settings for integrated generative AI providers used to generate object descriptions and review summaries.", ) # Camera config cameras: Dict[str, CameraConfig] = Field(title="Cameras", description="Cameras") audio: AudioConfig = Field( default_factory=AudioConfig, title="Audio events", description="Settings for audio-based event detection for all cameras; can be overridden per-camera.", ) birdseye: BirdseyeConfig = Field( default_factory=BirdseyeConfig, title="Birdseye", description="Settings for the Birdseye composite view that composes multiple camera feeds into a single layout.", ) detect: DetectConfig = Field( default_factory=DetectConfig, title="Object Detection", description="Settings for the detection/detect role used to run object detection and initialize trackers.", ) ffmpeg: FfmpegConfig = Field( default_factory=FfmpegConfig, title="FFmpeg", description="FFmpeg settings including binary path, args, hwaccel options, and per-role output args.", ) live: CameraLiveConfig = Field( default_factory=CameraLiveConfig, title="Live playback", description="Settings to control the jsmpeg live stream resolution and quality. This does not affect restreamed cameras that use go2rtc for live view.", ) motion: Optional[MotionConfig] = Field( default=None, title="Motion detection", description="Default motion detection settings applied to cameras unless overridden per-camera.", ) objects: ObjectConfig = Field( default_factory=ObjectConfig, title="Objects", description="Object tracking defaults including which labels to track and per-object filters.", ) record: RecordConfig = Field( default_factory=RecordConfig, title="Recording", description="Recording and retention settings applied to cameras unless overridden per-camera.", ) review: ReviewConfig = Field( default_factory=ReviewConfig, title="Review", description="Settings that control alerts, detections, and GenAI review summaries used by the UI and storage.", ) snapshots: SnapshotsConfig = Field( default_factory=SnapshotsConfig, title="Snapshots", description="Settings for API-generated snapshots of tracked objects for all cameras; can be overridden per-camera.", ) timestamp_style: TimestampStyleConfig = Field( default_factory=TimestampStyleConfig, title="Timestamp style", description="Styling options for in-feed timestamps applied to debug view and snapshots.", ) # Classification Config audio_transcription: AudioTranscriptionConfig = Field( default_factory=AudioTranscriptionConfig, title="Audio transcription", description="Settings for live and speech audio transcription used for events and live captions.", ) classification: ClassificationConfig = Field( default_factory=ClassificationConfig, title="Object classification", description="Settings for classification models used to refine object labels or state classification.", ) semantic_search: SemanticSearchConfig = Field( default_factory=SemanticSearchConfig, title="Semantic Search", description="Settings for Semantic Search which builds and queries object embeddings to find similar items.", ) face_recognition: FaceRecognitionConfig = Field( default_factory=FaceRecognitionConfig, title="Face recognition", description="Settings for face detection and recognition for all cameras; can be overridden per-camera.", ) lpr: LicensePlateRecognitionConfig = Field( default_factory=LicensePlateRecognitionConfig, title="License Plate Recognition", description="License plate recognition settings including detection thresholds, formatting, and known plates.", ) camera_groups: Dict[str, CameraGroupConfig] = Field( default_factory=dict, title="Camera groups", description="Configuration for named camera groups used to organize cameras in the UI.", ) profiles: Dict[str, ProfileDefinitionConfig] = Field( default_factory=dict, title="Profiles", description="Named profile definitions with friendly names. Camera profiles must reference names defined here.", ) active_profile: Optional[str] = Field( default=None, title="Active profile", description="Currently active profile name. Runtime-only, not persisted in YAML.", exclude=True, ) _plus_api: PlusApi @property def plus_api(self) -> PlusApi: return self._plus_api @model_validator(mode="after") def post_validation(self, info: ValidationInfo) -> Self: # Load plus api from context, if possible. self._plus_api = None if isinstance(info.context, dict): self._plus_api = info.context.get("plus_api") # Ensure self._plus_api is set, if no explicit value is provided. if self._plus_api is None: self._plus_api = PlusApi() # set notifications state self.notifications.enabled_in_config = self.notifications.enabled # validate genai: each role (tools, vision, embeddings) at most once role_to_name: dict[GenAIRoleEnum, str] = {} for name, genai_cfg in self.genai.items(): for role in genai_cfg.roles: if role in role_to_name: raise ValueError( f"GenAI role '{role.value}' is assigned to both " f"'{role_to_name[role]}' and '{name}'; each role must have " "exactly one provider." ) role_to_name[role] = name # validate semantic_search.model when it is a GenAI provider name if ( self.semantic_search.enabled and isinstance(self.semantic_search.model, str) and not isinstance(self.semantic_search.model, SemanticSearchModelEnum) ): if self.semantic_search.model not in self.genai: raise ValueError( f"semantic_search.model '{self.semantic_search.model}' is not a " "valid GenAI config key. Must match a key in genai config." ) genai_cfg = self.genai[self.semantic_search.model] if GenAIRoleEnum.embeddings not in genai_cfg.roles: raise ValueError( f"GenAI provider '{self.semantic_search.model}' must have " "'embeddings' in its roles for semantic search." ) # set default min_score for object attributes for attribute in self.model.all_attributes: if not self.objects.filters.get(attribute): self.objects.filters[attribute] = FilterConfig(min_score=0.7) elif self.objects.filters[attribute].min_score == 0.5: self.objects.filters[attribute].min_score = 0.7 # auto detect hwaccel args if self.ffmpeg.hwaccel_args == "auto": self.ffmpeg.hwaccel_args = auto_detect_hwaccel() # Populate global audio filters for all audio labels all_audio_labels = { label for label in load_labels("/audio-labelmap.txt", prefill=521).values() if label } if self.audio.filters is None: self.audio.filters = {} for key in sorted(all_audio_labels - self.audio.filters.keys()): self.audio.filters[key] = AudioFilterConfig() self.audio.filters = dict(sorted(self.audio.filters.items())) # Global config to propagate down to camera level global_config = self.model_dump( include={ "audio": ..., "audio_transcription": ..., "birdseye": ..., "face_recognition": ..., "lpr": ..., "record": ..., "snapshots": ..., "live": ..., "objects": ..., "review": ..., "motion": ..., "notifications": ..., "detect": ..., "ffmpeg": ..., "timestamp_style": ..., }, exclude_unset=True, ) for key, detector in self.detectors.items(): adapter = TypeAdapter(DetectorConfig) model_dict = ( detector if isinstance(detector, dict) else detector.model_dump(warnings="none") ) detector_config: BaseDetectorConfig = adapter.validate_python(model_dict) # users should not set model themselves if detector_config.model: logger.warning( "The model key should be specified at the root level of the config, not under detectors. The nested model key will be ignored." ) detector_config.model = None model_config = self.model.model_dump(exclude_unset=True, warnings="none") if detector_config.model_path: model_config["path"] = detector_config.model_path if "path" not in model_config: if detector_config.type == "cpu" or detector_config.type.endswith( "_tfl" ): model_config["path"] = "/cpu_model.tflite" elif detector_config.type == "edgetpu": model_config["path"] = "/edgetpu_model.tflite" model = ModelConfig.model_validate(model_config) model.check_and_load_plus_model(self.plus_api, detector_config.type) model.compute_model_hash() labelmap_objects = model.merged_labelmap.values() detector_config.model = model self.detectors[key] = detector_config for name, camera in self.cameras.items(): modified_global_config = global_config.copy() # only populate some fields down to the camera level for specific keys allowed_fields_map = { "face_recognition": ["enabled", "min_area"], "lpr": ["enabled", "expire_time", "min_area", "enhancement"], "audio_transcription": ["enabled", "live_enabled"], } for section in allowed_fields_map: if section in modified_global_config: modified_global_config[section] = { k: v for k, v in modified_global_config[section].items() if k in allowed_fields_map[section] } merged_config = deep_merge( camera.model_dump(exclude_unset=True), modified_global_config ) camera_config: CameraConfig = CameraConfig.model_validate( {"name": name, **merged_config} ) if camera_config.ffmpeg.hwaccel_args == "auto": camera_config.ffmpeg.hwaccel_args = self.ffmpeg.hwaccel_args # Resolve export hwaccel_args: camera export -> camera ffmpeg -> global ffmpeg # This allows per-camera override for exports (e.g., when camera resolution # exceeds hardware encoder limits) if camera_config.record.export.hwaccel_args == "auto": camera_config.record.export.hwaccel_args = ( camera_config.ffmpeg.hwaccel_args ) for input in camera_config.ffmpeg.inputs: need_detect_dimensions = "detect" in input.roles and ( camera_config.detect.height is None or camera_config.detect.width is None ) if need_detect_dimensions: stream_info = {"width": 0, "height": 0, "fourcc": None} try: stream_info = stream_info_retriever.get_stream_info( self.ffmpeg, input.path ) except Exception: logger.warning( f"Error detecting stream parameters automatically for {input.path} Applying default values." ) stream_info = {"width": 0, "height": 0, "fourcc": None} if need_detect_dimensions: camera_config.detect.width = ( stream_info["width"] if stream_info.get("width") else DEFAULT_DETECT_DIMENSIONS["width"] ) camera_config.detect.height = ( stream_info["height"] if stream_info.get("height") else DEFAULT_DETECT_DIMENSIONS["height"] ) # Warn if detect fps > 10 if camera_config.detect.fps > 10 and camera_config.type != "lpr": logger.warning( f"{camera_config.name} detect fps is set to {camera_config.detect.fps}. This does NOT need to match your camera's frame rate. High values could lead to reduced performance. Recommended value is 5." ) if camera_config.detect.fps > 15 and camera_config.type == "lpr": logger.warning( f"{camera_config.name} detect fps is set to {camera_config.detect.fps}. This does NOT need to match your camera's frame rate. High values could lead to reduced performance. Recommended value for LPR cameras are between 5-15." ) # Default min_initialized configuration min_initialized = max(int(camera_config.detect.fps / 2), 2) if camera_config.detect.min_initialized is None: camera_config.detect.min_initialized = min_initialized # Default max_disappeared configuration max_disappeared = camera_config.detect.fps * 5 if camera_config.detect.max_disappeared is None: camera_config.detect.max_disappeared = max_disappeared # Default stationary_threshold configuration stationary_threshold = camera_config.detect.fps * 10 if camera_config.detect.stationary.threshold is None: camera_config.detect.stationary.threshold = stationary_threshold # default to the stationary_threshold if not defined if camera_config.detect.stationary.interval is None: camera_config.detect.stationary.interval = stationary_threshold # set config pre-value camera_config.enabled_in_config = camera_config.enabled camera_config.audio.enabled_in_config = camera_config.audio.enabled camera_config.audio_transcription.enabled_in_config = ( camera_config.audio_transcription.enabled ) camera_config.record.enabled_in_config = camera_config.record.enabled camera_config.notifications.enabled_in_config = ( camera_config.notifications.enabled ) camera_config.onvif.autotracking.enabled_in_config = ( camera_config.onvif.autotracking.enabled ) camera_config.review.alerts.enabled_in_config = ( camera_config.review.alerts.enabled ) camera_config.review.detections.enabled_in_config = ( camera_config.review.detections.enabled ) camera_config.objects.genai.enabled_in_config = ( camera_config.objects.genai.enabled ) camera_config.review.genai.enabled_in_config = ( camera_config.review.genai.enabled ) if camera_config.audio.filters is None: camera_config.audio.filters = {} for key in sorted(all_audio_labels - camera_config.audio.filters.keys()): camera_config.audio.filters[key] = AudioFilterConfig() camera_config.audio.filters = dict( sorted(camera_config.audio.filters.items()) ) # Add default filters object_keys = camera_config.objects.track if camera_config.objects.filters is None: camera_config.objects.filters = {} object_keys = object_keys - camera_config.objects.filters.keys() for key in object_keys: camera_config.objects.filters[key] = FilterConfig() # Process global object masks to set raw_coordinates if camera_config.objects.mask: processed_global_masks = {} for mask_id, mask_config in camera_config.objects.mask.items(): if mask_config: coords = mask_config.coordinates relative_coords = get_relative_coordinates( coords, camera_config.frame_shape ) # Create a new ObjectMaskConfig with raw_coordinates set processed_global_masks[mask_id] = ObjectMaskConfig( friendly_name=mask_config.friendly_name, enabled=mask_config.enabled, coordinates=relative_coords if relative_coords else coords, raw_coordinates=relative_coords if relative_coords else coords, enabled_in_config=mask_config.enabled, ) else: processed_global_masks[mask_id] = mask_config camera_config.objects.mask = processed_global_masks camera_config.objects.raw_mask = processed_global_masks # Apply global object masks and convert masks to numpy array for object, filter in camera_config.objects.filters.items(): # Set enabled_in_config for per-object masks before processing for mask_config in filter.mask.values(): if mask_config: mask_config.enabled_in_config = mask_config.enabled # Merge global object masks with per-object filter masks merged_mask = dict(filter.mask) # Copy filter-specific masks # Add global object masks if they exist if camera_config.objects.mask: for mask_id, mask_config in camera_config.objects.mask.items(): # Use a global prefix to avoid key collisions global_mask_id = f"global_{mask_id}" merged_mask[global_mask_id] = mask_config # Set runtime filter to create masks camera_config.objects.filters[object] = RuntimeFilterConfig( frame_shape=camera_config.frame_shape, mask=merged_mask, **filter.model_dump( exclude_unset=True, exclude={"mask", "raw_mask"} ), ) # Set enabled_in_config for motion masks to match config file state BEFORE creating RuntimeMotionConfig if camera_config.motion: camera_config.motion.enabled_in_config = camera_config.motion.enabled for mask_config in camera_config.motion.mask.values(): if mask_config: mask_config.enabled_in_config = mask_config.enabled # Convert motion configuration if camera_config.motion is None: camera_config.motion = RuntimeMotionConfig( frame_shape=camera_config.frame_shape ) else: camera_config.motion = RuntimeMotionConfig( frame_shape=camera_config.frame_shape, **camera_config.motion.model_dump(exclude_unset=True), ) # generate zone contours if len(camera_config.zones) > 0: for zone in camera_config.zones.values(): if zone.filters: for object_name, filter_config in zone.filters.items(): zone.filters[object_name] = RuntimeFilterConfig( frame_shape=camera_config.frame_shape, **filter_config.model_dump(exclude_unset=True), ) zone.generate_contour(camera_config.frame_shape) # Set enabled_in_config for zones to match config file state for zone in camera_config.zones.values(): zone.enabled_in_config = zone.enabled # Set live view stream if none is set if not camera_config.live.streams: camera_config.live.streams = {name: name} # generate the ffmpeg commands camera_config.create_ffmpeg_cmds() self.cameras[name] = camera_config verify_config_roles(camera_config) verify_valid_live_stream_names(self, camera_config) verify_recording_segments_setup_with_reasonable_time(camera_config) verify_zone_objects_are_tracked(camera_config) verify_required_zones_exist(camera_config) verify_autotrack_zones(camera_config) verify_motion_and_detect(camera_config) verify_objects_track(camera_config, labelmap_objects) verify_lpr_and_face(self, camera_config) # Validate camera profiles reference top-level profile definitions for cam_name, cam_config in self.cameras.items(): for profile_name in cam_config.profiles: if profile_name not in self.profiles: raise ValueError( f"Camera '{cam_name}' references profile '{profile_name}' " f"which is not defined in the top-level 'profiles' section" ) # set names on classification configs for name, config in self.classification.custom.items(): config.name = name self.objects.parse_all_objects(self.cameras) self.model.create_colormap(sorted(self.objects.all_objects)) self.model.check_and_load_plus_model(self.plus_api) # Check audio transcription and audio detection requirements if self.audio_transcription.enabled: # If audio transcription is enabled globally, at least one camera must have audio detection enabled if not any(camera.audio.enabled for camera in self.cameras.values()): raise ValueError( "Audio transcription is enabled globally, but no cameras have audio detection enabled. At least one camera must have audio detection enabled." ) else: # If audio transcription is disabled globally, check each camera with audio_transcription enabled for camera in self.cameras.values(): if camera.audio_transcription.enabled and not camera.audio.enabled: raise ValueError( f"Camera {camera.name} has audio transcription enabled, but audio detection is not enabled for this camera. Audio detection must be enabled for cameras with audio transcription when it is disabled globally." ) # Validate auth roles against cameras camera_names = set(self.cameras.keys()) for role, allowed_cameras in self.auth.roles.items(): invalid_cameras = [ cam for cam in allowed_cameras if cam not in camera_names ] if invalid_cameras: logger.warning( f"Role '{role}' references non-existent cameras: {invalid_cameras}. " ) return self @field_validator("cameras") @classmethod def ensure_zones_and_cameras_have_different_names(cls, v: Dict[str, CameraConfig]): zones = [zone for camera in v.values() for zone in camera.zones.keys()] for zone in zones: if zone in v.keys(): raise ValueError("Zones cannot share names with cameras") return v @classmethod def load(cls, **kwargs): """Loads the Frigate config file, runs migrations, and creates the config object.""" config_path = find_config_file() # No configuration file found, create one. new_config = False if not os.path.isfile(config_path): logger.info("No config file found, saving default config") config_path = config_path new_config = True else: # Check if the config file needs to be migrated. migrate_frigate_config(config_path) # Finally, load the resulting configuration file. with open(config_path, "a+" if new_config else "r") as f: # Only write the default config if the opened file is non-empty. This can happen as # a race condition. It's extremely unlikely, but eh. Might as well check it. if new_config and f.tell() == 0: f.write(DEFAULT_CONFIG) logger.info( "Created default config file, see the getting started docs for configuration: https://docs.frigate.video/guides/getting_started" ) f.seek(0) return FrigateConfig.parse(f, **kwargs) @classmethod def parse(cls, config, *, is_json=None, safe_load=False, **context): # If config is a file, read its contents. if hasattr(config, "read"): fname = getattr(config, "name", None) config = config.read() # Try to guess the value of is_json from the file extension. if is_json is None and fname: _, ext = os.path.splitext(fname) if ext in (".yaml", ".yml"): is_json = False elif ext == ".json": is_json = True # At this point, try to sniff the config string, to guess if it is json or not. if is_json is None: is_json = REGEX_JSON.match(config) is not None # Parse the config into a dictionary. if is_json: config = json.load(config) else: config = yaml.load(config) # load minimal Frigate config after the full config did not validate if safe_load: safe_config = {"safe_mode": True, "cameras": {}, "mqtt": {"enabled": False}} # copy over auth and proxy config in case auth needs to be enforced safe_config["auth"] = config.get("auth", {}) safe_config["proxy"] = config.get("proxy", {}) # copy over database config for auth and so a new db is not created safe_config["database"] = config.get("database", {}) return cls.parse_object(safe_config, **context) # Validate and return the config dict. return cls.parse_object(config, **context) @classmethod def parse_yaml(cls, config_yaml, **context): return cls.parse(config_yaml, is_json=False, **context) @classmethod def parse_object( cls, obj: Any, *, plus_api: Optional[PlusApi] = None, install: bool = False ): return cls.model_validate( obj, context={"plus_api": plus_api, "install": install} )