Rework config YAML parsing to use only ruamel.yaml

PyYAML silently overrides keys when encountering duplicates, but ruamel
raises and exception by default. Since we're already using it elsewhere,
dropping PyYAML is an easy choice to make.
This commit is contained in:
George Tsiamasiotis 2024-09-18 12:36:26 +03:00
parent a14025200d
commit 5c87abbd8f
5 changed files with 43 additions and 47 deletions

View File

@ -248,7 +248,7 @@ def config_save():
# Validate the config schema # Validate the config schema
try: try:
FrigateConfig.parse_raw(new_config) FrigateConfig.parse_yaml(new_config)
except Exception: except Exception:
return make_response( return make_response(
jsonify( jsonify(
@ -336,7 +336,7 @@ def config_set():
f.close() f.close()
# Validate the config schema # Validate the config schema
try: try:
config_obj = FrigateConfig.parse_raw(new_raw_config) config_obj = FrigateConfig.parse_yaml(new_raw_config)
except Exception: except Exception:
with open(config_file, "w") as f: with open(config_file, "w") as f:
f.write(old_raw_config) f.write(old_raw_config)

View File

@ -19,6 +19,8 @@ from pydantic import (
field_validator, field_validator,
) )
from pydantic.fields import PrivateAttr from pydantic.fields import PrivateAttr
from ruamel.yaml import YAML
from typing_extensions import Self
from frigate.const import ( from frigate.const import (
ALL_ATTRIBUTE_LABELS, ALL_ATTRIBUTE_LABELS,
@ -31,7 +33,7 @@ from frigate.const import (
INCLUDED_FFMPEG_VERSIONS, INCLUDED_FFMPEG_VERSIONS,
MAX_PRE_CAPTURE, MAX_PRE_CAPTURE,
REGEX_CAMERA_NAME, REGEX_CAMERA_NAME,
YAML_EXT, REGEX_JSON,
) )
from frigate.detectors import DetectorConfig, ModelConfig from frigate.detectors import DetectorConfig, ModelConfig
from frigate.detectors.detector_config import BaseDetectorConfig from frigate.detectors.detector_config import BaseDetectorConfig
@ -47,7 +49,6 @@ from frigate.util.builtin import (
escape_special_characters, escape_special_characters,
generate_color_palette, generate_color_palette,
get_ffmpeg_arg_list, get_ffmpeg_arg_list,
load_config_with_no_duplicates,
) )
from frigate.util.config import StreamInfoRetriever, get_relative_coordinates from frigate.util.config import StreamInfoRetriever, get_relative_coordinates
from frigate.util.image import create_mask from frigate.util.image import create_mask
@ -55,6 +56,8 @@ from frigate.util.services import auto_detect_hwaccel
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
yaml = YAML()
# TODO: Identify what the default format to display timestamps is # TODO: Identify what the default format to display timestamps is
DEFAULT_TIME_FORMAT = "%m/%d/%Y %H:%M:%S" DEFAULT_TIME_FORMAT = "%m/%d/%Y %H:%M:%S"
# German Style: # German Style:
@ -1757,18 +1760,38 @@ class FrigateConfig(FrigateBaseModel):
return v return v
@classmethod @classmethod
def parse_file(cls, config_file): def parse_file(cls, config_path, *, is_json=None) -> Self:
with open(config_file) as f: with open(config_path) as f:
raw_config = f.read() return FrigateConfig.parse(f, is_json=is_json)
if config_file.endswith(YAML_EXT): @classmethod
config = load_config_with_no_duplicates(raw_config) def parse(cls, config, *, is_json=None) -> Self:
elif config_file.endswith(".json"): # If config is a file, read its contents.
config = json.loads(raw_config) if hasattr(config, "read"):
fname = getattr(config, "name", None)
config = config.read()
# Try to guess the value of is_json from the file extension.
if is_json is None and fname:
_, ext = os.path.splitext(fname)
if ext in (".yaml", ".yml"):
is_json = False
elif ext == ".json":
is_json = True
# At this point, ry to sniff the config string, to guess if it is json or not.
if is_json is None:
is_json = REGEX_JSON.match(config) is not None
# Parse the config into a dictionary.
if is_json:
config = json.load(config)
else:
config = yaml.load(config)
# Validate and return the config dict.
return cls.model_validate(config) return cls.model_validate(config)
@classmethod @classmethod
def parse_raw(cls, raw_config): def parse_yaml(cls, config_yaml) -> Self:
config = load_config_with_no_duplicates(raw_config) return cls.parse(config_yaml, is_json=False)
return cls.model_validate(config)

View File

@ -1,3 +1,5 @@
import re
CONFIG_DIR = "/config" CONFIG_DIR = "/config"
DEFAULT_DB_PATH = f"{CONFIG_DIR}/frigate.db" DEFAULT_DB_PATH = f"{CONFIG_DIR}/frigate.db"
MODEL_CACHE_DIR = f"{CONFIG_DIR}/model_cache" MODEL_CACHE_DIR = f"{CONFIG_DIR}/model_cache"
@ -7,7 +9,6 @@ RECORD_DIR = f"{BASE_DIR}/recordings"
EXPORT_DIR = f"{BASE_DIR}/exports" EXPORT_DIR = f"{BASE_DIR}/exports"
BIRDSEYE_PIPE = "/tmp/cache/birdseye" BIRDSEYE_PIPE = "/tmp/cache/birdseye"
CACHE_DIR = "/tmp/cache" CACHE_DIR = "/tmp/cache"
YAML_EXT = (".yaml", ".yml")
FRIGATE_LOCALHOST = "http://127.0.0.1:5000" FRIGATE_LOCALHOST = "http://127.0.0.1:5000"
PLUS_ENV_VAR = "PLUS_API_KEY" PLUS_ENV_VAR = "PLUS_API_KEY"
PLUS_API_HOST = "https://api.frigate.video" PLUS_API_HOST = "https://api.frigate.video"
@ -56,6 +57,7 @@ FFMPEG_HWACCEL_VULKAN = "preset-vulkan"
REGEX_CAMERA_NAME = r"^[a-zA-Z0-9_-]+$" REGEX_CAMERA_NAME = r"^[a-zA-Z0-9_-]+$"
REGEX_RTSP_CAMERA_USER_PASS = r":\/\/[a-zA-Z0-9_-]+:[\S]+@" REGEX_RTSP_CAMERA_USER_PASS = r":\/\/[a-zA-Z0-9_-]+:[\S]+@"
REGEX_HTTP_CAMERA_USER_PASS = r"user=[a-zA-Z0-9_-]+&password=[\S]+" REGEX_HTTP_CAMERA_USER_PASS = r"user=[a-zA-Z0-9_-]+&password=[\S]+"
REGEX_JSON = re.compile(r"^\s*\{")
# Known Driver Names # Known Driver Names

View File

@ -5,12 +5,13 @@ from unittest.mock import patch
import numpy as np import numpy as np
from pydantic import ValidationError from pydantic import ValidationError
from ruamel.yaml.constructor import DuplicateKeyError
from frigate.config import BirdseyeModeEnum, FrigateConfig from frigate.config import BirdseyeModeEnum, FrigateConfig
from frigate.const import MODEL_CACHE_DIR from frigate.const import MODEL_CACHE_DIR
from frigate.detectors import DetectorTypeEnum from frigate.detectors import DetectorTypeEnum
from frigate.plus import PlusApi from frigate.plus import PlusApi
from frigate.util.builtin import deep_merge, load_config_with_no_duplicates from frigate.util.builtin import deep_merge
class TestConfig(unittest.TestCase): class TestConfig(unittest.TestCase):
@ -1537,7 +1538,7 @@ class TestConfig(unittest.TestCase):
""" """
self.assertRaises( self.assertRaises(
ValueError, lambda: load_config_with_no_duplicates(raw_config) DuplicateKeyError, lambda: FrigateConfig.parse_yaml(raw_config)
) )
def test_object_filter_ratios_work(self): def test_object_filter_ratios_work(self):

View File

@ -9,14 +9,12 @@ import queue
import re import re
import shlex import shlex
import urllib.parse import urllib.parse
from collections import Counter
from collections.abc import Mapping from collections.abc import Mapping
from pathlib import Path from pathlib import Path
from typing import Any, Optional, Tuple from typing import Any, Optional, Tuple
import numpy as np import numpy as np
import pytz import pytz
import yaml
from ruamel.yaml import YAML from ruamel.yaml import YAML
from tzlocal import get_localzone from tzlocal import get_localzone
from zoneinfo import ZoneInfoNotFoundError from zoneinfo import ZoneInfoNotFoundError
@ -89,34 +87,6 @@ def deep_merge(dct1: dict, dct2: dict, override=False, merge_lists=False) -> dic
return merged return merged
def load_config_with_no_duplicates(raw_config) -> dict:
"""Get config ensuring duplicate keys are not allowed."""
# https://stackoverflow.com/a/71751051
# important to use SafeLoader here to avoid RCE
class PreserveDuplicatesLoader(yaml.loader.SafeLoader):
pass
def map_constructor(loader, node, deep=False):
keys = [loader.construct_object(node, deep=deep) for node, _ in node.value]
vals = [loader.construct_object(node, deep=deep) for _, node in node.value]
key_count = Counter(keys)
data = {}
for key, val in zip(keys, vals):
if key_count[key] > 1:
raise ValueError(
f"Config input {key} is defined multiple times for the same field, this is not allowed."
)
else:
data[key] = val
return data
PreserveDuplicatesLoader.add_constructor(
yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, map_constructor
)
return yaml.load(raw_config, PreserveDuplicatesLoader)
def clean_camera_user_pass(line: str) -> str: def clean_camera_user_pass(line: str) -> str:
"""Removes user and password from line.""" """Removes user and password from line."""
rtsp_cleaned = re.sub(REGEX_RTSP_CAMERA_USER_PASS, "://*:*@", line) rtsp_cleaned = re.sub(REGEX_RTSP_CAMERA_USER_PASS, "://*:*@", line)