Implement cross-camera safety for indexed media folders (#23164)
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions

* Implement cross-camera safety for indexed media folders

* Cleanup

* Improve robustness
This commit is contained in:
Nicolas Mowen 2026-05-11 13:52:18 -06:00 committed by GitHub
parent e9432d55e8
commit c67170aa20
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 696 additions and 10 deletions

View File

@ -26,6 +26,7 @@ from frigate.api.defs.request.app_body import (
AppPutRoleBody,
)
from frigate.api.defs.tags import Tags
from frigate.api.media_auth import check_camera_access, deny_response_for_media_uri
from frigate.config import AuthConfig, NetworkingConfig, ProxyConfig
from frigate.const import CONFIG_DIR, JWT_SECRET_ENV_VAR, PASSWORD_HASH_ALGORITHM
from frigate.models import User
@ -633,6 +634,9 @@ def auth(request: Request):
logger.debug("X-Proxy-Secret header does not match configured secret value")
return fail_response
original_url = request.headers.get("x-original-url")
frigate_config = request.app.frigate_config
# if auth is disabled, just apply the proxy header map and return success
if not auth_config.enabled:
# pass the user header value from the upstream proxy if a mapping is specified
@ -649,6 +653,11 @@ def auth(request: Request):
role = resolve_role(request.headers, proxy_config, config_roles_set)
success_response.headers["remote-role"] = role
deny_status = deny_response_for_media_uri(original_url, role, frigate_config)
if deny_status is not None:
return Response("", status_code=deny_status)
return success_response
# now apply authentication
@ -743,6 +752,11 @@ def auth(request: Request):
success_response.headers["remote-user"] = user
success_response.headers["remote-role"] = role
deny_status = deny_response_for_media_uri(original_url, role, frigate_config)
if deny_status is not None:
return Response("", status_code=deny_status)
return success_response
except Exception as e:
logger.error(f"Error parsing jwt: {e}")
@ -1069,15 +1083,15 @@ async def require_camera_access(
raise HTTPException(status_code=current_user.status_code, detail=detail)
role = current_user["role"]
all_camera_names = set(request.app.frigate_config.cameras.keys())
roles_dict = request.app.frigate_config.auth.roles
allowed_cameras = User.get_allowed_cameras(role, roles_dict, all_camera_names)
frigate_config = request.app.frigate_config
# Admin or full access bypasses
if role == "admin" or not roles_dict.get(role):
if check_camera_access(role, camera_name, frigate_config):
return
if camera_name not in allowed_cameras:
all_camera_names = set(frigate_config.cameras.keys())
allowed_cameras = User.get_allowed_cameras(
role, frigate_config.auth.roles, all_camera_names
)
raise HTTPException(
status_code=403,
detail=f"Access denied to camera '{camera_name}'. Allowed: {allowed_cameras}",

291
frigate/api/media_auth.py Normal file
View File

@ -0,0 +1,291 @@
"""URI-aware authorization for nginx-served static media.
The `/auth` endpoint (used as nginx `auth_request` target) calls into this
module to classify the requested URI from the `X-Original-URL` header and, for
camera-scoped resources, decide whether the current role may access them.
Without this, `auth_request` only verifies the JWT every authenticated user
could read clips, recordings, and exports for *any* camera, bypassing the
per-camera authorization the regular API enforces via `require_camera_access`.
"""
from __future__ import annotations
import logging
import os
from enum import Enum
from typing import Optional
from urllib.parse import unquote, urlparse
from peewee import DoesNotExist
from frigate.config import FrigateConfig
from frigate.const import EXPORT_DIR
from frigate.models import Export, User
logger = logging.getLogger(__name__)
class MediaAuthResolution(str, Enum):
"""Classification of an `X-Original-URL` path for media-auth purposes."""
CAMERA = "camera"
ADMIN_ONLY = "admin_only"
LISTING_MULTI_CAMERA = "listing_multi_camera"
LISTING_NEUTRAL = "listing_neutral"
# Under a recognized media root (/clips, /recordings, /exports) but
# unclassifiable (unknown subtree, no matching DB row, DB error).
# Restricted users are denied; admins/full-access roles are allowed
# (nginx will likely return 404 if the file genuinely doesn't exist).
UNRESOLVED_MEDIA = "unresolved_media"
# Not a media URI at all (e.g. /api/events, /login).
UNKNOWN = "unknown"
def extract_path(original_url: Optional[str]) -> Optional[str]:
"""Return the decoded path component of nginx's `X-Original-URL` header.
nginx forwards the *raw* request URI (with `..` segments intact) via
`$request_uri`. nginx normalizes the path before serving the file, so a
request like `/recordings/.../allowed_cam/../forbidden_cam/file.mp4`
would (1) parse as the allowed camera in our auth check, (2) be served
as the forbidden camera by nginx. To close the bypass we reject any URI
whose path contains `.` or `..` segments outright.
"""
if not original_url:
return None
parsed = urlparse(original_url)
raw_path = parsed.path or original_url
decoded = unquote(raw_path)
if not decoded:
return None
if not decoded.startswith("/"):
decoded = "/" + decoded
segments = decoded.split("/")
if ".." in segments or "." in segments:
return None
return decoded
def resolve_media_uri(
uri: str, frigate_config: Optional[FrigateConfig] = None
) -> tuple[MediaAuthResolution, Optional[str]]:
"""Classify a URI and return the owning camera if applicable.
`frigate_config` is used to disambiguate clip/review filenames whose
camera name contains hyphens by matching against the longest configured
camera-name prefix.
"""
if not uri:
return MediaAuthResolution.UNKNOWN, None
parts = [p for p in uri.split("/") if p]
if not parts:
return MediaAuthResolution.UNKNOWN, None
root = parts[0]
if root == "recordings":
return _resolve_recording(parts)
if root == "clips":
return _resolve_clip(parts, frigate_config)
if root == "exports":
return _resolve_export(parts)
return MediaAuthResolution.UNKNOWN, None
def _resolve_recording(
parts: list[str],
) -> tuple[MediaAuthResolution, Optional[str]]:
# /recordings → neutral
# /recordings/{date} → neutral
# /recordings/{date}/{hour} → multi-camera listing
# /recordings/{date}/{hour}/{cam}/... → camera
if len(parts) <= 2:
return MediaAuthResolution.LISTING_NEUTRAL, None
if len(parts) == 3:
return MediaAuthResolution.LISTING_MULTI_CAMERA, None
return MediaAuthResolution.CAMERA, parts[3]
def _resolve_clip(
parts: list[str], frigate_config: Optional[FrigateConfig]
) -> tuple[MediaAuthResolution, Optional[str]]:
# /clips → multi-camera listing
# /clips/thumbs/{cam}/... → camera
# /clips/previews/{cam}/... → camera
# /clips/review/thumb-{cam}-{review_id}.webp → camera (parsed)
# /clips/faces/... → admin-only
# /clips/genai-requests/... → admin-only
# /clips/preview_restart_cache/... → admin-only
# /clips/{model}/train|dataset/... → admin-only
# /clips/{cam}-{event_id}[-clean].{ext} → camera (parsed)
# other /clips/{subdir}/... → unresolved (deny restricted)
if len(parts) == 1:
return MediaAuthResolution.LISTING_MULTI_CAMERA, None
second = parts[1]
if second in ("thumbs", "previews"):
if len(parts) == 2:
return MediaAuthResolution.LISTING_MULTI_CAMERA, None
return MediaAuthResolution.CAMERA, parts[2]
if second == "review":
if len(parts) == 2:
return MediaAuthResolution.LISTING_MULTI_CAMERA, None
camera = _camera_from_thumb_filename(parts[2], frigate_config)
if camera:
return MediaAuthResolution.CAMERA, camera
return MediaAuthResolution.UNRESOLVED_MEDIA, None
if second in ("faces", "genai-requests", "preview_restart_cache"):
return MediaAuthResolution.ADMIN_ONLY, None
if len(parts) >= 3 and parts[2] in ("train", "dataset"):
return MediaAuthResolution.ADMIN_ONLY, None
if len(parts) == 2:
camera = _camera_from_clip_filename(second, frigate_config)
if camera:
return MediaAuthResolution.CAMERA, camera
return MediaAuthResolution.UNRESOLVED_MEDIA, None
return MediaAuthResolution.UNRESOLVED_MEDIA, None
def _longest_prefix_camera(
stem: str, frigate_config: Optional[FrigateConfig]
) -> Optional[str]:
if frigate_config is None:
return None
for cam in sorted(frigate_config.cameras.keys(), key=len, reverse=True):
if stem.startswith(cam + "-"):
return cam
return None
def _camera_from_clip_filename(
filename: str, frigate_config: Optional[FrigateConfig]
) -> Optional[str]:
"""Match a flat clip filename `{camera}-{event_id}[-clean].{ext}` against
configured camera names. Longest-prefix wins so camera names containing
hyphens (e.g. `front-door`) resolve correctly.
"""
dot = filename.rfind(".")
stem = filename[:dot] if dot > 0 else filename
return _longest_prefix_camera(stem, frigate_config)
def _camera_from_thumb_filename(
filename: str, frigate_config: Optional[FrigateConfig]
) -> Optional[str]:
"""Match a review thumbnail filename `thumb-{camera}-{review_id}.webp`."""
if not filename.startswith("thumb-"):
return None
dot = filename.rfind(".")
stem = filename[len("thumb-") : dot] if dot > 0 else filename[len("thumb-") :]
return _longest_prefix_camera(stem, frigate_config)
def _resolve_export(
parts: list[str],
) -> tuple[MediaAuthResolution, Optional[str]]:
# /exports → multi-camera listing
# /exports/{filename}.mp4 → camera (DB lookup by exact path)
if len(parts) == 1:
return MediaAuthResolution.LISTING_MULTI_CAMERA, None
if len(parts) != 2:
return MediaAuthResolution.UNRESOLVED_MEDIA, None
filename = parts[1]
full_path = os.path.join(EXPORT_DIR, filename)
try:
export = Export.get(Export.video_path == full_path)
return MediaAuthResolution.CAMERA, export.camera
except DoesNotExist:
return MediaAuthResolution.UNRESOLVED_MEDIA, None
except Exception as e:
logger.warning("Export DB lookup failed for %s: %s", filename, e)
return MediaAuthResolution.UNRESOLVED_MEDIA, None
def check_camera_access(role: str, camera: str, frigate_config: FrigateConfig) -> bool:
"""Return True iff `role` may access `camera`.
Mirrors the gating logic in `require_camera_access`: admin and any role
without a non-empty allow-list bypass the check.
"""
if role == "admin":
return True
roles_dict = frigate_config.auth.roles
if not roles_dict.get(role):
return True
all_camera_names = set(frigate_config.cameras.keys())
allowed = User.get_allowed_cameras(role, roles_dict, all_camera_names)
return camera in allowed
def is_role_restricted(role: str, frigate_config: FrigateConfig) -> bool:
"""True if `role` has a non-empty allow-list (i.e. not full-access)."""
if role == "admin":
return False
return bool(frigate_config.auth.roles.get(role))
def deny_response_for_media_uri(
original_url: Optional[str], role: Optional[str], frigate_config: FrigateConfig
) -> Optional[int]:
"""Decide whether the current role should be blocked from `original_url`.
Returns an HTTP status code (403) when access should be denied, or `None`
when the request is allowed.
"""
if not original_url:
return None
path = extract_path(original_url)
# `extract_path` returns None for URIs containing `.` or `..` segments.
# For media-root URIs that's a traversal attempt — deny outright. For
# non-media URIs, pass through (nginx / the backend handle them).
if path is None:
raw = urlparse(original_url).path or original_url
decoded = unquote(raw)
first = decoded.lstrip("/").split("/", 1)[0] if decoded else ""
if first in ("clips", "recordings", "exports"):
return 403
return None
resolution, camera = resolve_media_uri(path, frigate_config)
if resolution == MediaAuthResolution.UNKNOWN:
return None
if not role or role == "admin":
return None
if not is_role_restricted(role, frigate_config):
return None
if resolution == MediaAuthResolution.LISTING_NEUTRAL:
return None
if resolution in (
MediaAuthResolution.LISTING_MULTI_CAMERA,
MediaAuthResolution.ADMIN_ONLY,
MediaAuthResolution.UNRESOLVED_MEDIA,
):
return 403
if resolution == MediaAuthResolution.CAMERA:
if camera and check_camera_access(role, camera, frigate_config):
return None
return 403
return 403

View File

@ -0,0 +1,381 @@
"""Unit tests for `frigate.api.media_auth`.
Covers URI classification, the role-vs-camera decision matrix, and the export
DB-lookup path. These are pure functions/DB lookups no HTTP stack involved.
"""
import datetime
import logging
import os
import unittest
from peewee_migrate import Router
from playhouse.sqlite_ext import SqliteExtDatabase
from playhouse.sqliteq import SqliteQueueDatabase
from frigate.api.media_auth import (
MediaAuthResolution,
deny_response_for_media_uri,
extract_path,
resolve_media_uri,
)
from frigate.config import FrigateConfig
from frigate.models import Event, Export, Recordings, ReviewSegment
from frigate.test.const import TEST_DB, TEST_DB_CLEANUPS
_CONFIG = {
"mqtt": {"host": "mqtt"},
"auth": {"roles": {"limited_user": ["front_door"]}},
"cameras": {
"front_door": {
"ffmpeg": {
"inputs": [{"path": "rtsp://10.0.0.1:554/video", "roles": ["detect"]}]
},
"detect": {"height": 1080, "width": 1920, "fps": 5},
},
"back_door": {
"ffmpeg": {
"inputs": [{"path": "rtsp://10.0.0.2:554/video", "roles": ["detect"]}]
},
"detect": {"height": 1080, "width": 1920, "fps": 5},
},
# Camera name with a hyphen — exercises longest-prefix match.
"back-yard": {
"ffmpeg": {
"inputs": [{"path": "rtsp://10.0.0.3:554/video", "roles": ["detect"]}]
},
"detect": {"height": 1080, "width": 1920, "fps": 5},
},
},
}
class TestExtractPath(unittest.TestCase):
def test_full_url(self):
self.assertEqual(
extract_path("http://host:8971/clips/front_door-1.jpg"),
"/clips/front_door-1.jpg",
)
def test_strips_query_string(self):
self.assertEqual(
extract_path("http://h/recordings/2026-05-11/14/front_door/00.00.mp4?t=1"),
"/recordings/2026-05-11/14/front_door/00.00.mp4",
)
def test_path_only(self):
self.assertEqual(extract_path("/exports/x.mp4"), "/exports/x.mp4")
def test_percent_decoded(self):
self.assertEqual(
extract_path("http://h/clips/front%20door-1.jpg"),
"/clips/front door-1.jpg",
)
def test_empty(self):
self.assertIsNone(extract_path(None))
self.assertIsNone(extract_path(""))
class TestResolveMediaUri(unittest.TestCase):
def setUp(self):
self.config = FrigateConfig(**_CONFIG)
def _assert(self, uri, resolution, camera=None):
got_resolution, got_camera = resolve_media_uri(uri, self.config)
self.assertEqual(got_resolution, resolution, uri)
self.assertEqual(got_camera, camera, uri)
def test_unknown_paths(self):
self._assert("/api/events", MediaAuthResolution.UNKNOWN)
self._assert("/", MediaAuthResolution.UNKNOWN)
self._assert("", MediaAuthResolution.UNKNOWN)
def test_recordings(self):
self._assert("/recordings/", MediaAuthResolution.LISTING_NEUTRAL)
self._assert("/recordings/2026-05-11/", MediaAuthResolution.LISTING_NEUTRAL)
self._assert(
"/recordings/2026-05-11/14/", MediaAuthResolution.LISTING_MULTI_CAMERA
)
self._assert(
"/recordings/2026-05-11/14/front_door/",
MediaAuthResolution.CAMERA,
camera="front_door",
)
self._assert(
"/recordings/2026-05-11/14/back_door/00.00.mp4",
MediaAuthResolution.CAMERA,
camera="back_door",
)
def test_clip_flat_filename_resolves_camera(self):
self._assert(
"/clips/front_door-1234.jpg",
MediaAuthResolution.CAMERA,
camera="front_door",
)
self._assert(
"/clips/back_door-1234-clean.webp",
MediaAuthResolution.CAMERA,
camera="back_door",
)
def test_clip_filename_with_hyphenated_camera_name(self):
# Camera name "back-yard" itself contains a hyphen; longest-prefix
# match must pick `back-yard`, not the bogus `back` prefix.
self._assert(
"/clips/back-yard-1234.jpg",
MediaAuthResolution.CAMERA,
camera="back-yard",
)
def test_clip_filename_no_matching_camera(self):
# Looks like a media path but couldn't classify — fail closed for
# restricted users (UNRESOLVED_MEDIA), not pass-through.
self._assert(
"/clips/nonexistent-1234.jpg", MediaAuthResolution.UNRESOLVED_MEDIA
)
def test_clip_thumbs(self):
self._assert("/clips/thumbs/", MediaAuthResolution.LISTING_MULTI_CAMERA)
self._assert(
"/clips/thumbs/front_door/",
MediaAuthResolution.CAMERA,
camera="front_door",
)
self._assert(
"/clips/thumbs/back_door/abc.webp",
MediaAuthResolution.CAMERA,
camera="back_door",
)
def test_clip_previews(self):
self._assert("/clips/previews/", MediaAuthResolution.LISTING_MULTI_CAMERA)
self._assert(
"/clips/previews/front_door/",
MediaAuthResolution.CAMERA,
camera="front_door",
)
self._assert(
"/clips/previews/back_door/segment.mp4",
MediaAuthResolution.CAMERA,
camera="back_door",
)
def test_clip_review_thumbs(self):
# Format: /clips/review/thumb-{camera}-{review_id}.webp (frigate/review/maintainer.py).
self._assert(
"/clips/review/thumb-front_door-abc123.webp",
MediaAuthResolution.CAMERA,
camera="front_door",
)
# Hyphenated camera name — longest-prefix match.
self._assert(
"/clips/review/thumb-back-yard-abc123.webp",
MediaAuthResolution.CAMERA,
camera="back-yard",
)
# Unknown camera prefix → unresolved, not allowed for restricted users.
self._assert(
"/clips/review/thumb-unknown-cam-abc123.webp",
MediaAuthResolution.UNRESOLVED_MEDIA,
)
def test_clip_admin_only_subtrees(self):
self._assert("/clips/faces/train/foo.webp", MediaAuthResolution.ADMIN_ONLY)
self._assert("/clips/faces/", MediaAuthResolution.ADMIN_ONLY)
self._assert("/clips/genai-requests/x/0.webp", MediaAuthResolution.ADMIN_ONLY)
self._assert(
"/clips/preview_restart_cache/x.mp4", MediaAuthResolution.ADMIN_ONLY
)
self._assert("/clips/some_model/train/x.jpg", MediaAuthResolution.ADMIN_ONLY)
self._assert("/clips/some_model/dataset/x.jpg", MediaAuthResolution.ADMIN_ONLY)
def test_clip_unknown_subtree_is_unresolved(self):
# Unknown /clips/{x}/{y}/... subtree falls through as unresolved (not
# admin-only) so restricted users get 403 without admins being denied
# access to legitimate but unrecognized resources.
self._assert("/clips/random_dir/foo.jpg", MediaAuthResolution.UNRESOLVED_MEDIA)
def test_clip_top_level_listing(self):
self._assert("/clips/", MediaAuthResolution.LISTING_MULTI_CAMERA)
def test_exports_listing(self):
self._assert("/exports/", MediaAuthResolution.LISTING_MULTI_CAMERA)
class TestExportResolution(unittest.TestCase):
"""Export resolution requires a DB lookup."""
def setUp(self):
migrate_db = SqliteExtDatabase("test.db")
del logging.getLogger("peewee_migrate").handlers[:]
Router(migrate_db).run()
migrate_db.close()
self.db = SqliteQueueDatabase(TEST_DB)
self.db.bind([Event, ReviewSegment, Recordings, Export])
self.config = FrigateConfig(**_CONFIG)
def tearDown(self):
if not self.db.is_closed():
self.db.close()
for f in TEST_DB_CLEANUPS:
try:
os.remove(f)
except OSError:
pass
def _insert_export(self, export_id, camera, filename):
Export.insert(
id=export_id,
camera=camera,
name=f"export-{export_id}",
date=datetime.datetime.now(),
video_path=f"/media/frigate/exports/{filename}",
thumb_path=f"/media/frigate/exports/{filename}.jpg",
in_progress=False,
).execute()
def test_export_resolves_camera(self):
self._insert_export(
"exp1", "back_door", "back_door_20260511_140000-20260511_150000_abc123.mp4"
)
resolution, camera = resolve_media_uri(
"/exports/back_door_20260511_140000-20260511_150000_abc123.mp4",
self.config,
)
self.assertEqual(resolution, MediaAuthResolution.CAMERA)
self.assertEqual(camera, "back_door")
def test_unknown_export_is_unresolved(self):
# No matching row → UNRESOLVED_MEDIA (fail closed for restricted users),
# not UNKNOWN (which would pass-through).
resolution, camera = resolve_media_uri(
"/exports/does_not_exist.mp4", self.config
)
self.assertEqual(resolution, MediaAuthResolution.UNRESOLVED_MEDIA)
self.assertIsNone(camera)
def test_export_anchored_match_not_endswith(self):
# Anchored exact-path equality must NOT match by filename suffix.
# A request like /exports/clip.mp4 must not authorize against a row at
# /media/frigate/exports/back_door_clip.mp4 just because the suffix matches.
self._insert_export("exp_bd", "back_door", "back_door_clip.mp4")
self._insert_export("exp_fd", "front_door", "front_door_clip.mp4")
resolution, _ = resolve_media_uri("/exports/clip.mp4", self.config)
self.assertEqual(resolution, MediaAuthResolution.UNRESOLVED_MEDIA)
class TestDenyResponseForMediaUri(unittest.TestCase):
"""End-to-end decision check used by /auth."""
def setUp(self):
self.config = FrigateConfig(**_CONFIG)
def _deny(self, url, role):
return deny_response_for_media_uri(url, role, self.config)
def test_admin_always_allowed(self):
self.assertIsNone(self._deny("/clips/back_door-1.jpg", "admin"))
self.assertIsNone(self._deny("/clips/", "admin"))
self.assertIsNone(self._deny("/clips/faces/x.webp", "admin"))
self.assertIsNone(
self._deny("/recordings/2026-05-11/14/back_door/00.00.mp4", "admin")
)
def test_unrestricted_role_allowed(self):
# "viewer" role has no entry in roles_dict → full access (matches the
# behavior of require_camera_access).
self.assertIsNone(self._deny("/clips/back_door-1.jpg", "viewer"))
self.assertIsNone(self._deny("/clips/", "viewer"))
def test_restricted_role_allowed_camera(self):
self.assertIsNone(self._deny("/clips/front_door-1.jpg", "limited_user"))
self.assertIsNone(
self._deny("/recordings/2026-05-11/14/front_door/00.00.mp4", "limited_user")
)
self.assertIsNone(
self._deny("/clips/thumbs/front_door/abc.webp", "limited_user")
)
def test_restricted_role_blocked_other_camera(self):
self.assertEqual(self._deny("/clips/back_door-1.jpg", "limited_user"), 403)
self.assertEqual(
self._deny("/recordings/2026-05-11/14/back_door/00.00.mp4", "limited_user"),
403,
)
self.assertEqual(
self._deny("/clips/thumbs/back_door/abc.webp", "limited_user"), 403
)
def test_restricted_role_blocked_admin_only(self):
self.assertEqual(self._deny("/clips/faces/train/foo.webp", "limited_user"), 403)
def test_restricted_role_blocked_multi_camera_listing(self):
self.assertEqual(self._deny("/clips/", "limited_user"), 403)
self.assertEqual(self._deny("/exports/", "limited_user"), 403)
self.assertEqual(self._deny("/recordings/2026-05-11/14/", "limited_user"), 403)
def test_restricted_role_allowed_neutral_listing(self):
self.assertIsNone(self._deny("/recordings/", "limited_user"))
self.assertIsNone(self._deny("/recordings/2026-05-11/", "limited_user"))
def test_non_media_uri_passes_through(self):
self.assertIsNone(self._deny("/api/events", "limited_user"))
self.assertIsNone(self._deny("http://h/login", "limited_user"))
def test_missing_header(self):
self.assertIsNone(self._deny(None, "limited_user"))
self.assertIsNone(self._deny("", "limited_user"))
def test_traversal_in_media_uri_denied_for_all_roles(self):
# Bypass attempt: parts[3] looks like an allowed camera, but the
# normalized path nginx would serve points at a forbidden camera.
# Both restricted and admin should be denied — the URI is malformed
# and we refuse to make an auth decision against it.
traversal_uris = [
"/recordings/2026-05-11/14/front_door/../back_door/00.00.mp4",
"/clips/front_door-1.jpg/../back_door-1.jpg",
"/exports/../recordings/2026-05-11/14/back_door/00.00.mp4",
"/clips/./back_door-1.jpg",
]
for uri in traversal_uris:
self.assertEqual(self._deny(uri, "limited_user"), 403, uri)
self.assertEqual(self._deny(uri, "admin"), 403, uri)
self.assertEqual(self._deny(uri, "viewer"), 403, uri)
def test_traversal_outside_media_passes_through(self):
# `..` in non-media URIs is not our problem; the backend handles it.
self.assertIsNone(self._deny("/api/foo/../bar", "limited_user"))
def test_percent_encoded_traversal_denied(self):
# nginx may decode percent-encoded `%2E%2E` to `..` before serving;
# we must apply the same denial after percent-decoding.
self.assertEqual(
self._deny(
"/recordings/2026-05-11/14/front_door/%2E%2E/back_door/00.mp4",
"limited_user",
),
403,
)
def test_unresolved_media_fails_closed_for_restricted(self):
# Restricted user requesting a media URI we can't classify (no DB row,
# unknown clip prefix, unknown clip subtree) must be denied.
self.assertEqual(self._deny("/clips/nonexistent-1.jpg", "limited_user"), 403)
self.assertEqual(self._deny("/clips/random_dir/foo.jpg", "limited_user"), 403)
self.assertEqual(
self._deny("/clips/review/thumb-unknown_cam-1.webp", "limited_user"),
403,
)
def test_unresolved_media_allowed_for_admin(self):
# Admin and full-access roles are *not* denied on UNRESOLVED_MEDIA —
# nginx returns 404 if the file doesn't exist on disk anyway, and we
# don't want a stale DB to lock out admins.
self.assertIsNone(self._deny("/clips/nonexistent-1.jpg", "admin"))
self.assertIsNone(self._deny("/clips/nonexistent-1.jpg", "viewer"))
if __name__ == "__main__":
unittest.main()