mirror of
https://github.com/blakeblackshear/frigate.git
synced 2025-12-06 13:34:13 +03:00
backend api endpoint
This commit is contained in:
parent
d41ee4ff88
commit
59fa93a50a
@ -3,11 +3,15 @@
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from importlib.util import find_spec
|
||||
from pathlib import Path
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
import requests
|
||||
from fastapi import APIRouter, Depends, Request, Response
|
||||
from fastapi import APIRouter, Depends, Query, Request, Response
|
||||
from fastapi.responses import JSONResponse
|
||||
from onvif import ONVIFCamera, ONVIFError
|
||||
from zeep.exceptions import Fault, TransportError
|
||||
|
||||
from frigate.api.auth import require_role
|
||||
from frigate.api.defs.tags import Tags
|
||||
@ -452,3 +456,419 @@ def _extract_fps(r_frame_rate: str) -> float | None:
|
||||
return round(float(num) / float(den), 2)
|
||||
except (ValueError, ZeroDivisionError):
|
||||
return None
|
||||
|
||||
|
||||
@router.get("/onvif/probe", dependencies=[Depends(require_role(["admin"]))])
|
||||
async def onvif_probe(
|
||||
request: Request,
|
||||
host: str = Query(None),
|
||||
port: int = Query(80),
|
||||
username: str = Query(""),
|
||||
password: str = Query(""),
|
||||
test: bool = Query(False),
|
||||
):
|
||||
"""
|
||||
Probe a single ONVIF device to determine capabilities.
|
||||
|
||||
Connects to an ONVIF device and queries for:
|
||||
- Device information (manufacturer, model)
|
||||
- Media profiles count
|
||||
- PTZ support
|
||||
- Available presets
|
||||
- Autotrack move status support
|
||||
|
||||
Query Parameters:
|
||||
host: Device host/IP address (required)
|
||||
port: Device port (default 8080)
|
||||
username: ONVIF username (optional)
|
||||
password: ONVIF password (optional)
|
||||
test: run ffprobe on the stream (optional)
|
||||
|
||||
Returns:
|
||||
JSON with device capabilities information
|
||||
"""
|
||||
if not host:
|
||||
return JSONResponse(
|
||||
content={"success": False, "message": "host parameter is required"},
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
# Validate host format
|
||||
if not _is_valid_host(host):
|
||||
return JSONResponse(
|
||||
content={"success": False, "message": "Invalid host format"},
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
onvif_camera = None
|
||||
|
||||
try:
|
||||
logger.debug(f"Probing ONVIF device at {host}:{port}")
|
||||
|
||||
# Resolve wsdl directory from the installed onvif package (if available)
|
||||
try:
|
||||
wsdl_base = None
|
||||
spec = find_spec("onvif")
|
||||
if spec and getattr(spec, "origin", None):
|
||||
wsdl_base = str(Path(spec.origin).parent / "wsdl")
|
||||
except Exception:
|
||||
wsdl_base = None
|
||||
|
||||
onvif_camera = ONVIFCamera(
|
||||
host, port, username or "", password or "", wsdl_dir=wsdl_base
|
||||
)
|
||||
|
||||
await onvif_camera.update_xaddrs()
|
||||
|
||||
# Get device information
|
||||
device_info = {
|
||||
"manufacturer": "Unknown",
|
||||
"model": "Unknown",
|
||||
"firmware_version": "Unknown",
|
||||
}
|
||||
try:
|
||||
device_service = await onvif_camera.create_devicemgmt_service()
|
||||
device_info_resp = await device_service.GetDeviceInformation()
|
||||
manufacturer = getattr(device_info_resp, "Manufacturer", None) or (
|
||||
device_info_resp.get("Manufacturer")
|
||||
if isinstance(device_info_resp, dict)
|
||||
else None
|
||||
)
|
||||
model = getattr(device_info_resp, "Model", None) or (
|
||||
device_info_resp.get("Model")
|
||||
if isinstance(device_info_resp, dict)
|
||||
else None
|
||||
)
|
||||
firmware = getattr(device_info_resp, "FirmwareVersion", None) or (
|
||||
device_info_resp.get("FirmwareVersion")
|
||||
if isinstance(device_info_resp, dict)
|
||||
else None
|
||||
)
|
||||
device_info.update(
|
||||
{
|
||||
"manufacturer": manufacturer or "Unknown",
|
||||
"model": model or "Unknown",
|
||||
"firmware_version": firmware or "Unknown",
|
||||
}
|
||||
)
|
||||
except Exception:
|
||||
logger.debug("Failed to get device info")
|
||||
|
||||
# Get media profiles
|
||||
profiles = []
|
||||
profiles_count = 0
|
||||
first_profile_token = None
|
||||
ptz_config_token = None
|
||||
try:
|
||||
media_service = await onvif_camera.create_media_service()
|
||||
profiles = await media_service.GetProfiles()
|
||||
profiles_count = len(profiles) if profiles else 0
|
||||
if profiles and len(profiles) > 0:
|
||||
p = profiles[0]
|
||||
first_profile_token = getattr(p, "token", None) or (
|
||||
p.get("token") if isinstance(p, dict) else None
|
||||
)
|
||||
# Get PTZ configuration token from the profile
|
||||
ptz_configuration = getattr(p, "PTZConfiguration", None) or (
|
||||
p.get("PTZConfiguration") if isinstance(p, dict) else None
|
||||
)
|
||||
if ptz_configuration:
|
||||
ptz_config_token = getattr(ptz_configuration, "token", None) or (
|
||||
ptz_configuration.get("token")
|
||||
if isinstance(ptz_configuration, dict)
|
||||
else None
|
||||
)
|
||||
except Exception:
|
||||
logger.debug("Failed to get media profiles")
|
||||
|
||||
# Check PTZ support and capabilities
|
||||
ptz_supported = False
|
||||
presets_count = 0
|
||||
autotrack_supported = False
|
||||
|
||||
try:
|
||||
ptz_service = await onvif_camera.create_ptz_service()
|
||||
|
||||
# Check if PTZ service is available
|
||||
try:
|
||||
await ptz_service.GetServiceCapabilities()
|
||||
ptz_supported = True
|
||||
logger.debug("PTZ service is available")
|
||||
except Exception as e:
|
||||
logger.debug(f"PTZ service not available: {e}")
|
||||
ptz_supported = False
|
||||
|
||||
# Try to get presets if PTZ is supported and we have a profile
|
||||
if ptz_supported and first_profile_token:
|
||||
try:
|
||||
presets_resp = await ptz_service.GetPresets(
|
||||
{"ProfileToken": first_profile_token}
|
||||
)
|
||||
presets_count = len(presets_resp) if presets_resp else 0
|
||||
logger.debug(f"Found {presets_count} presets")
|
||||
except Exception as e:
|
||||
logger.debug(f"Failed to get presets: {e}")
|
||||
presets_count = 0
|
||||
|
||||
# Check for autotracking support - requires both FOV relative movement and MoveStatus
|
||||
if ptz_supported and first_profile_token and ptz_config_token:
|
||||
# First check for FOV relative movement support
|
||||
pt_r_fov_supported = False
|
||||
try:
|
||||
config_request = ptz_service.create_type("GetConfigurationOptions")
|
||||
config_request.ConfigurationToken = ptz_config_token
|
||||
ptz_config = await ptz_service.GetConfigurationOptions(
|
||||
config_request
|
||||
)
|
||||
|
||||
if ptz_config:
|
||||
# Check for pt-r-fov support
|
||||
spaces = getattr(ptz_config, "Spaces", None) or (
|
||||
ptz_config.get("Spaces")
|
||||
if isinstance(ptz_config, dict)
|
||||
else None
|
||||
)
|
||||
|
||||
if spaces:
|
||||
rel_pan_tilt_space = getattr(
|
||||
spaces, "RelativePanTiltTranslationSpace", None
|
||||
) or (
|
||||
spaces.get("RelativePanTiltTranslationSpace")
|
||||
if isinstance(spaces, dict)
|
||||
else None
|
||||
)
|
||||
|
||||
if rel_pan_tilt_space:
|
||||
# Look for FOV space
|
||||
for i, space in enumerate(rel_pan_tilt_space):
|
||||
uri = None
|
||||
if isinstance(space, dict):
|
||||
uri = space.get("URI")
|
||||
else:
|
||||
uri = getattr(space, "URI", None)
|
||||
|
||||
if uri and "TranslationSpaceFov" in uri:
|
||||
pt_r_fov_supported = True
|
||||
logger.debug(
|
||||
"FOV relative movement (pt-r-fov) supported"
|
||||
)
|
||||
break
|
||||
|
||||
logger.debug(f"PTZ config spaces: {ptz_config}")
|
||||
except Exception as e:
|
||||
logger.debug(f"Failed to check FOV relative movement: {e}")
|
||||
pt_r_fov_supported = False
|
||||
|
||||
# Now check for MoveStatus support via GetServiceCapabilities
|
||||
if pt_r_fov_supported:
|
||||
try:
|
||||
service_capabilities_request = ptz_service.create_type(
|
||||
"GetServiceCapabilities"
|
||||
)
|
||||
service_capabilities = await ptz_service.GetServiceCapabilities(
|
||||
service_capabilities_request
|
||||
)
|
||||
|
||||
# Look for MoveStatus in the capabilities
|
||||
move_status_capable = False
|
||||
if service_capabilities:
|
||||
# Try to find MoveStatus key recursively
|
||||
def find_move_status(obj, key="MoveStatus"):
|
||||
if isinstance(obj, dict):
|
||||
if key in obj:
|
||||
return obj[key]
|
||||
for v in obj.values():
|
||||
result = find_move_status(v, key)
|
||||
if result is not None:
|
||||
return result
|
||||
elif hasattr(obj, key):
|
||||
return getattr(obj, key)
|
||||
elif hasattr(obj, "__dict__"):
|
||||
for v in vars(obj).values():
|
||||
result = find_move_status(v, key)
|
||||
if result is not None:
|
||||
return result
|
||||
return None
|
||||
|
||||
move_status_value = find_move_status(service_capabilities)
|
||||
|
||||
# MoveStatus should return "true" if supported
|
||||
if isinstance(move_status_value, bool):
|
||||
move_status_capable = move_status_value
|
||||
elif isinstance(move_status_value, str):
|
||||
move_status_capable = (
|
||||
move_status_value.lower() == "true"
|
||||
)
|
||||
|
||||
logger.debug(f"MoveStatus capability: {move_status_value}")
|
||||
|
||||
# Autotracking is supported if both conditions are met
|
||||
autotrack_supported = pt_r_fov_supported and move_status_capable
|
||||
|
||||
if autotrack_supported:
|
||||
logger.debug(
|
||||
"Autotracking fully supported (pt-r-fov + MoveStatus)"
|
||||
)
|
||||
else:
|
||||
logger.debug(
|
||||
f"Autotracking not fully supported - pt-r-fov: {pt_r_fov_supported}, MoveStatus: {move_status_capable}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug(f"Failed to check MoveStatus support: {e}")
|
||||
autotrack_supported = False
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Failed to probe PTZ service: {e}")
|
||||
|
||||
result = {
|
||||
"success": True,
|
||||
"host": host,
|
||||
"port": port,
|
||||
"manufacturer": device_info["manufacturer"],
|
||||
"model": device_info["model"],
|
||||
"firmware_version": device_info["firmware_version"],
|
||||
"profiles_count": profiles_count,
|
||||
"ptz_supported": ptz_supported,
|
||||
"presets_count": presets_count,
|
||||
"autotrack_supported": autotrack_supported,
|
||||
}
|
||||
|
||||
# Gather RTSP candidates
|
||||
rtsp_candidates: list[dict] = []
|
||||
try:
|
||||
media_service = await onvif_camera.create_media_service()
|
||||
if profiles_count and media_service:
|
||||
for p in profiles or []:
|
||||
token = getattr(p, "token", None) or (
|
||||
p.get("token") if isinstance(p, dict) else None
|
||||
)
|
||||
if not token:
|
||||
continue
|
||||
try:
|
||||
stream_setup = {
|
||||
"Stream": "RTP-Unicast",
|
||||
"Transport": {"Protocol": "RTSP"},
|
||||
}
|
||||
stream_req = {
|
||||
"ProfileToken": token,
|
||||
"StreamSetup": stream_setup,
|
||||
}
|
||||
stream_uri_resp = await media_service.GetStreamUri(stream_req)
|
||||
uri = (
|
||||
stream_uri_resp.get("Uri")
|
||||
if isinstance(stream_uri_resp, dict)
|
||||
else getattr(stream_uri_resp, "Uri", None)
|
||||
)
|
||||
if uri:
|
||||
rtsp_candidates.append(
|
||||
{
|
||||
"source": "GetStreamUri",
|
||||
"profile_token": token,
|
||||
"uri": uri,
|
||||
}
|
||||
)
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
# Add common RTSP patterns as fallback
|
||||
if not rtsp_candidates:
|
||||
common_paths = [
|
||||
"/h264",
|
||||
"/live.sdp",
|
||||
"/media.amp",
|
||||
"/Streaming/Channels/101",
|
||||
"/Streaming/Channels/1",
|
||||
"/stream1",
|
||||
"/cam/realmonitor?channel=1&subtype=0",
|
||||
"/11",
|
||||
]
|
||||
auth_str = f"{username}:{password}@" if username and password else ""
|
||||
rtsp_port = 554
|
||||
for path in common_paths:
|
||||
uri = f"rtsp://{auth_str}{host}:{rtsp_port}{path}"
|
||||
rtsp_candidates.append({"source": "pattern", "uri": uri})
|
||||
except Exception:
|
||||
logger.debug("Failed to collect RTSP candidates")
|
||||
|
||||
# Optionally test RTSP candidates using ffprobe_stream
|
||||
tested_candidates = []
|
||||
if test and rtsp_candidates:
|
||||
for c in rtsp_candidates:
|
||||
uri = c["uri"]
|
||||
to_test = [uri]
|
||||
try:
|
||||
if (
|
||||
username
|
||||
and password
|
||||
and isinstance(uri, str)
|
||||
and uri.startswith("rtsp://")
|
||||
and "@" not in uri
|
||||
):
|
||||
cred = f"{quote_plus(username)}:{quote_plus(password)}@"
|
||||
cred_uri = uri.replace("rtsp://", f"rtsp://{cred}", 1)
|
||||
if cred_uri not in to_test:
|
||||
to_test.append(cred_uri)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
for test_uri in to_test:
|
||||
try:
|
||||
probe = ffprobe_stream(
|
||||
request.app.frigate_config.ffmpeg, test_uri, detailed=False
|
||||
)
|
||||
print(probe)
|
||||
ok = probe is not None and getattr(probe, "returncode", 1) == 0
|
||||
tested_candidates.append(
|
||||
{
|
||||
"uri": test_uri,
|
||||
"source": c.get("source"),
|
||||
"ok": ok,
|
||||
"profile_token": c.get("profile_token"),
|
||||
}
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug(f"Unable to probe stream: {e}")
|
||||
tested_candidates.append(
|
||||
{
|
||||
"uri": test_uri,
|
||||
"source": c.get("source"),
|
||||
"ok": False,
|
||||
"profile_token": c.get("profile_token"),
|
||||
}
|
||||
)
|
||||
|
||||
result["rtsp_candidates"] = rtsp_candidates
|
||||
if test:
|
||||
result["rtsp_tested"] = tested_candidates
|
||||
|
||||
logger.debug(f"ONVIF probe successful: {result}")
|
||||
return JSONResponse(content=result)
|
||||
|
||||
except ONVIFError as e:
|
||||
logger.warning(f"ONVIF error probing {host}:{port}: {e}")
|
||||
return JSONResponse(
|
||||
content={"success": False, "message": "ONVIF error"},
|
||||
status_code=400,
|
||||
)
|
||||
except (Fault, TransportError) as e:
|
||||
logger.warning(f"Connection error probing {host}:{port}: {e}")
|
||||
return JSONResponse(
|
||||
content={"success": False, "message": "Connection error"},
|
||||
status_code=503,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Error probing ONVIF device at {host}:{port}, {e}")
|
||||
return JSONResponse(
|
||||
content={"success": False, "message": "Probe failed"},
|
||||
status_code=500,
|
||||
)
|
||||
|
||||
finally:
|
||||
# Best-effort cleanup of ONVIF camera client session
|
||||
if onvif_camera is not None:
|
||||
try:
|
||||
# Check if the camera has a close method and call it
|
||||
if hasattr(onvif_camera, "close"):
|
||||
await onvif_camera.close()
|
||||
except Exception as e:
|
||||
logger.debug(f"Error closing ONVIF camera session: {e}")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user