From 59fa93a50a28b9b8d78ceeb2a13838a86f460317 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Sat, 8 Nov 2025 21:35:24 -0600 Subject: [PATCH] backend api endpoint --- frigate/api/camera.py | 422 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 421 insertions(+), 1 deletion(-) diff --git a/frigate/api/camera.py b/frigate/api/camera.py index 9a91bd1a9..1b3c1d98a 100644 --- a/frigate/api/camera.py +++ b/frigate/api/camera.py @@ -3,11 +3,15 @@ import json import logging import re +from importlib.util import find_spec +from pathlib import Path from urllib.parse import quote_plus import requests -from fastapi import APIRouter, Depends, Request, Response +from fastapi import APIRouter, Depends, Query, Request, Response from fastapi.responses import JSONResponse +from onvif import ONVIFCamera, ONVIFError +from zeep.exceptions import Fault, TransportError from frigate.api.auth import require_role from frigate.api.defs.tags import Tags @@ -452,3 +456,419 @@ def _extract_fps(r_frame_rate: str) -> float | None: return round(float(num) / float(den), 2) except (ValueError, ZeroDivisionError): return None + + +@router.get("/onvif/probe", dependencies=[Depends(require_role(["admin"]))]) +async def onvif_probe( + request: Request, + host: str = Query(None), + port: int = Query(80), + username: str = Query(""), + password: str = Query(""), + test: bool = Query(False), +): + """ + Probe a single ONVIF device to determine capabilities. + + Connects to an ONVIF device and queries for: + - Device information (manufacturer, model) + - Media profiles count + - PTZ support + - Available presets + - Autotrack move status support + + Query Parameters: + host: Device host/IP address (required) + port: Device port (default 8080) + username: ONVIF username (optional) + password: ONVIF password (optional) + test: run ffprobe on the stream (optional) + + Returns: + JSON with device capabilities information + """ + if not host: + return JSONResponse( + content={"success": False, "message": "host parameter is required"}, + status_code=400, + ) + + # Validate host format + if not _is_valid_host(host): + return JSONResponse( + content={"success": False, "message": "Invalid host format"}, + status_code=400, + ) + + onvif_camera = None + + try: + logger.debug(f"Probing ONVIF device at {host}:{port}") + + # Resolve wsdl directory from the installed onvif package (if available) + try: + wsdl_base = None + spec = find_spec("onvif") + if spec and getattr(spec, "origin", None): + wsdl_base = str(Path(spec.origin).parent / "wsdl") + except Exception: + wsdl_base = None + + onvif_camera = ONVIFCamera( + host, port, username or "", password or "", wsdl_dir=wsdl_base + ) + + await onvif_camera.update_xaddrs() + + # Get device information + device_info = { + "manufacturer": "Unknown", + "model": "Unknown", + "firmware_version": "Unknown", + } + try: + device_service = await onvif_camera.create_devicemgmt_service() + device_info_resp = await device_service.GetDeviceInformation() + manufacturer = getattr(device_info_resp, "Manufacturer", None) or ( + device_info_resp.get("Manufacturer") + if isinstance(device_info_resp, dict) + else None + ) + model = getattr(device_info_resp, "Model", None) or ( + device_info_resp.get("Model") + if isinstance(device_info_resp, dict) + else None + ) + firmware = getattr(device_info_resp, "FirmwareVersion", None) or ( + device_info_resp.get("FirmwareVersion") + if isinstance(device_info_resp, dict) + else None + ) + device_info.update( + { + "manufacturer": manufacturer or "Unknown", + "model": model or "Unknown", + "firmware_version": firmware or "Unknown", + } + ) + except Exception: + logger.debug("Failed to get device info") + + # Get media profiles + profiles = [] + profiles_count = 0 + first_profile_token = None + ptz_config_token = None + try: + media_service = await onvif_camera.create_media_service() + profiles = await media_service.GetProfiles() + profiles_count = len(profiles) if profiles else 0 + if profiles and len(profiles) > 0: + p = profiles[0] + first_profile_token = getattr(p, "token", None) or ( + p.get("token") if isinstance(p, dict) else None + ) + # Get PTZ configuration token from the profile + ptz_configuration = getattr(p, "PTZConfiguration", None) or ( + p.get("PTZConfiguration") if isinstance(p, dict) else None + ) + if ptz_configuration: + ptz_config_token = getattr(ptz_configuration, "token", None) or ( + ptz_configuration.get("token") + if isinstance(ptz_configuration, dict) + else None + ) + except Exception: + logger.debug("Failed to get media profiles") + + # Check PTZ support and capabilities + ptz_supported = False + presets_count = 0 + autotrack_supported = False + + try: + ptz_service = await onvif_camera.create_ptz_service() + + # Check if PTZ service is available + try: + await ptz_service.GetServiceCapabilities() + ptz_supported = True + logger.debug("PTZ service is available") + except Exception as e: + logger.debug(f"PTZ service not available: {e}") + ptz_supported = False + + # Try to get presets if PTZ is supported and we have a profile + if ptz_supported and first_profile_token: + try: + presets_resp = await ptz_service.GetPresets( + {"ProfileToken": first_profile_token} + ) + presets_count = len(presets_resp) if presets_resp else 0 + logger.debug(f"Found {presets_count} presets") + except Exception as e: + logger.debug(f"Failed to get presets: {e}") + presets_count = 0 + + # Check for autotracking support - requires both FOV relative movement and MoveStatus + if ptz_supported and first_profile_token and ptz_config_token: + # First check for FOV relative movement support + pt_r_fov_supported = False + try: + config_request = ptz_service.create_type("GetConfigurationOptions") + config_request.ConfigurationToken = ptz_config_token + ptz_config = await ptz_service.GetConfigurationOptions( + config_request + ) + + if ptz_config: + # Check for pt-r-fov support + spaces = getattr(ptz_config, "Spaces", None) or ( + ptz_config.get("Spaces") + if isinstance(ptz_config, dict) + else None + ) + + if spaces: + rel_pan_tilt_space = getattr( + spaces, "RelativePanTiltTranslationSpace", None + ) or ( + spaces.get("RelativePanTiltTranslationSpace") + if isinstance(spaces, dict) + else None + ) + + if rel_pan_tilt_space: + # Look for FOV space + for i, space in enumerate(rel_pan_tilt_space): + uri = None + if isinstance(space, dict): + uri = space.get("URI") + else: + uri = getattr(space, "URI", None) + + if uri and "TranslationSpaceFov" in uri: + pt_r_fov_supported = True + logger.debug( + "FOV relative movement (pt-r-fov) supported" + ) + break + + logger.debug(f"PTZ config spaces: {ptz_config}") + except Exception as e: + logger.debug(f"Failed to check FOV relative movement: {e}") + pt_r_fov_supported = False + + # Now check for MoveStatus support via GetServiceCapabilities + if pt_r_fov_supported: + try: + service_capabilities_request = ptz_service.create_type( + "GetServiceCapabilities" + ) + service_capabilities = await ptz_service.GetServiceCapabilities( + service_capabilities_request + ) + + # Look for MoveStatus in the capabilities + move_status_capable = False + if service_capabilities: + # Try to find MoveStatus key recursively + def find_move_status(obj, key="MoveStatus"): + if isinstance(obj, dict): + if key in obj: + return obj[key] + for v in obj.values(): + result = find_move_status(v, key) + if result is not None: + return result + elif hasattr(obj, key): + return getattr(obj, key) + elif hasattr(obj, "__dict__"): + for v in vars(obj).values(): + result = find_move_status(v, key) + if result is not None: + return result + return None + + move_status_value = find_move_status(service_capabilities) + + # MoveStatus should return "true" if supported + if isinstance(move_status_value, bool): + move_status_capable = move_status_value + elif isinstance(move_status_value, str): + move_status_capable = ( + move_status_value.lower() == "true" + ) + + logger.debug(f"MoveStatus capability: {move_status_value}") + + # Autotracking is supported if both conditions are met + autotrack_supported = pt_r_fov_supported and move_status_capable + + if autotrack_supported: + logger.debug( + "Autotracking fully supported (pt-r-fov + MoveStatus)" + ) + else: + logger.debug( + f"Autotracking not fully supported - pt-r-fov: {pt_r_fov_supported}, MoveStatus: {move_status_capable}" + ) + except Exception as e: + logger.debug(f"Failed to check MoveStatus support: {e}") + autotrack_supported = False + + except Exception as e: + logger.debug(f"Failed to probe PTZ service: {e}") + + result = { + "success": True, + "host": host, + "port": port, + "manufacturer": device_info["manufacturer"], + "model": device_info["model"], + "firmware_version": device_info["firmware_version"], + "profiles_count": profiles_count, + "ptz_supported": ptz_supported, + "presets_count": presets_count, + "autotrack_supported": autotrack_supported, + } + + # Gather RTSP candidates + rtsp_candidates: list[dict] = [] + try: + media_service = await onvif_camera.create_media_service() + if profiles_count and media_service: + for p in profiles or []: + token = getattr(p, "token", None) or ( + p.get("token") if isinstance(p, dict) else None + ) + if not token: + continue + try: + stream_setup = { + "Stream": "RTP-Unicast", + "Transport": {"Protocol": "RTSP"}, + } + stream_req = { + "ProfileToken": token, + "StreamSetup": stream_setup, + } + stream_uri_resp = await media_service.GetStreamUri(stream_req) + uri = ( + stream_uri_resp.get("Uri") + if isinstance(stream_uri_resp, dict) + else getattr(stream_uri_resp, "Uri", None) + ) + if uri: + rtsp_candidates.append( + { + "source": "GetStreamUri", + "profile_token": token, + "uri": uri, + } + ) + except Exception: + continue + + # Add common RTSP patterns as fallback + if not rtsp_candidates: + common_paths = [ + "/h264", + "/live.sdp", + "/media.amp", + "/Streaming/Channels/101", + "/Streaming/Channels/1", + "/stream1", + "/cam/realmonitor?channel=1&subtype=0", + "/11", + ] + auth_str = f"{username}:{password}@" if username and password else "" + rtsp_port = 554 + for path in common_paths: + uri = f"rtsp://{auth_str}{host}:{rtsp_port}{path}" + rtsp_candidates.append({"source": "pattern", "uri": uri}) + except Exception: + logger.debug("Failed to collect RTSP candidates") + + # Optionally test RTSP candidates using ffprobe_stream + tested_candidates = [] + if test and rtsp_candidates: + for c in rtsp_candidates: + uri = c["uri"] + to_test = [uri] + try: + if ( + username + and password + and isinstance(uri, str) + and uri.startswith("rtsp://") + and "@" not in uri + ): + cred = f"{quote_plus(username)}:{quote_plus(password)}@" + cred_uri = uri.replace("rtsp://", f"rtsp://{cred}", 1) + if cred_uri not in to_test: + to_test.append(cred_uri) + except Exception: + pass + + for test_uri in to_test: + try: + probe = ffprobe_stream( + request.app.frigate_config.ffmpeg, test_uri, detailed=False + ) + print(probe) + ok = probe is not None and getattr(probe, "returncode", 1) == 0 + tested_candidates.append( + { + "uri": test_uri, + "source": c.get("source"), + "ok": ok, + "profile_token": c.get("profile_token"), + } + ) + except Exception as e: + logger.debug(f"Unable to probe stream: {e}") + tested_candidates.append( + { + "uri": test_uri, + "source": c.get("source"), + "ok": False, + "profile_token": c.get("profile_token"), + } + ) + + result["rtsp_candidates"] = rtsp_candidates + if test: + result["rtsp_tested"] = tested_candidates + + logger.debug(f"ONVIF probe successful: {result}") + return JSONResponse(content=result) + + except ONVIFError as e: + logger.warning(f"ONVIF error probing {host}:{port}: {e}") + return JSONResponse( + content={"success": False, "message": "ONVIF error"}, + status_code=400, + ) + except (Fault, TransportError) as e: + logger.warning(f"Connection error probing {host}:{port}: {e}") + return JSONResponse( + content={"success": False, "message": "Connection error"}, + status_code=503, + ) + except Exception as e: + logger.warning(f"Error probing ONVIF device at {host}:{port}, {e}") + return JSONResponse( + content={"success": False, "message": "Probe failed"}, + status_code=500, + ) + + finally: + # Best-effort cleanup of ONVIF camera client session + if onvif_camera is not None: + try: + # Check if the camera has a close method and call it + if hasattr(onvif_camera, "close"): + await onvif_camera.close() + except Exception as e: + logger.debug(f"Error closing ONVIF camera session: {e}")