reject restricted go2rtc stream sources when added via api

This commit is contained in:
Josh Hawkins 2026-05-15 09:30:36 -05:00
parent 3c71816fd0
commit e1334188e3
4 changed files with 47 additions and 6 deletions

View File

@ -17,6 +17,7 @@ from frigate.const import (
) )
from frigate.ffmpeg_presets import parse_preset_hardware_acceleration_encode from frigate.ffmpeg_presets import parse_preset_hardware_acceleration_encode
from frigate.util.config import find_config_file from frigate.util.config import find_config_file
from frigate.util.services import is_restricted_source
sys.path.remove("/opt/frigate") sys.path.remove("/opt/frigate")
@ -135,11 +136,6 @@ if LIBAVFORMAT_VERSION_MAJOR < 59:
go2rtc_config["ffmpeg"]["rtsp"] = rtsp_args go2rtc_config["ffmpeg"]["rtsp"] = rtsp_args
def is_restricted_source(stream_source: str) -> bool:
"""Check if a stream source is restricted (echo, expr, or exec)."""
return stream_source.strip().startswith(("echo:", "expr:", "exec:"))
for name in list(go2rtc_config.get("streams", {})): for name in list(go2rtc_config.get("streams", {})):
stream = go2rtc_config["streams"][name] stream = go2rtc_config["streams"][name]

View File

@ -24,7 +24,7 @@ from frigate.api.defs.tags import Tags
from frigate.config.config import FrigateConfig from frigate.config.config import FrigateConfig
from frigate.util.builtin import clean_camera_user_pass from frigate.util.builtin import clean_camera_user_pass
from frigate.util.image import run_ffmpeg_snapshot from frigate.util.image import run_ffmpeg_snapshot
from frigate.util.services import ffprobe_stream from frigate.util.services import ffprobe_stream, is_restricted_source
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -111,6 +111,19 @@ def go2rtc_camera_stream(request: Request, stream_name: str):
) )
def go2rtc_add_stream(request: Request, stream_name: str, src: str = ""): def go2rtc_add_stream(request: Request, stream_name: str, src: str = ""):
"""Add or update a go2rtc stream configuration.""" """Add or update a go2rtc stream configuration."""
if src and is_restricted_source(src):
logger.warning(
"Rejected go2rtc stream '%s' with restricted source type (echo/expr/exec)",
stream_name,
)
return JSONResponse(
content={
"success": False,
"message": "Restricted stream source type",
},
status_code=400,
)
try: try:
params = {"name": stream_name} params = {"name": stream_name}
if src: if src:

View File

@ -357,6 +357,33 @@ class TestGo2rtcStreamAccess(BaseTestHttp):
f"got {resp.status_code}" f"got {resp.status_code}"
) )
def test_add_stream_rejects_restricted_source(self):
"""PUT /go2rtc/streams must reject exec:/echo:/expr: sources even for
admins"""
app = self._make_app(_MULTI_CAMERA_CONFIG)
with AuthTestClient(app) as client:
for src in (
"exec:/tmp/rev.sh",
"echo:foo",
"expr:bar",
" exec:/tmp/rev.sh",
):
resp = client.put(f"/go2rtc/streams/revshell?src={src}")
assert resp.status_code == 400, (
f"Expected 400 for restricted src {src!r}; got {resp.status_code}"
)
assert resp.json().get("success") is False
def test_add_stream_allows_non_restricted_source(self):
"""A normal stream URL should pass the restricted-source check and reach
the (unavailable in tests) go2rtc proxy so we expect 500, not 400."""
app = self._make_app(_MULTI_CAMERA_CONFIG)
with AuthTestClient(app) as client:
resp = client.put("/go2rtc/streams/legit?src=rtsp://10.0.0.1:554/video")
assert resp.status_code != 400, (
f"Non-restricted source should not be rejected with 400; got {resp.status_code}"
)
def test_stream_alias_blocked_when_owning_camera_disallowed(self): def test_stream_alias_blocked_when_owning_camera_disallowed(self):
"""limited_user cannot access a stream alias that belongs to a camera they """limited_user cannot access a stream alias that belongs to a camera they
are not allowed to see.""" are not allowed to see."""

View File

@ -556,6 +556,11 @@ def get_jetson_stats() -> Optional[dict[int, dict]]:
return results return results
def is_restricted_source(stream_source: str) -> bool:
"""Check if a stream source is restricted (echo, expr, or exec)."""
return stream_source.strip().startswith(("echo:", "expr:", "exec:"))
def ffprobe_stream(ffmpeg, path: str, detailed: bool = False) -> sp.CompletedProcess: def ffprobe_stream(ffmpeg, path: str, detailed: bool = False) -> sp.CompletedProcess:
"""Run ffprobe on stream.""" """Run ffprobe on stream."""
clean_path = escape_special_characters(path) clean_path = escape_special_characters(path)