digest auth backend

This commit is contained in:
Josh Hawkins 2025-11-10 18:46:23 -06:00
parent e4eac4ac81
commit 10bc306b33

View File

@ -7,11 +7,13 @@ from importlib.util import find_spec
from pathlib import Path from pathlib import Path
from urllib.parse import quote_plus from urllib.parse import quote_plus
import httpx
import requests import requests
from fastapi import APIRouter, Depends, Query, Request, Response from fastapi import APIRouter, Depends, Query, Request, Response
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from onvif import ONVIFCamera, ONVIFError from onvif import ONVIFCamera, ONVIFError
from zeep.exceptions import Fault, TransportError from zeep.exceptions import Fault, TransportError
from zeep.transports import AsyncTransport
from frigate.api.auth import require_role from frigate.api.auth import require_role
from frigate.api.defs.tags import Tags from frigate.api.defs.tags import Tags
@ -464,7 +466,8 @@ def _extract_fps(r_frame_rate: str) -> float | None:
summary="Probe ONVIF device", summary="Probe ONVIF device",
description=( description=(
"Probe an ONVIF device to determine capabilities and optionally test available stream URIs. " "Probe an ONVIF device to determine capabilities and optionally test available stream URIs. "
"Query params: host (required), port (default 80), username, password, test (boolean)." "Query params: host (required), port (default 80), username, password, test (boolean), "
"auth_type (basic or digest, default basic)."
), ),
) )
async def onvif_probe( async def onvif_probe(
@ -474,6 +477,7 @@ async def onvif_probe(
username: str = Query(""), username: str = Query(""),
password: str = Query(""), password: str = Query(""),
test: bool = Query(False), test: bool = Query(False),
auth_type: str = Query("basic"), # Add auth_type parameter
): ):
""" """
Probe a single ONVIF device to determine capabilities. Probe a single ONVIF device to determine capabilities.
@ -491,6 +495,7 @@ async def onvif_probe(
username: ONVIF username (optional) username: ONVIF username (optional)
password: ONVIF password (optional) password: ONVIF password (optional)
test: run ffprobe on the stream (optional) test: run ffprobe on the stream (optional)
auth_type: Authentication type - "basic" or "digest" (default "basic")
Returns: Returns:
JSON with device capabilities information JSON with device capabilities information
@ -508,10 +513,20 @@ async def onvif_probe(
status_code=400, status_code=400,
) )
# Validate auth_type
if auth_type not in ["basic", "digest"]:
return JSONResponse(
content={
"success": False,
"message": "auth_type must be 'basic' or 'digest'",
},
status_code=400,
)
onvif_camera = None onvif_camera = None
try: try:
logger.debug(f"Probing ONVIF device at {host}:{port}") logger.debug(f"Probing ONVIF device at {host}:{port} with {auth_type} auth")
try: try:
wsdl_base = None wsdl_base = None
@ -525,7 +540,29 @@ async def onvif_probe(
host, port, username or "", password or "", wsdl_dir=wsdl_base host, port, username or "", password or "", wsdl_dir=wsdl_base
) )
await onvif_camera.update_xaddrs() # Configure digest authentication if requested
if auth_type == "digest" and username and password:
# Create httpx client with digest auth
auth = httpx.DigestAuth(username, password)
client = httpx.AsyncClient(auth=auth, timeout=10.0)
# Replace the transport in the zeep client
transport = AsyncTransport(client=client)
# Update the xaddr before setting transport
await onvif_camera.update_xaddrs()
# Replace transport in all services
if hasattr(onvif_camera, "devicemgmt"):
onvif_camera.devicemgmt.zeep_client.transport = transport
if hasattr(onvif_camera, "media"):
onvif_camera.media.zeep_client.transport = transport
if hasattr(onvif_camera, "ptz"):
onvif_camera.ptz.zeep_client.transport = transport
logger.debug("Configured digest authentication")
else:
await onvif_camera.update_xaddrs()
# Get device information # Get device information
device_info = { device_info = {
@ -535,6 +572,14 @@ async def onvif_probe(
} }
try: try:
device_service = await onvif_camera.create_devicemgmt_service() device_service = await onvif_camera.create_devicemgmt_service()
# Update transport for device service if digest auth
if auth_type == "digest" and username and password:
auth = httpx.DigestAuth(username, password)
client = httpx.AsyncClient(auth=auth, timeout=10.0)
transport = AsyncTransport(client=client)
device_service.zeep_client.transport = transport
device_info_resp = await device_service.GetDeviceInformation() device_info_resp = await device_service.GetDeviceInformation()
manufacturer = getattr(device_info_resp, "Manufacturer", None) or ( manufacturer = getattr(device_info_resp, "Manufacturer", None) or (
device_info_resp.get("Manufacturer") device_info_resp.get("Manufacturer")
@ -558,8 +603,8 @@ async def onvif_probe(
"firmware_version": firmware or "Unknown", "firmware_version": firmware or "Unknown",
} }
) )
except Exception: except Exception as e:
logger.debug("Failed to get device info") logger.debug(f"Failed to get device info: {e}")
# Get media profiles # Get media profiles
profiles = [] profiles = []
@ -568,6 +613,14 @@ async def onvif_probe(
ptz_config_token = None ptz_config_token = None
try: try:
media_service = await onvif_camera.create_media_service() media_service = await onvif_camera.create_media_service()
# Update transport for media service if digest auth
if auth_type == "digest" and username and password:
auth = httpx.DigestAuth(username, password)
client = httpx.AsyncClient(auth=auth, timeout=10.0)
transport = AsyncTransport(client=client)
media_service.zeep_client.transport = transport
profiles = await media_service.GetProfiles() profiles = await media_service.GetProfiles()
profiles_count = len(profiles) if profiles else 0 profiles_count = len(profiles) if profiles else 0
if profiles and len(profiles) > 0: if profiles and len(profiles) > 0:
@ -585,8 +638,8 @@ async def onvif_probe(
if isinstance(ptz_configuration, dict) if isinstance(ptz_configuration, dict)
else None else None
) )
except Exception: except Exception as e:
logger.debug("Failed to get media profiles") logger.debug(f"Failed to get media profiles: {e}")
# Check PTZ support and capabilities # Check PTZ support and capabilities
ptz_supported = False ptz_supported = False
@ -596,6 +649,13 @@ async def onvif_probe(
try: try:
ptz_service = await onvif_camera.create_ptz_service() ptz_service = await onvif_camera.create_ptz_service()
# Update transport for PTZ service if digest auth
if auth_type == "digest" and username and password:
auth = httpx.DigestAuth(username, password)
client = httpx.AsyncClient(auth=auth, timeout=10.0)
transport = AsyncTransport(client=client)
ptz_service.zeep_client.transport = transport
# Check if PTZ service is available # Check if PTZ service is available
try: try:
await ptz_service.GetServiceCapabilities() await ptz_service.GetServiceCapabilities()
@ -744,6 +804,14 @@ async def onvif_probe(
rtsp_candidates: list[dict] = [] rtsp_candidates: list[dict] = []
try: try:
media_service = await onvif_camera.create_media_service() media_service = await onvif_camera.create_media_service()
# Update transport for media service if digest auth
if auth_type == "digest" and username and password:
auth = httpx.DigestAuth(username, password)
client = httpx.AsyncClient(auth=auth, timeout=10.0)
transport = AsyncTransport(client=client)
media_service.zeep_client.transport = transport
if profiles_count and media_service: if profiles_count and media_service:
for p in profiles or []: for p in profiles or []:
token = getattr(p, "token", None) or ( token = getattr(p, "token", None) or (