From 10bc306b332f1a940ee5b4cab7534f86a1372696 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Mon, 10 Nov 2025 18:46:23 -0600 Subject: [PATCH] digest auth backend --- frigate/api/camera.py | 82 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 75 insertions(+), 7 deletions(-) diff --git a/frigate/api/camera.py b/frigate/api/camera.py index d2c9f60be..ef55a283e 100644 --- a/frigate/api/camera.py +++ b/frigate/api/camera.py @@ -7,11 +7,13 @@ from importlib.util import find_spec from pathlib import Path from urllib.parse import quote_plus +import httpx import requests from fastapi import APIRouter, Depends, Query, Request, Response from fastapi.responses import JSONResponse from onvif import ONVIFCamera, ONVIFError from zeep.exceptions import Fault, TransportError +from zeep.transports import AsyncTransport from frigate.api.auth import require_role from frigate.api.defs.tags import Tags @@ -464,7 +466,8 @@ def _extract_fps(r_frame_rate: str) -> float | None: summary="Probe ONVIF device", description=( "Probe an ONVIF device to determine capabilities and optionally test available stream URIs. " - "Query params: host (required), port (default 80), username, password, test (boolean)." + "Query params: host (required), port (default 80), username, password, test (boolean), " + "auth_type (basic or digest, default basic)." ), ) async def onvif_probe( @@ -474,6 +477,7 @@ async def onvif_probe( username: str = Query(""), password: str = Query(""), test: bool = Query(False), + auth_type: str = Query("basic"), # Add auth_type parameter ): """ Probe a single ONVIF device to determine capabilities. @@ -491,6 +495,7 @@ async def onvif_probe( username: ONVIF username (optional) password: ONVIF password (optional) test: run ffprobe on the stream (optional) + auth_type: Authentication type - "basic" or "digest" (default "basic") Returns: JSON with device capabilities information @@ -508,10 +513,20 @@ async def onvif_probe( status_code=400, ) + # Validate auth_type + if auth_type not in ["basic", "digest"]: + return JSONResponse( + content={ + "success": False, + "message": "auth_type must be 'basic' or 'digest'", + }, + status_code=400, + ) + onvif_camera = None try: - logger.debug(f"Probing ONVIF device at {host}:{port}") + logger.debug(f"Probing ONVIF device at {host}:{port} with {auth_type} auth") try: wsdl_base = None @@ -525,7 +540,29 @@ async def onvif_probe( host, port, username or "", password or "", wsdl_dir=wsdl_base ) - await onvif_camera.update_xaddrs() + # Configure digest authentication if requested + if auth_type == "digest" and username and password: + # Create httpx client with digest auth + auth = httpx.DigestAuth(username, password) + client = httpx.AsyncClient(auth=auth, timeout=10.0) + + # Replace the transport in the zeep client + transport = AsyncTransport(client=client) + + # Update the xaddr before setting transport + await onvif_camera.update_xaddrs() + + # Replace transport in all services + if hasattr(onvif_camera, "devicemgmt"): + onvif_camera.devicemgmt.zeep_client.transport = transport + if hasattr(onvif_camera, "media"): + onvif_camera.media.zeep_client.transport = transport + if hasattr(onvif_camera, "ptz"): + onvif_camera.ptz.zeep_client.transport = transport + + logger.debug("Configured digest authentication") + else: + await onvif_camera.update_xaddrs() # Get device information device_info = { @@ -535,6 +572,14 @@ async def onvif_probe( } try: device_service = await onvif_camera.create_devicemgmt_service() + + # Update transport for device service if digest auth + if auth_type == "digest" and username and password: + auth = httpx.DigestAuth(username, password) + client = httpx.AsyncClient(auth=auth, timeout=10.0) + transport = AsyncTransport(client=client) + device_service.zeep_client.transport = transport + device_info_resp = await device_service.GetDeviceInformation() manufacturer = getattr(device_info_resp, "Manufacturer", None) or ( device_info_resp.get("Manufacturer") @@ -558,8 +603,8 @@ async def onvif_probe( "firmware_version": firmware or "Unknown", } ) - except Exception: - logger.debug("Failed to get device info") + except Exception as e: + logger.debug(f"Failed to get device info: {e}") # Get media profiles profiles = [] @@ -568,6 +613,14 @@ async def onvif_probe( ptz_config_token = None try: media_service = await onvif_camera.create_media_service() + + # Update transport for media service if digest auth + if auth_type == "digest" and username and password: + auth = httpx.DigestAuth(username, password) + client = httpx.AsyncClient(auth=auth, timeout=10.0) + transport = AsyncTransport(client=client) + media_service.zeep_client.transport = transport + profiles = await media_service.GetProfiles() profiles_count = len(profiles) if profiles else 0 if profiles and len(profiles) > 0: @@ -585,8 +638,8 @@ async def onvif_probe( if isinstance(ptz_configuration, dict) else None ) - except Exception: - logger.debug("Failed to get media profiles") + except Exception as e: + logger.debug(f"Failed to get media profiles: {e}") # Check PTZ support and capabilities ptz_supported = False @@ -596,6 +649,13 @@ async def onvif_probe( try: ptz_service = await onvif_camera.create_ptz_service() + # Update transport for PTZ service if digest auth + if auth_type == "digest" and username and password: + auth = httpx.DigestAuth(username, password) + client = httpx.AsyncClient(auth=auth, timeout=10.0) + transport = AsyncTransport(client=client) + ptz_service.zeep_client.transport = transport + # Check if PTZ service is available try: await ptz_service.GetServiceCapabilities() @@ -744,6 +804,14 @@ async def onvif_probe( rtsp_candidates: list[dict] = [] try: media_service = await onvif_camera.create_media_service() + + # Update transport for media service if digest auth + if auth_type == "digest" and username and password: + auth = httpx.DigestAuth(username, password) + client = httpx.AsyncClient(auth=auth, timeout=10.0) + transport = AsyncTransport(client=client) + media_service.zeep_client.transport = transport + if profiles_count and media_service: for p in profiles or []: token = getattr(p, "token", None) or (