try both onvif WS-Security password encodings when probing in the add camera wizard

This commit is contained in:
Josh Hawkins 2026-05-31 08:16:22 -05:00
parent 08be019bed
commit 418c701f24

View File

@ -529,6 +529,68 @@ def _extract_fps(r_frame_rate: str) -> float | None:
return None
def _build_digest_transport(username: str, password: str) -> AsyncTransport:
"""Build a zeep transport backed by an httpx client using HTTP digest auth."""
auth = httpx.DigestAuth(username, password)
client = httpx.AsyncClient(auth=auth, timeout=10.0)
return AsyncTransport(client=client)
async def _connect_onvif_camera(
host: str,
port: int,
username: str,
password: str,
wsdl_base: str | None,
auth_type: str,
) -> ONVIFCamera:
"""Connect to an ONVIF device, trying both WS-Security password encodings.
Cameras disagree on whether the WS-Security UsernameToken should carry a
hashed PasswordDigest or a plaintext PasswordText. The wizard can't know
which a given camera expects, so we try PasswordDigest first (the common
case) and fall back to PasswordText when the device rejects the token. This
is independent of auth_type, which controls HTTP transport-level auth.
"""
first_error: Fault | None = None
# encrypt=True -> PasswordDigest, encrypt=False -> PasswordText
for encrypt in (True, False):
onvif_camera = ONVIFCamera(
host,
port,
username or "",
password or "",
wsdl_dir=wsdl_base,
encrypt=encrypt,
)
try:
await onvif_camera.update_xaddrs()
except Fault as e:
# A SOAP fault here is how a camera signals the wrong password
# encoding, so retry with the other encoding before giving up.
logger.debug(
"ONVIF connect with %s rejected, trying alternate encoding",
"PasswordDigest" if encrypt else "PasswordText",
)
if first_error is None:
first_error = e
continue
if auth_type == "digest" and username and password:
transport = _build_digest_transport(username, password)
for service in ("devicemgmt", "media", "ptz"):
if hasattr(onvif_camera, service):
getattr(onvif_camera, service).zeep_client.transport = transport
logger.debug("Configured digest authentication")
return onvif_camera
# Both encodings failed authentication; surface the original fault.
raise first_error
@router.get(
"/onvif/probe",
dependencies=[Depends(require_role(["admin"]))],
@ -605,34 +667,10 @@ async def onvif_probe(
except Exception:
wsdl_base = None
onvif_camera = ONVIFCamera(
host, port, username or "", password or "", wsdl_dir=wsdl_base
onvif_camera = await _connect_onvif_camera(
host, port, username, password, wsdl_base, auth_type
)
# Configure digest authentication if requested
if auth_type == "digest" and username and password:
# Create httpx client with digest auth
auth = httpx.DigestAuth(username, password)
client = httpx.AsyncClient(auth=auth, timeout=10.0)
# Replace the transport in the zeep client
transport = AsyncTransport(client=client)
# Update the xaddr before setting transport
await onvif_camera.update_xaddrs()
# Replace transport in all services
if hasattr(onvif_camera, "devicemgmt"):
onvif_camera.devicemgmt.zeep_client.transport = transport
if hasattr(onvif_camera, "media"):
onvif_camera.media.zeep_client.transport = transport
if hasattr(onvif_camera, "ptz"):
onvif_camera.ptz.zeep_client.transport = transport
logger.debug("Configured digest authentication")
else:
await onvif_camera.update_xaddrs()
# Get device information
device_info = {
"manufacturer": "Unknown",
@ -644,10 +682,9 @@ async def onvif_probe(
# Update transport for device service if digest auth
if auth_type == "digest" and username and password:
auth = httpx.DigestAuth(username, password)
client = httpx.AsyncClient(auth=auth, timeout=10.0)
transport = AsyncTransport(client=client)
device_service.zeep_client.transport = transport
device_service.zeep_client.transport = _build_digest_transport(
username, password
)
device_info_resp = await device_service.GetDeviceInformation()
manufacturer = getattr(device_info_resp, "Manufacturer", None) or (
@ -685,10 +722,9 @@ async def onvif_probe(
# Update transport for media service if digest auth
if auth_type == "digest" and username and password:
auth = httpx.DigestAuth(username, password)
client = httpx.AsyncClient(auth=auth, timeout=10.0)
transport = AsyncTransport(client=client)
media_service.zeep_client.transport = transport
media_service.zeep_client.transport = _build_digest_transport(
username, password
)
profiles = await media_service.GetProfiles()
profiles_count = len(profiles) if profiles else 0
@ -720,10 +756,9 @@ async def onvif_probe(
# Update transport for PTZ service if digest auth
if auth_type == "digest" and username and password:
auth = httpx.DigestAuth(username, password)
client = httpx.AsyncClient(auth=auth, timeout=10.0)
transport = AsyncTransport(client=client)
ptz_service.zeep_client.transport = transport
ptz_service.zeep_client.transport = _build_digest_transport(
username, password
)
# Check if PTZ service is available
try:
@ -876,10 +911,9 @@ async def onvif_probe(
# Update transport for media service if digest auth
if auth_type == "digest" and username and password:
auth = httpx.DigestAuth(username, password)
client = httpx.AsyncClient(auth=auth, timeout=10.0)
transport = AsyncTransport(client=client)
media_service.zeep_client.transport = transport
media_service.zeep_client.transport = _build_digest_transport(
username, password
)
if profiles_count and media_service:
for p in profiles or []: