diff --git a/frigate/api/auth.py b/frigate/api/auth.py index 641131208..eca51df1a 100644 --- a/frigate/api/auth.py +++ b/frigate/api/auth.py @@ -26,6 +26,7 @@ from frigate.api.defs.request.app_body import ( AppPutRoleBody, ) from frigate.api.defs.tags import Tags +from frigate.api.media_auth import check_camera_access, deny_response_for_media_uri from frigate.config import AuthConfig, NetworkingConfig, ProxyConfig from frigate.const import CONFIG_DIR, JWT_SECRET_ENV_VAR, PASSWORD_HASH_ALGORITHM from frigate.models import User @@ -633,6 +634,9 @@ def auth(request: Request): logger.debug("X-Proxy-Secret header does not match configured secret value") return fail_response + original_url = request.headers.get("x-original-url") + frigate_config = request.app.frigate_config + # if auth is disabled, just apply the proxy header map and return success if not auth_config.enabled: # pass the user header value from the upstream proxy if a mapping is specified @@ -649,6 +653,11 @@ def auth(request: Request): role = resolve_role(request.headers, proxy_config, config_roles_set) success_response.headers["remote-role"] = role + + deny_status = deny_response_for_media_uri(original_url, role, frigate_config) + if deny_status is not None: + return Response("", status_code=deny_status) + return success_response # now apply authentication @@ -743,6 +752,11 @@ def auth(request: Request): success_response.headers["remote-user"] = user success_response.headers["remote-role"] = role + + deny_status = deny_response_for_media_uri(original_url, role, frigate_config) + if deny_status is not None: + return Response("", status_code=deny_status) + return success_response except Exception as e: logger.error(f"Error parsing jwt: {e}") @@ -1069,19 +1083,19 @@ async def require_camera_access( raise HTTPException(status_code=current_user.status_code, detail=detail) role = current_user["role"] - all_camera_names = set(request.app.frigate_config.cameras.keys()) - roles_dict = request.app.frigate_config.auth.roles - allowed_cameras = User.get_allowed_cameras(role, roles_dict, all_camera_names) + frigate_config = request.app.frigate_config - # Admin or full access bypasses - if role == "admin" or not roles_dict.get(role): + if check_camera_access(role, camera_name, frigate_config): return - if camera_name not in allowed_cameras: - raise HTTPException( - status_code=403, - detail=f"Access denied to camera '{camera_name}'. Allowed: {allowed_cameras}", - ) + all_camera_names = set(frigate_config.cameras.keys()) + allowed_cameras = User.get_allowed_cameras( + role, frigate_config.auth.roles, all_camera_names + ) + raise HTTPException( + status_code=403, + detail=f"Access denied to camera '{camera_name}'. Allowed: {allowed_cameras}", + ) def _get_stream_owner_cameras(request: Request, stream_name: str) -> set[str]: diff --git a/frigate/api/media_auth.py b/frigate/api/media_auth.py new file mode 100644 index 000000000..cc06eb75c --- /dev/null +++ b/frigate/api/media_auth.py @@ -0,0 +1,291 @@ +"""URI-aware authorization for nginx-served static media. + +The `/auth` endpoint (used as nginx `auth_request` target) calls into this +module to classify the requested URI from the `X-Original-URL` header and, for +camera-scoped resources, decide whether the current role may access them. + +Without this, `auth_request` only verifies the JWT — every authenticated user +could read clips, recordings, and exports for *any* camera, bypassing the +per-camera authorization the regular API enforces via `require_camera_access`. +""" + +from __future__ import annotations + +import logging +import os +from enum import Enum +from typing import Optional +from urllib.parse import unquote, urlparse + +from peewee import DoesNotExist + +from frigate.config import FrigateConfig +from frigate.const import EXPORT_DIR +from frigate.models import Export, User + +logger = logging.getLogger(__name__) + + +class MediaAuthResolution(str, Enum): + """Classification of an `X-Original-URL` path for media-auth purposes.""" + + CAMERA = "camera" + ADMIN_ONLY = "admin_only" + LISTING_MULTI_CAMERA = "listing_multi_camera" + LISTING_NEUTRAL = "listing_neutral" + # Under a recognized media root (/clips, /recordings, /exports) but + # unclassifiable (unknown subtree, no matching DB row, DB error). + # Restricted users are denied; admins/full-access roles are allowed + # (nginx will likely return 404 if the file genuinely doesn't exist). + UNRESOLVED_MEDIA = "unresolved_media" + # Not a media URI at all (e.g. /api/events, /login). + UNKNOWN = "unknown" + + +def extract_path(original_url: Optional[str]) -> Optional[str]: + """Return the decoded path component of nginx's `X-Original-URL` header. + + nginx forwards the *raw* request URI (with `..` segments intact) via + `$request_uri`. nginx normalizes the path before serving the file, so a + request like `/recordings/.../allowed_cam/../forbidden_cam/file.mp4` + would (1) parse as the allowed camera in our auth check, (2) be served + as the forbidden camera by nginx. To close the bypass we reject any URI + whose path contains `.` or `..` segments outright. + """ + if not original_url: + return None + + parsed = urlparse(original_url) + raw_path = parsed.path or original_url + decoded = unquote(raw_path) + if not decoded: + return None + + if not decoded.startswith("/"): + decoded = "/" + decoded + + segments = decoded.split("/") + if ".." in segments or "." in segments: + return None + + return decoded + + +def resolve_media_uri( + uri: str, frigate_config: Optional[FrigateConfig] = None +) -> tuple[MediaAuthResolution, Optional[str]]: + """Classify a URI and return the owning camera if applicable. + + `frigate_config` is used to disambiguate clip/review filenames whose + camera name contains hyphens by matching against the longest configured + camera-name prefix. + """ + if not uri: + return MediaAuthResolution.UNKNOWN, None + + parts = [p for p in uri.split("/") if p] + if not parts: + return MediaAuthResolution.UNKNOWN, None + + root = parts[0] + if root == "recordings": + return _resolve_recording(parts) + if root == "clips": + return _resolve_clip(parts, frigate_config) + if root == "exports": + return _resolve_export(parts) + + return MediaAuthResolution.UNKNOWN, None + + +def _resolve_recording( + parts: list[str], +) -> tuple[MediaAuthResolution, Optional[str]]: + # /recordings → neutral + # /recordings/{date} → neutral + # /recordings/{date}/{hour} → multi-camera listing + # /recordings/{date}/{hour}/{cam}/... → camera + if len(parts) <= 2: + return MediaAuthResolution.LISTING_NEUTRAL, None + if len(parts) == 3: + return MediaAuthResolution.LISTING_MULTI_CAMERA, None + return MediaAuthResolution.CAMERA, parts[3] + + +def _resolve_clip( + parts: list[str], frigate_config: Optional[FrigateConfig] +) -> tuple[MediaAuthResolution, Optional[str]]: + # /clips → multi-camera listing + # /clips/thumbs/{cam}/... → camera + # /clips/previews/{cam}/... → camera + # /clips/review/thumb-{cam}-{review_id}.webp → camera (parsed) + # /clips/faces/... → admin-only + # /clips/genai-requests/... → admin-only + # /clips/preview_restart_cache/... → admin-only + # /clips/{model}/train|dataset/... → admin-only + # /clips/{cam}-{event_id}[-clean].{ext} → camera (parsed) + # other /clips/{subdir}/... → unresolved (deny restricted) + if len(parts) == 1: + return MediaAuthResolution.LISTING_MULTI_CAMERA, None + + second = parts[1] + + if second in ("thumbs", "previews"): + if len(parts) == 2: + return MediaAuthResolution.LISTING_MULTI_CAMERA, None + return MediaAuthResolution.CAMERA, parts[2] + + if second == "review": + if len(parts) == 2: + return MediaAuthResolution.LISTING_MULTI_CAMERA, None + camera = _camera_from_thumb_filename(parts[2], frigate_config) + if camera: + return MediaAuthResolution.CAMERA, camera + return MediaAuthResolution.UNRESOLVED_MEDIA, None + + if second in ("faces", "genai-requests", "preview_restart_cache"): + return MediaAuthResolution.ADMIN_ONLY, None + + if len(parts) >= 3 and parts[2] in ("train", "dataset"): + return MediaAuthResolution.ADMIN_ONLY, None + + if len(parts) == 2: + camera = _camera_from_clip_filename(second, frigate_config) + if camera: + return MediaAuthResolution.CAMERA, camera + return MediaAuthResolution.UNRESOLVED_MEDIA, None + + return MediaAuthResolution.UNRESOLVED_MEDIA, None + + +def _longest_prefix_camera( + stem: str, frigate_config: Optional[FrigateConfig] +) -> Optional[str]: + if frigate_config is None: + return None + for cam in sorted(frigate_config.cameras.keys(), key=len, reverse=True): + if stem.startswith(cam + "-"): + return cam + return None + + +def _camera_from_clip_filename( + filename: str, frigate_config: Optional[FrigateConfig] +) -> Optional[str]: + """Match a flat clip filename `{camera}-{event_id}[-clean].{ext}` against + configured camera names. Longest-prefix wins so camera names containing + hyphens (e.g. `front-door`) resolve correctly. + """ + dot = filename.rfind(".") + stem = filename[:dot] if dot > 0 else filename + return _longest_prefix_camera(stem, frigate_config) + + +def _camera_from_thumb_filename( + filename: str, frigate_config: Optional[FrigateConfig] +) -> Optional[str]: + """Match a review thumbnail filename `thumb-{camera}-{review_id}.webp`.""" + if not filename.startswith("thumb-"): + return None + dot = filename.rfind(".") + stem = filename[len("thumb-") : dot] if dot > 0 else filename[len("thumb-") :] + return _longest_prefix_camera(stem, frigate_config) + + +def _resolve_export( + parts: list[str], +) -> tuple[MediaAuthResolution, Optional[str]]: + # /exports → multi-camera listing + # /exports/{filename}.mp4 → camera (DB lookup by exact path) + if len(parts) == 1: + return MediaAuthResolution.LISTING_MULTI_CAMERA, None + if len(parts) != 2: + return MediaAuthResolution.UNRESOLVED_MEDIA, None + + filename = parts[1] + full_path = os.path.join(EXPORT_DIR, filename) + try: + export = Export.get(Export.video_path == full_path) + return MediaAuthResolution.CAMERA, export.camera + except DoesNotExist: + return MediaAuthResolution.UNRESOLVED_MEDIA, None + except Exception as e: + logger.warning("Export DB lookup failed for %s: %s", filename, e) + return MediaAuthResolution.UNRESOLVED_MEDIA, None + + +def check_camera_access(role: str, camera: str, frigate_config: FrigateConfig) -> bool: + """Return True iff `role` may access `camera`. + + Mirrors the gating logic in `require_camera_access`: admin and any role + without a non-empty allow-list bypass the check. + """ + if role == "admin": + return True + + roles_dict = frigate_config.auth.roles + if not roles_dict.get(role): + return True + + all_camera_names = set(frigate_config.cameras.keys()) + allowed = User.get_allowed_cameras(role, roles_dict, all_camera_names) + return camera in allowed + + +def is_role_restricted(role: str, frigate_config: FrigateConfig) -> bool: + """True if `role` has a non-empty allow-list (i.e. not full-access).""" + if role == "admin": + return False + return bool(frigate_config.auth.roles.get(role)) + + +def deny_response_for_media_uri( + original_url: Optional[str], role: Optional[str], frigate_config: FrigateConfig +) -> Optional[int]: + """Decide whether the current role should be blocked from `original_url`. + + Returns an HTTP status code (403) when access should be denied, or `None` + when the request is allowed. + """ + if not original_url: + return None + + path = extract_path(original_url) + + # `extract_path` returns None for URIs containing `.` or `..` segments. + # For media-root URIs that's a traversal attempt — deny outright. For + # non-media URIs, pass through (nginx / the backend handle them). + if path is None: + raw = urlparse(original_url).path or original_url + decoded = unquote(raw) + first = decoded.lstrip("/").split("/", 1)[0] if decoded else "" + if first in ("clips", "recordings", "exports"): + return 403 + return None + + resolution, camera = resolve_media_uri(path, frigate_config) + if resolution == MediaAuthResolution.UNKNOWN: + return None + + if not role or role == "admin": + return None + + if not is_role_restricted(role, frigate_config): + return None + + if resolution == MediaAuthResolution.LISTING_NEUTRAL: + return None + + if resolution in ( + MediaAuthResolution.LISTING_MULTI_CAMERA, + MediaAuthResolution.ADMIN_ONLY, + MediaAuthResolution.UNRESOLVED_MEDIA, + ): + return 403 + + if resolution == MediaAuthResolution.CAMERA: + if camera and check_camera_access(role, camera, frigate_config): + return None + return 403 + + return 403 diff --git a/frigate/test/test_media_auth.py b/frigate/test/test_media_auth.py new file mode 100644 index 000000000..80dc6fb8b --- /dev/null +++ b/frigate/test/test_media_auth.py @@ -0,0 +1,381 @@ +"""Unit tests for `frigate.api.media_auth`. + +Covers URI classification, the role-vs-camera decision matrix, and the export +DB-lookup path. These are pure functions/DB lookups — no HTTP stack involved. +""" + +import datetime +import logging +import os +import unittest + +from peewee_migrate import Router +from playhouse.sqlite_ext import SqliteExtDatabase +from playhouse.sqliteq import SqliteQueueDatabase + +from frigate.api.media_auth import ( + MediaAuthResolution, + deny_response_for_media_uri, + extract_path, + resolve_media_uri, +) +from frigate.config import FrigateConfig +from frigate.models import Event, Export, Recordings, ReviewSegment +from frigate.test.const import TEST_DB, TEST_DB_CLEANUPS + +_CONFIG = { + "mqtt": {"host": "mqtt"}, + "auth": {"roles": {"limited_user": ["front_door"]}}, + "cameras": { + "front_door": { + "ffmpeg": { + "inputs": [{"path": "rtsp://10.0.0.1:554/video", "roles": ["detect"]}] + }, + "detect": {"height": 1080, "width": 1920, "fps": 5}, + }, + "back_door": { + "ffmpeg": { + "inputs": [{"path": "rtsp://10.0.0.2:554/video", "roles": ["detect"]}] + }, + "detect": {"height": 1080, "width": 1920, "fps": 5}, + }, + # Camera name with a hyphen — exercises longest-prefix match. + "back-yard": { + "ffmpeg": { + "inputs": [{"path": "rtsp://10.0.0.3:554/video", "roles": ["detect"]}] + }, + "detect": {"height": 1080, "width": 1920, "fps": 5}, + }, + }, +} + + +class TestExtractPath(unittest.TestCase): + def test_full_url(self): + self.assertEqual( + extract_path("http://host:8971/clips/front_door-1.jpg"), + "/clips/front_door-1.jpg", + ) + + def test_strips_query_string(self): + self.assertEqual( + extract_path("http://h/recordings/2026-05-11/14/front_door/00.00.mp4?t=1"), + "/recordings/2026-05-11/14/front_door/00.00.mp4", + ) + + def test_path_only(self): + self.assertEqual(extract_path("/exports/x.mp4"), "/exports/x.mp4") + + def test_percent_decoded(self): + self.assertEqual( + extract_path("http://h/clips/front%20door-1.jpg"), + "/clips/front door-1.jpg", + ) + + def test_empty(self): + self.assertIsNone(extract_path(None)) + self.assertIsNone(extract_path("")) + + +class TestResolveMediaUri(unittest.TestCase): + def setUp(self): + self.config = FrigateConfig(**_CONFIG) + + def _assert(self, uri, resolution, camera=None): + got_resolution, got_camera = resolve_media_uri(uri, self.config) + self.assertEqual(got_resolution, resolution, uri) + self.assertEqual(got_camera, camera, uri) + + def test_unknown_paths(self): + self._assert("/api/events", MediaAuthResolution.UNKNOWN) + self._assert("/", MediaAuthResolution.UNKNOWN) + self._assert("", MediaAuthResolution.UNKNOWN) + + def test_recordings(self): + self._assert("/recordings/", MediaAuthResolution.LISTING_NEUTRAL) + self._assert("/recordings/2026-05-11/", MediaAuthResolution.LISTING_NEUTRAL) + self._assert( + "/recordings/2026-05-11/14/", MediaAuthResolution.LISTING_MULTI_CAMERA + ) + self._assert( + "/recordings/2026-05-11/14/front_door/", + MediaAuthResolution.CAMERA, + camera="front_door", + ) + self._assert( + "/recordings/2026-05-11/14/back_door/00.00.mp4", + MediaAuthResolution.CAMERA, + camera="back_door", + ) + + def test_clip_flat_filename_resolves_camera(self): + self._assert( + "/clips/front_door-1234.jpg", + MediaAuthResolution.CAMERA, + camera="front_door", + ) + self._assert( + "/clips/back_door-1234-clean.webp", + MediaAuthResolution.CAMERA, + camera="back_door", + ) + + def test_clip_filename_with_hyphenated_camera_name(self): + # Camera name "back-yard" itself contains a hyphen; longest-prefix + # match must pick `back-yard`, not the bogus `back` prefix. + self._assert( + "/clips/back-yard-1234.jpg", + MediaAuthResolution.CAMERA, + camera="back-yard", + ) + + def test_clip_filename_no_matching_camera(self): + # Looks like a media path but couldn't classify — fail closed for + # restricted users (UNRESOLVED_MEDIA), not pass-through. + self._assert( + "/clips/nonexistent-1234.jpg", MediaAuthResolution.UNRESOLVED_MEDIA + ) + + def test_clip_thumbs(self): + self._assert("/clips/thumbs/", MediaAuthResolution.LISTING_MULTI_CAMERA) + self._assert( + "/clips/thumbs/front_door/", + MediaAuthResolution.CAMERA, + camera="front_door", + ) + self._assert( + "/clips/thumbs/back_door/abc.webp", + MediaAuthResolution.CAMERA, + camera="back_door", + ) + + def test_clip_previews(self): + self._assert("/clips/previews/", MediaAuthResolution.LISTING_MULTI_CAMERA) + self._assert( + "/clips/previews/front_door/", + MediaAuthResolution.CAMERA, + camera="front_door", + ) + self._assert( + "/clips/previews/back_door/segment.mp4", + MediaAuthResolution.CAMERA, + camera="back_door", + ) + + def test_clip_review_thumbs(self): + # Format: /clips/review/thumb-{camera}-{review_id}.webp (frigate/review/maintainer.py). + self._assert( + "/clips/review/thumb-front_door-abc123.webp", + MediaAuthResolution.CAMERA, + camera="front_door", + ) + # Hyphenated camera name — longest-prefix match. + self._assert( + "/clips/review/thumb-back-yard-abc123.webp", + MediaAuthResolution.CAMERA, + camera="back-yard", + ) + # Unknown camera prefix → unresolved, not allowed for restricted users. + self._assert( + "/clips/review/thumb-unknown-cam-abc123.webp", + MediaAuthResolution.UNRESOLVED_MEDIA, + ) + + def test_clip_admin_only_subtrees(self): + self._assert("/clips/faces/train/foo.webp", MediaAuthResolution.ADMIN_ONLY) + self._assert("/clips/faces/", MediaAuthResolution.ADMIN_ONLY) + self._assert("/clips/genai-requests/x/0.webp", MediaAuthResolution.ADMIN_ONLY) + self._assert( + "/clips/preview_restart_cache/x.mp4", MediaAuthResolution.ADMIN_ONLY + ) + self._assert("/clips/some_model/train/x.jpg", MediaAuthResolution.ADMIN_ONLY) + self._assert("/clips/some_model/dataset/x.jpg", MediaAuthResolution.ADMIN_ONLY) + + def test_clip_unknown_subtree_is_unresolved(self): + # Unknown /clips/{x}/{y}/... subtree falls through as unresolved (not + # admin-only) so restricted users get 403 without admins being denied + # access to legitimate but unrecognized resources. + self._assert("/clips/random_dir/foo.jpg", MediaAuthResolution.UNRESOLVED_MEDIA) + + def test_clip_top_level_listing(self): + self._assert("/clips/", MediaAuthResolution.LISTING_MULTI_CAMERA) + + def test_exports_listing(self): + self._assert("/exports/", MediaAuthResolution.LISTING_MULTI_CAMERA) + + +class TestExportResolution(unittest.TestCase): + """Export resolution requires a DB lookup.""" + + def setUp(self): + migrate_db = SqliteExtDatabase("test.db") + del logging.getLogger("peewee_migrate").handlers[:] + Router(migrate_db).run() + migrate_db.close() + self.db = SqliteQueueDatabase(TEST_DB) + self.db.bind([Event, ReviewSegment, Recordings, Export]) + self.config = FrigateConfig(**_CONFIG) + + def tearDown(self): + if not self.db.is_closed(): + self.db.close() + for f in TEST_DB_CLEANUPS: + try: + os.remove(f) + except OSError: + pass + + def _insert_export(self, export_id, camera, filename): + Export.insert( + id=export_id, + camera=camera, + name=f"export-{export_id}", + date=datetime.datetime.now(), + video_path=f"/media/frigate/exports/{filename}", + thumb_path=f"/media/frigate/exports/{filename}.jpg", + in_progress=False, + ).execute() + + def test_export_resolves_camera(self): + self._insert_export( + "exp1", "back_door", "back_door_20260511_140000-20260511_150000_abc123.mp4" + ) + resolution, camera = resolve_media_uri( + "/exports/back_door_20260511_140000-20260511_150000_abc123.mp4", + self.config, + ) + self.assertEqual(resolution, MediaAuthResolution.CAMERA) + self.assertEqual(camera, "back_door") + + def test_unknown_export_is_unresolved(self): + # No matching row → UNRESOLVED_MEDIA (fail closed for restricted users), + # not UNKNOWN (which would pass-through). + resolution, camera = resolve_media_uri( + "/exports/does_not_exist.mp4", self.config + ) + self.assertEqual(resolution, MediaAuthResolution.UNRESOLVED_MEDIA) + self.assertIsNone(camera) + + def test_export_anchored_match_not_endswith(self): + # Anchored exact-path equality must NOT match by filename suffix. + # A request like /exports/clip.mp4 must not authorize against a row at + # /media/frigate/exports/back_door_clip.mp4 just because the suffix matches. + self._insert_export("exp_bd", "back_door", "back_door_clip.mp4") + self._insert_export("exp_fd", "front_door", "front_door_clip.mp4") + resolution, _ = resolve_media_uri("/exports/clip.mp4", self.config) + self.assertEqual(resolution, MediaAuthResolution.UNRESOLVED_MEDIA) + + +class TestDenyResponseForMediaUri(unittest.TestCase): + """End-to-end decision check used by /auth.""" + + def setUp(self): + self.config = FrigateConfig(**_CONFIG) + + def _deny(self, url, role): + return deny_response_for_media_uri(url, role, self.config) + + def test_admin_always_allowed(self): + self.assertIsNone(self._deny("/clips/back_door-1.jpg", "admin")) + self.assertIsNone(self._deny("/clips/", "admin")) + self.assertIsNone(self._deny("/clips/faces/x.webp", "admin")) + self.assertIsNone( + self._deny("/recordings/2026-05-11/14/back_door/00.00.mp4", "admin") + ) + + def test_unrestricted_role_allowed(self): + # "viewer" role has no entry in roles_dict → full access (matches the + # behavior of require_camera_access). + self.assertIsNone(self._deny("/clips/back_door-1.jpg", "viewer")) + self.assertIsNone(self._deny("/clips/", "viewer")) + + def test_restricted_role_allowed_camera(self): + self.assertIsNone(self._deny("/clips/front_door-1.jpg", "limited_user")) + self.assertIsNone( + self._deny("/recordings/2026-05-11/14/front_door/00.00.mp4", "limited_user") + ) + self.assertIsNone( + self._deny("/clips/thumbs/front_door/abc.webp", "limited_user") + ) + + def test_restricted_role_blocked_other_camera(self): + self.assertEqual(self._deny("/clips/back_door-1.jpg", "limited_user"), 403) + self.assertEqual( + self._deny("/recordings/2026-05-11/14/back_door/00.00.mp4", "limited_user"), + 403, + ) + self.assertEqual( + self._deny("/clips/thumbs/back_door/abc.webp", "limited_user"), 403 + ) + + def test_restricted_role_blocked_admin_only(self): + self.assertEqual(self._deny("/clips/faces/train/foo.webp", "limited_user"), 403) + + def test_restricted_role_blocked_multi_camera_listing(self): + self.assertEqual(self._deny("/clips/", "limited_user"), 403) + self.assertEqual(self._deny("/exports/", "limited_user"), 403) + self.assertEqual(self._deny("/recordings/2026-05-11/14/", "limited_user"), 403) + + def test_restricted_role_allowed_neutral_listing(self): + self.assertIsNone(self._deny("/recordings/", "limited_user")) + self.assertIsNone(self._deny("/recordings/2026-05-11/", "limited_user")) + + def test_non_media_uri_passes_through(self): + self.assertIsNone(self._deny("/api/events", "limited_user")) + self.assertIsNone(self._deny("http://h/login", "limited_user")) + + def test_missing_header(self): + self.assertIsNone(self._deny(None, "limited_user")) + self.assertIsNone(self._deny("", "limited_user")) + + def test_traversal_in_media_uri_denied_for_all_roles(self): + # Bypass attempt: parts[3] looks like an allowed camera, but the + # normalized path nginx would serve points at a forbidden camera. + # Both restricted and admin should be denied — the URI is malformed + # and we refuse to make an auth decision against it. + traversal_uris = [ + "/recordings/2026-05-11/14/front_door/../back_door/00.00.mp4", + "/clips/front_door-1.jpg/../back_door-1.jpg", + "/exports/../recordings/2026-05-11/14/back_door/00.00.mp4", + "/clips/./back_door-1.jpg", + ] + for uri in traversal_uris: + self.assertEqual(self._deny(uri, "limited_user"), 403, uri) + self.assertEqual(self._deny(uri, "admin"), 403, uri) + self.assertEqual(self._deny(uri, "viewer"), 403, uri) + + def test_traversal_outside_media_passes_through(self): + # `..` in non-media URIs is not our problem; the backend handles it. + self.assertIsNone(self._deny("/api/foo/../bar", "limited_user")) + + def test_percent_encoded_traversal_denied(self): + # nginx may decode percent-encoded `%2E%2E` to `..` before serving; + # we must apply the same denial after percent-decoding. + self.assertEqual( + self._deny( + "/recordings/2026-05-11/14/front_door/%2E%2E/back_door/00.mp4", + "limited_user", + ), + 403, + ) + + def test_unresolved_media_fails_closed_for_restricted(self): + # Restricted user requesting a media URI we can't classify (no DB row, + # unknown clip prefix, unknown clip subtree) must be denied. + self.assertEqual(self._deny("/clips/nonexistent-1.jpg", "limited_user"), 403) + self.assertEqual(self._deny("/clips/random_dir/foo.jpg", "limited_user"), 403) + self.assertEqual( + self._deny("/clips/review/thumb-unknown_cam-1.webp", "limited_user"), + 403, + ) + + def test_unresolved_media_allowed_for_admin(self): + # Admin and full-access roles are *not* denied on UNRESOLVED_MEDIA — + # nginx returns 404 if the file doesn't exist on disk anyway, and we + # don't want a stale DB to lock out admins. + self.assertIsNone(self._deny("/clips/nonexistent-1.jpg", "admin")) + self.assertIsNone(self._deny("/clips/nonexistent-1.jpg", "viewer")) + + +if __name__ == "__main__": + unittest.main()