add env var check function

This commit is contained in:
Josh Hawkins 2026-05-15 09:51:51 -05:00
parent e1334188e3
commit 03d2525859
4 changed files with 57 additions and 33 deletions

View File

@ -17,37 +17,12 @@ from frigate.const import (
)
from frigate.ffmpeg_presets import parse_preset_hardware_acceleration_encode
from frigate.util.config import find_config_file
from frigate.util.services import is_restricted_source
from frigate.util.services import is_restricted_go2rtc_source
sys.path.remove("/opt/frigate")
yaml = YAML()
# Check if arbitrary exec sources are allowed (defaults to False for security)
allow_arbitrary_exec = None
if "GO2RTC_ALLOW_ARBITRARY_EXEC" in os.environ:
allow_arbitrary_exec = os.environ.get("GO2RTC_ALLOW_ARBITRARY_EXEC")
elif (
os.path.isdir("/run/secrets")
and os.access("/run/secrets", os.R_OK)
and "GO2RTC_ALLOW_ARBITRARY_EXEC" in os.listdir("/run/secrets")
):
allow_arbitrary_exec = (
Path(os.path.join("/run/secrets", "GO2RTC_ALLOW_ARBITRARY_EXEC"))
.read_text()
.strip()
)
# check for the add-on options file
elif os.path.isfile("/data/options.json"):
with open("/data/options.json") as f:
raw_options = f.read()
options = json.loads(raw_options)
allow_arbitrary_exec = options.get("go2rtc_allow_arbitrary_exec")
ALLOW_ARBITRARY_EXEC = allow_arbitrary_exec is not None and str(
allow_arbitrary_exec
).lower() in ("true", "1", "yes")
FRIGATE_ENV_VARS = {k: v for k, v in os.environ.items() if k.startswith("FRIGATE_")}
# read docker secret files as env vars too
if os.path.isdir("/run/secrets"):
@ -142,7 +117,7 @@ for name in list(go2rtc_config.get("streams", {})):
if isinstance(stream, str):
try:
formatted_stream = stream.format(**FRIGATE_ENV_VARS)
if not ALLOW_ARBITRARY_EXEC and is_restricted_source(formatted_stream):
if is_restricted_go2rtc_source(formatted_stream):
print(
f"[ERROR] Stream '{name}' uses a restricted source (echo/expr/exec) which is disabled by default for security. "
f"Set GO2RTC_ALLOW_ARBITRARY_EXEC=true to enable arbitrary exec sources."
@ -161,7 +136,7 @@ for name in list(go2rtc_config.get("streams", {})):
for i, stream_item in enumerate(stream):
try:
formatted_stream = stream_item.format(**FRIGATE_ENV_VARS)
if not ALLOW_ARBITRARY_EXEC and is_restricted_source(formatted_stream):
if is_restricted_go2rtc_source(formatted_stream):
print(
f"[ERROR] Stream '{name}' item {i + 1} uses a restricted source (echo/expr/exec) which is disabled by default for security. "
f"Set GO2RTC_ALLOW_ARBITRARY_EXEC=true to enable arbitrary exec sources."

View File

@ -24,7 +24,7 @@ from frigate.api.defs.tags import Tags
from frigate.config.config import FrigateConfig
from frigate.util.builtin import clean_camera_user_pass
from frigate.util.image import run_ffmpeg_snapshot
from frigate.util.services import ffprobe_stream, is_restricted_source
from frigate.util.services import ffprobe_stream, is_restricted_go2rtc_source
logger = logging.getLogger(__name__)
@ -111,7 +111,7 @@ def go2rtc_camera_stream(request: Request, stream_name: str):
)
def go2rtc_add_stream(request: Request, stream_name: str, src: str = ""):
"""Add or update a go2rtc stream configuration."""
if src and is_restricted_source(src):
if src and is_restricted_go2rtc_source(src):
logger.warning(
"Rejected go2rtc stream '%s' with restricted source type (echo/expr/exec)",
stream_name,

View File

@ -1,3 +1,4 @@
import os
from unittest.mock import patch
from fastapi import HTTPException, Request
@ -384,6 +385,24 @@ class TestGo2rtcStreamAccess(BaseTestHttp):
f"Non-restricted source should not be rejected with 400; got {resp.status_code}"
)
def test_add_stream_allows_restricted_source_when_override_set(self):
"""When GO2RTC_ALLOW_ARBITRARY_EXEC is set, the API must defer to operator
intent and forward the request to go2rtc instead of short-circuiting with 400."""
app = self._make_app(_MULTI_CAMERA_CONFIG)
mock_response = type("R", (), {"ok": True, "status_code": 200, "text": "ok"})()
with patch.dict(os.environ, {"GO2RTC_ALLOW_ARBITRARY_EXEC": "true"}):
with patch(
"frigate.api.camera.requests.put", return_value=mock_response
) as mock_put:
with AuthTestClient(app) as client:
resp = client.put("/go2rtc/streams/legit?src=exec:/tmp/something")
assert resp.status_code == 200, (
f"Restricted src should be forwarded when override set; got {resp.status_code}"
)
mock_put.assert_called_once()
forwarded_src = mock_put.call_args.kwargs["params"]["src"]
assert forwarded_src == "exec:/tmp/something"
def test_stream_alias_blocked_when_owning_camera_disallowed(self):
"""limited_user cannot access a stream alias that belongs to a camera they
are not allowed to see."""

View File

@ -556,9 +556,39 @@ def get_jetson_stats() -> Optional[dict[int, dict]]:
return results
def is_restricted_source(stream_source: str) -> bool:
"""Check if a stream source is restricted (echo, expr, or exec)."""
return stream_source.strip().startswith(("echo:", "expr:", "exec:"))
def _go2rtc_arbitrary_exec_allowed() -> bool:
"""Read the GO2RTC_ALLOW_ARBITRARY_EXEC override from env, docker
secrets, or the Home Assistant add-on options file."""
raw: Optional[str] = None
if "GO2RTC_ALLOW_ARBITRARY_EXEC" in os.environ:
raw = os.environ.get("GO2RTC_ALLOW_ARBITRARY_EXEC")
elif (
os.path.isdir("/run/secrets")
and os.access("/run/secrets", os.R_OK)
and "GO2RTC_ALLOW_ARBITRARY_EXEC" in os.listdir("/run/secrets")
):
try:
with open("/run/secrets/GO2RTC_ALLOW_ARBITRARY_EXEC") as f:
raw = f.read().strip()
except OSError:
raw = None
elif os.path.isfile("/data/options.json"):
try:
with open("/data/options.json") as f:
options = json.loads(f.read())
raw = options.get("go2rtc_allow_arbitrary_exec")
except (OSError, json.JSONDecodeError):
raw = None
return raw is not None and str(raw).lower() in ("true", "1", "yes")
def is_restricted_go2rtc_source(stream_source: str) -> bool:
"""Check if a stream source is a restricted type (echo, expr, or exec)
and the GO2RTC_ALLOW_ARBITRARY_EXEC override is not set."""
if not stream_source.strip().startswith(("echo:", "expr:", "exec:")):
return False
return not _go2rtc_arbitrary_exec_allowed()
def ffprobe_stream(ffmpeg, path: str, detailed: bool = False) -> sp.CompletedProcess: