add env var check function

This commit is contained in:
Josh Hawkins 2026-05-15 09:49:05 -05:00
parent 9b5fc8fb0f
commit 35aee8cd4c
4 changed files with 57 additions and 35 deletions

View File

@ -3,7 +3,6 @@
import json import json
import os import os
import sys import sys
from pathlib import Path
from typing import Any from typing import Any
from ruamel.yaml import YAML from ruamel.yaml import YAML
@ -18,38 +17,12 @@ from frigate.const import (
) )
from frigate.ffmpeg_presets import parse_preset_hardware_acceleration_encode from frigate.ffmpeg_presets import parse_preset_hardware_acceleration_encode
from frigate.util.config import find_config_file from frigate.util.config import find_config_file
from frigate.util.services import is_restricted_source from frigate.util.services import is_restricted_go2rtc_source
sys.path.remove("/opt/frigate") sys.path.remove("/opt/frigate")
yaml = YAML() yaml = YAML()
# Check if arbitrary exec sources are allowed (defaults to False for security)
allow_arbitrary_exec = None
if "GO2RTC_ALLOW_ARBITRARY_EXEC" in os.environ:
allow_arbitrary_exec = os.environ.get("GO2RTC_ALLOW_ARBITRARY_EXEC")
elif (
os.path.isdir("/run/secrets")
and os.access("/run/secrets", os.R_OK)
and "GO2RTC_ALLOW_ARBITRARY_EXEC" in os.listdir("/run/secrets")
):
allow_arbitrary_exec = (
Path(os.path.join("/run/secrets", "GO2RTC_ALLOW_ARBITRARY_EXEC"))
.read_text()
.strip()
)
# check for the add-on options file
elif os.path.isfile("/data/options.json"):
with open("/data/options.json") as f:
raw_options = f.read()
options = json.loads(raw_options)
allow_arbitrary_exec = options.get("go2rtc_allow_arbitrary_exec")
ALLOW_ARBITRARY_EXEC = allow_arbitrary_exec is not None and str(
allow_arbitrary_exec
).lower() in ("true", "1", "yes")
config_file = find_config_file() config_file = find_config_file()
try: try:
@ -135,7 +108,7 @@ for name in list(go2rtc_config.get("streams", {})):
if isinstance(stream, str): if isinstance(stream, str):
try: try:
formatted_stream = substitute_frigate_vars(stream) formatted_stream = substitute_frigate_vars(stream)
if not ALLOW_ARBITRARY_EXEC and is_restricted_source(formatted_stream): if is_restricted_go2rtc_source(formatted_stream):
print( print(
f"[ERROR] Stream '{name}' uses a restricted source (echo/expr/exec) which is disabled by default for security. " f"[ERROR] Stream '{name}' uses a restricted source (echo/expr/exec) which is disabled by default for security. "
f"Set GO2RTC_ALLOW_ARBITRARY_EXEC=true to enable arbitrary exec sources." f"Set GO2RTC_ALLOW_ARBITRARY_EXEC=true to enable arbitrary exec sources."
@ -154,7 +127,7 @@ for name in list(go2rtc_config.get("streams", {})):
for i, stream_item in enumerate(stream): for i, stream_item in enumerate(stream):
try: try:
formatted_stream = substitute_frigate_vars(stream_item) formatted_stream = substitute_frigate_vars(stream_item)
if not ALLOW_ARBITRARY_EXEC and is_restricted_source(formatted_stream): if is_restricted_go2rtc_source(formatted_stream):
print( print(
f"[ERROR] Stream '{name}' item {i + 1} uses a restricted source (echo/expr/exec) which is disabled by default for security. " f"[ERROR] Stream '{name}' item {i + 1} uses a restricted source (echo/expr/exec) which is disabled by default for security. "
f"Set GO2RTC_ALLOW_ARBITRARY_EXEC=true to enable arbitrary exec sources." f"Set GO2RTC_ALLOW_ARBITRARY_EXEC=true to enable arbitrary exec sources."

View File

@ -38,7 +38,7 @@ from frigate.util.builtin import clean_camera_user_pass
from frigate.util.camera_cleanup import cleanup_camera_db, cleanup_camera_files from frigate.util.camera_cleanup import cleanup_camera_db, cleanup_camera_files
from frigate.util.config import find_config_file from frigate.util.config import find_config_file
from frigate.util.image import run_ffmpeg_snapshot from frigate.util.image import run_ffmpeg_snapshot
from frigate.util.services import ffprobe_stream, is_restricted_source from frigate.util.services import ffprobe_stream, is_restricted_go2rtc_source
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -151,7 +151,7 @@ def go2rtc_add_stream(request: Request, stream_name: str, src: str = ""):
except KeyError: except KeyError:
resolved_src = src resolved_src = src
if is_restricted_source(resolved_src): if is_restricted_go2rtc_source(resolved_src):
logger.warning( logger.warning(
"Rejected go2rtc stream '%s' with restricted source type (echo/expr/exec)", "Rejected go2rtc stream '%s' with restricted source type (echo/expr/exec)",
stream_name, stream_name,

View File

@ -1,3 +1,4 @@
import os
from unittest.mock import patch from unittest.mock import patch
from fastapi import HTTPException, Request from fastapi import HTTPException, Request
@ -384,6 +385,24 @@ class TestGo2rtcStreamAccess(BaseTestHttp):
f"Non-restricted source should not be rejected with 400; got {resp.status_code}" f"Non-restricted source should not be rejected with 400; got {resp.status_code}"
) )
def test_add_stream_allows_restricted_source_when_override_set(self):
"""When GO2RTC_ALLOW_ARBITRARY_EXEC is set, the API must defer to operator
intent and forward the request to go2rtc instead of short-circuiting with 400."""
app = self._make_app(_MULTI_CAMERA_CONFIG)
mock_response = type("R", (), {"ok": True, "status_code": 200, "text": "ok"})()
with patch.dict(os.environ, {"GO2RTC_ALLOW_ARBITRARY_EXEC": "true"}):
with patch(
"frigate.api.camera.requests.put", return_value=mock_response
) as mock_put:
with AuthTestClient(app) as client:
resp = client.put("/go2rtc/streams/legit?src=exec:/tmp/something")
assert resp.status_code == 200, (
f"Restricted src should be forwarded when override set; got {resp.status_code}"
)
mock_put.assert_called_once()
forwarded_src = mock_put.call_args.kwargs["params"]["src"]
assert forwarded_src == "exec:/tmp/something"
def test_stream_alias_blocked_when_owning_camera_disallowed(self): def test_stream_alias_blocked_when_owning_camera_disallowed(self):
"""limited_user cannot access a stream alias that belongs to a camera they """limited_user cannot access a stream alias that belongs to a camera they
are not allowed to see.""" are not allowed to see."""

View File

@ -778,9 +778,39 @@ def get_hailo_temps() -> dict[str, float]:
return temps return temps
def is_restricted_source(stream_source: str) -> bool: def _go2rtc_arbitrary_exec_allowed() -> bool:
"""Check if a stream source is restricted (echo, expr, or exec).""" """Read the GO2RTC_ALLOW_ARBITRARY_EXEC override from env, docker
return stream_source.strip().startswith(("echo:", "expr:", "exec:")) secrets, or the Home Assistant add-on options file."""
raw: Optional[str] = None
if "GO2RTC_ALLOW_ARBITRARY_EXEC" in os.environ:
raw = os.environ.get("GO2RTC_ALLOW_ARBITRARY_EXEC")
elif (
os.path.isdir("/run/secrets")
and os.access("/run/secrets", os.R_OK)
and "GO2RTC_ALLOW_ARBITRARY_EXEC" in os.listdir("/run/secrets")
):
try:
with open("/run/secrets/GO2RTC_ALLOW_ARBITRARY_EXEC") as f:
raw = f.read().strip()
except OSError:
raw = None
elif os.path.isfile("/data/options.json"):
try:
with open("/data/options.json") as f:
options = json.loads(f.read())
raw = options.get("go2rtc_allow_arbitrary_exec")
except (OSError, json.JSONDecodeError):
raw = None
return raw is not None and str(raw).lower() in ("true", "1", "yes")
def is_restricted_go2rtc_source(stream_source: str) -> bool:
"""Check if a stream source is a restricted type (echo, expr, or exec)
and the GO2RTC_ALLOW_ARBITRARY_EXEC override is not set."""
if not stream_source.strip().startswith(("echo:", "expr:", "exec:")):
return False
return not _go2rtc_arbitrary_exec_allowed()
def ffprobe_stream(ffmpeg, path: str, detailed: bool = False) -> sp.CompletedProcess: def ffprobe_stream(ffmpeg, path: str, detailed: bool = False) -> sp.CompletedProcess: