frigate/frigate/api/camera.py
Josh Hawkins e4eac4ac81
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
Add Camera Wizard improvements (#20876)
* backend api endpoint

* don't add no-credentials version of streams to rtsp_candidates

* frontend types

* improve types

* add optional probe dialog to wizard step 1

* i18n

* form description and field change

* add onvif form description

* match onvif probe pane with other steps in the wizard

* refactor to add probe and snapshot as step 2

* consolidate probe dialog

* don't change dialog size

* radio button style

* refactor to select onvif urls via combobox in step 3

* i18n

* add scrollbar container

* i18n cleanup

* fix button activity indicator

* match test parsing in step 3 with step 2

* hide resolution if both width and height are zero

* use drawer for stream selection on mobile in step 3

* suppress double toasts

* api endpoint description
2025-11-10 15:49:52 -06:00

927 lines
35 KiB
Python

"""Camera apis."""
import json
import logging
import re
from importlib.util import find_spec
from pathlib import Path
from urllib.parse import quote_plus
import requests
from fastapi import APIRouter, Depends, Query, Request, Response
from fastapi.responses import JSONResponse
from onvif import ONVIFCamera, ONVIFError
from zeep.exceptions import Fault, TransportError
from frigate.api.auth import require_role
from frigate.api.defs.tags import Tags
from frigate.config.config import FrigateConfig
from frigate.util.builtin import clean_camera_user_pass
from frigate.util.image import run_ffmpeg_snapshot
from frigate.util.services import ffprobe_stream
logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.camera])
def _is_valid_host(host: str) -> bool:
"""
Validate that the host is in a valid format.
Allows private IPs since cameras are typically on local networks.
Only blocks obviously malicious input to prevent injection attacks.
"""
try:
# Remove port if present
host_without_port = host.split(":")[0] if ":" in host else host
# Block whitespace, newlines, and control characters
if not host_without_port or re.search(r"[\s\x00-\x1f]", host_without_port):
return False
# Allow standard hostname/IP characters: alphanumeric, dots, hyphens
if not re.match(r"^[a-zA-Z0-9.-]+$", host_without_port):
return False
return True
except Exception:
return False
@router.get("/go2rtc/streams")
def go2rtc_streams():
r = requests.get("http://127.0.0.1:1984/api/streams")
if not r.ok:
logger.error("Failed to fetch streams from go2rtc")
return JSONResponse(
content=({"success": False, "message": "Error fetching stream data"}),
status_code=500,
)
stream_data = r.json()
for data in stream_data.values():
for producer in data.get("producers") or []:
producer["url"] = clean_camera_user_pass(producer.get("url", ""))
return JSONResponse(content=stream_data)
@router.get("/go2rtc/streams/{camera_name}")
def go2rtc_camera_stream(request: Request, camera_name: str):
r = requests.get(
f"http://127.0.0.1:1984/api/streams?src={camera_name}&video=all&audio=all&microphone"
)
if not r.ok:
camera_config = request.app.frigate_config.cameras.get(camera_name)
if camera_config and camera_config.enabled:
logger.error("Failed to fetch streams from go2rtc")
return JSONResponse(
content=({"success": False, "message": "Error fetching stream data"}),
status_code=500,
)
stream_data = r.json()
for producer in stream_data.get("producers", []):
producer["url"] = clean_camera_user_pass(producer.get("url", ""))
return JSONResponse(content=stream_data)
@router.put(
"/go2rtc/streams/{stream_name}", dependencies=[Depends(require_role(["admin"]))]
)
def go2rtc_add_stream(request: Request, stream_name: str, src: str = ""):
"""Add or update a go2rtc stream configuration."""
try:
params = {"name": stream_name}
if src:
params["src"] = src
r = requests.put(
"http://127.0.0.1:1984/api/streams",
params=params,
timeout=10,
)
if not r.ok:
logger.error(f"Failed to add go2rtc stream {stream_name}: {r.text}")
return JSONResponse(
content=(
{"success": False, "message": f"Failed to add stream: {r.text}"}
),
status_code=r.status_code,
)
return JSONResponse(
content={"success": True, "message": "Stream added successfully"}
)
except requests.RequestException as e:
logger.error(f"Error communicating with go2rtc: {e}")
return JSONResponse(
content=(
{
"success": False,
"message": "Error communicating with go2rtc",
}
),
status_code=500,
)
@router.delete(
"/go2rtc/streams/{stream_name}", dependencies=[Depends(require_role(["admin"]))]
)
def go2rtc_delete_stream(stream_name: str):
"""Delete a go2rtc stream."""
try:
r = requests.delete(
"http://127.0.0.1:1984/api/streams",
params={"src": stream_name},
timeout=10,
)
if not r.ok:
logger.error(f"Failed to delete go2rtc stream {stream_name}: {r.text}")
return JSONResponse(
content=(
{"success": False, "message": f"Failed to delete stream: {r.text}"}
),
status_code=r.status_code,
)
return JSONResponse(
content={"success": True, "message": "Stream deleted successfully"}
)
except requests.RequestException as e:
logger.error(f"Error communicating with go2rtc: {e}")
return JSONResponse(
content=(
{
"success": False,
"message": "Error communicating with go2rtc",
}
),
status_code=500,
)
@router.get("/ffprobe")
def ffprobe(request: Request, paths: str = "", detailed: bool = False):
path_param = paths
if not path_param:
return JSONResponse(
content=({"success": False, "message": "Path needs to be provided."}),
status_code=404,
)
if path_param.startswith("camera"):
camera = path_param[7:]
if camera not in request.app.frigate_config.cameras.keys():
return JSONResponse(
content=(
{"success": False, "message": f"{camera} is not a valid camera."}
),
status_code=404,
)
if not request.app.frigate_config.cameras[camera].enabled:
return JSONResponse(
content=({"success": False, "message": f"{camera} is not enabled."}),
status_code=404,
)
paths = map(
lambda input: input.path,
request.app.frigate_config.cameras[camera].ffmpeg.inputs,
)
elif "," in clean_camera_user_pass(path_param):
paths = path_param.split(",")
else:
paths = [path_param]
# user has multiple streams
output = []
for path in paths:
ffprobe = ffprobe_stream(
request.app.frigate_config.ffmpeg, path.strip(), detailed=detailed
)
if ffprobe.returncode != 0:
try:
stderr_decoded = ffprobe.stderr.decode("utf-8")
except UnicodeDecodeError:
try:
stderr_decoded = ffprobe.stderr.decode("unicode_escape")
except Exception:
stderr_decoded = str(ffprobe.stderr)
stderr_lines = [
line.strip() for line in stderr_decoded.split("\n") if line.strip()
]
result = {
"return_code": ffprobe.returncode,
"stderr": stderr_lines,
"stdout": "",
}
else:
result = {
"return_code": ffprobe.returncode,
"stderr": [],
"stdout": json.loads(ffprobe.stdout.decode("unicode_escape").strip()),
}
# Add detailed metadata if requested and probe was successful
if detailed and ffprobe.returncode == 0 and result["stdout"]:
try:
probe_data = result["stdout"]
metadata = {}
# Extract video stream information
video_stream = None
audio_stream = None
for stream in probe_data.get("streams", []):
if stream.get("codec_type") == "video":
video_stream = stream
elif stream.get("codec_type") == "audio":
audio_stream = stream
# Video metadata
if video_stream:
metadata["video"] = {
"codec": video_stream.get("codec_name"),
"width": video_stream.get("width"),
"height": video_stream.get("height"),
"fps": _extract_fps(video_stream.get("avg_frame_rate")),
"pixel_format": video_stream.get("pix_fmt"),
"profile": video_stream.get("profile"),
"level": video_stream.get("level"),
}
# Calculate resolution string
if video_stream.get("width") and video_stream.get("height"):
metadata["video"]["resolution"] = (
f"{video_stream['width']}x{video_stream['height']}"
)
# Audio metadata
if audio_stream:
metadata["audio"] = {
"codec": audio_stream.get("codec_name"),
"channels": audio_stream.get("channels"),
"sample_rate": audio_stream.get("sample_rate"),
"channel_layout": audio_stream.get("channel_layout"),
}
# Container/format metadata
if probe_data.get("format"):
format_info = probe_data["format"]
metadata["container"] = {
"format": format_info.get("format_name"),
"duration": format_info.get("duration"),
"size": format_info.get("size"),
}
result["metadata"] = metadata
except Exception as e:
logger.warning(f"Failed to extract detailed metadata: {e}")
# Continue without metadata if parsing fails
output.append(result)
return JSONResponse(content=output)
@router.get("/ffprobe/snapshot", dependencies=[Depends(require_role(["admin"]))])
def ffprobe_snapshot(request: Request, url: str = "", timeout: int = 10):
"""Get a snapshot from a stream URL using ffmpeg."""
if not url:
return JSONResponse(
content={"success": False, "message": "URL parameter is required"},
status_code=400,
)
config: FrigateConfig = request.app.frigate_config
image_data, error = run_ffmpeg_snapshot(
config.ffmpeg, url, "mjpeg", timeout=timeout
)
if image_data:
return Response(
image_data,
media_type="image/jpeg",
headers={"Cache-Control": "no-store"},
)
elif error == "timeout":
return JSONResponse(
content={"success": False, "message": "Timeout capturing snapshot"},
status_code=408,
)
else:
logger.error(f"ffmpeg failed: {error}")
return JSONResponse(
content={"success": False, "message": "Failed to capture snapshot"},
status_code=500,
)
@router.get("/reolink/detect", dependencies=[Depends(require_role(["admin"]))])
def reolink_detect(host: str = "", username: str = "", password: str = ""):
"""
Detect Reolink camera capabilities and recommend optimal protocol.
Queries the Reolink camera API to determine the camera's resolution
and recommends either http-flv (for 5MP and below) or rtsp (for higher resolutions).
"""
if not host:
return JSONResponse(
content={"success": False, "message": "Host parameter is required"},
status_code=400,
)
if not username:
return JSONResponse(
content={"success": False, "message": "Username parameter is required"},
status_code=400,
)
if not password:
return JSONResponse(
content={"success": False, "message": "Password parameter is required"},
status_code=400,
)
# Validate host format to prevent injection attacks
if not _is_valid_host(host):
return JSONResponse(
content={"success": False, "message": "Invalid host format"},
status_code=400,
)
try:
# URL-encode credentials to prevent injection
encoded_user = quote_plus(username)
encoded_password = quote_plus(password)
api_url = f"http://{host}/api.cgi?cmd=GetEnc&user={encoded_user}&password={encoded_password}"
response = requests.get(api_url, timeout=5)
if not response.ok:
return JSONResponse(
content={
"success": False,
"protocol": None,
"message": f"Failed to connect to camera API: HTTP {response.status_code}",
},
status_code=200,
)
data = response.json()
enc_data = data[0] if isinstance(data, list) and len(data) > 0 else data
stream_info = None
if isinstance(enc_data, dict):
if enc_data.get("value", {}).get("Enc"):
stream_info = enc_data["value"]["Enc"]
elif enc_data.get("Enc"):
stream_info = enc_data["Enc"]
if not stream_info or not stream_info.get("mainStream"):
return JSONResponse(
content={
"success": False,
"protocol": None,
"message": "Could not find stream information in API response",
}
)
main_stream = stream_info["mainStream"]
width = main_stream.get("width", 0)
height = main_stream.get("height", 0)
if not width or not height:
return JSONResponse(
content={
"success": False,
"protocol": None,
"message": "Could not determine camera resolution",
}
)
megapixels = (width * height) / 1_000_000
protocol = "http-flv" if megapixels <= 5.0 else "rtsp"
return JSONResponse(
content={
"success": True,
"protocol": protocol,
"resolution": f"{width}x{height}",
"megapixels": round(megapixels, 2),
}
)
except requests.exceptions.Timeout:
return JSONResponse(
content={
"success": False,
"protocol": None,
"message": "Connection timeout - camera did not respond",
}
)
except requests.exceptions.RequestException:
return JSONResponse(
content={
"success": False,
"protocol": None,
"message": "Failed to connect to camera",
}
)
except Exception:
logger.exception(f"Error detecting Reolink camera at {host}")
return JSONResponse(
content={
"success": False,
"protocol": None,
"message": "Unable to detect camera capabilities",
}
)
def _extract_fps(r_frame_rate: str) -> float | None:
"""Extract FPS from ffprobe avg_frame_rate / r_frame_rate string (e.g., '30/1' -> 30.0)"""
if not r_frame_rate:
return None
try:
num, den = r_frame_rate.split("/")
return round(float(num) / float(den), 2)
except (ValueError, ZeroDivisionError):
return None
@router.get(
"/onvif/probe",
dependencies=[Depends(require_role(["admin"]))],
summary="Probe ONVIF device",
description=(
"Probe an ONVIF device to determine capabilities and optionally test available stream URIs. "
"Query params: host (required), port (default 80), username, password, test (boolean)."
),
)
async def onvif_probe(
request: Request,
host: str = Query(None),
port: int = Query(80),
username: str = Query(""),
password: str = Query(""),
test: bool = Query(False),
):
"""
Probe a single ONVIF device to determine capabilities.
Connects to an ONVIF device and queries for:
- Device information (manufacturer, model)
- Media profiles count
- PTZ support
- Available presets
- Autotracking support
Query Parameters:
host: Device host/IP address (required)
port: Device port (default 80)
username: ONVIF username (optional)
password: ONVIF password (optional)
test: run ffprobe on the stream (optional)
Returns:
JSON with device capabilities information
"""
if not host:
return JSONResponse(
content={"success": False, "message": "host parameter is required"},
status_code=400,
)
# Validate host format
if not _is_valid_host(host):
return JSONResponse(
content={"success": False, "message": "Invalid host format"},
status_code=400,
)
onvif_camera = None
try:
logger.debug(f"Probing ONVIF device at {host}:{port}")
try:
wsdl_base = None
spec = find_spec("onvif")
if spec and getattr(spec, "origin", None):
wsdl_base = str(Path(spec.origin).parent / "wsdl")
except Exception:
wsdl_base = None
onvif_camera = ONVIFCamera(
host, port, username or "", password or "", wsdl_dir=wsdl_base
)
await onvif_camera.update_xaddrs()
# Get device information
device_info = {
"manufacturer": "Unknown",
"model": "Unknown",
"firmware_version": "Unknown",
}
try:
device_service = await onvif_camera.create_devicemgmt_service()
device_info_resp = await device_service.GetDeviceInformation()
manufacturer = getattr(device_info_resp, "Manufacturer", None) or (
device_info_resp.get("Manufacturer")
if isinstance(device_info_resp, dict)
else None
)
model = getattr(device_info_resp, "Model", None) or (
device_info_resp.get("Model")
if isinstance(device_info_resp, dict)
else None
)
firmware = getattr(device_info_resp, "FirmwareVersion", None) or (
device_info_resp.get("FirmwareVersion")
if isinstance(device_info_resp, dict)
else None
)
device_info.update(
{
"manufacturer": manufacturer or "Unknown",
"model": model or "Unknown",
"firmware_version": firmware or "Unknown",
}
)
except Exception:
logger.debug("Failed to get device info")
# Get media profiles
profiles = []
profiles_count = 0
first_profile_token = None
ptz_config_token = None
try:
media_service = await onvif_camera.create_media_service()
profiles = await media_service.GetProfiles()
profiles_count = len(profiles) if profiles else 0
if profiles and len(profiles) > 0:
p = profiles[0]
first_profile_token = getattr(p, "token", None) or (
p.get("token") if isinstance(p, dict) else None
)
# Get PTZ configuration token from the profile
ptz_configuration = getattr(p, "PTZConfiguration", None) or (
p.get("PTZConfiguration") if isinstance(p, dict) else None
)
if ptz_configuration:
ptz_config_token = getattr(ptz_configuration, "token", None) or (
ptz_configuration.get("token")
if isinstance(ptz_configuration, dict)
else None
)
except Exception:
logger.debug("Failed to get media profiles")
# Check PTZ support and capabilities
ptz_supported = False
presets_count = 0
autotrack_supported = False
try:
ptz_service = await onvif_camera.create_ptz_service()
# Check if PTZ service is available
try:
await ptz_service.GetServiceCapabilities()
ptz_supported = True
logger.debug("PTZ service is available")
except Exception as e:
logger.debug(f"PTZ service not available: {e}")
ptz_supported = False
# Try to get presets if PTZ is supported and we have a profile
if ptz_supported and first_profile_token:
try:
presets_resp = await ptz_service.GetPresets(
{"ProfileToken": first_profile_token}
)
presets_count = len(presets_resp) if presets_resp else 0
logger.debug(f"Found {presets_count} presets")
except Exception as e:
logger.debug(f"Failed to get presets: {e}")
presets_count = 0
# Check for autotracking support - requires both FOV relative movement and MoveStatus
if ptz_supported and first_profile_token and ptz_config_token:
# First check for FOV relative movement support
pt_r_fov_supported = False
try:
config_request = ptz_service.create_type("GetConfigurationOptions")
config_request.ConfigurationToken = ptz_config_token
ptz_config = await ptz_service.GetConfigurationOptions(
config_request
)
if ptz_config:
# Check for pt-r-fov support
spaces = getattr(ptz_config, "Spaces", None) or (
ptz_config.get("Spaces")
if isinstance(ptz_config, dict)
else None
)
if spaces:
rel_pan_tilt_space = getattr(
spaces, "RelativePanTiltTranslationSpace", None
) or (
spaces.get("RelativePanTiltTranslationSpace")
if isinstance(spaces, dict)
else None
)
if rel_pan_tilt_space:
# Look for FOV space
for i, space in enumerate(rel_pan_tilt_space):
uri = None
if isinstance(space, dict):
uri = space.get("URI")
else:
uri = getattr(space, "URI", None)
if uri and "TranslationSpaceFov" in uri:
pt_r_fov_supported = True
logger.debug(
"FOV relative movement (pt-r-fov) supported"
)
break
logger.debug(f"PTZ config spaces: {ptz_config}")
except Exception as e:
logger.debug(f"Failed to check FOV relative movement: {e}")
pt_r_fov_supported = False
# Now check for MoveStatus support via GetServiceCapabilities
if pt_r_fov_supported:
try:
service_capabilities_request = ptz_service.create_type(
"GetServiceCapabilities"
)
service_capabilities = await ptz_service.GetServiceCapabilities(
service_capabilities_request
)
# Look for MoveStatus in the capabilities
move_status_capable = False
if service_capabilities:
# Try to find MoveStatus key recursively
def find_move_status(obj, key="MoveStatus"):
if isinstance(obj, dict):
if key in obj:
return obj[key]
for v in obj.values():
result = find_move_status(v, key)
if result is not None:
return result
elif hasattr(obj, key):
return getattr(obj, key)
elif hasattr(obj, "__dict__"):
for v in vars(obj).values():
result = find_move_status(v, key)
if result is not None:
return result
return None
move_status_value = find_move_status(service_capabilities)
# MoveStatus should return "true" if supported
if isinstance(move_status_value, bool):
move_status_capable = move_status_value
elif isinstance(move_status_value, str):
move_status_capable = (
move_status_value.lower() == "true"
)
logger.debug(f"MoveStatus capability: {move_status_value}")
# Autotracking is supported if both conditions are met
autotrack_supported = pt_r_fov_supported and move_status_capable
if autotrack_supported:
logger.debug(
"Autotracking fully supported (pt-r-fov + MoveStatus)"
)
else:
logger.debug(
f"Autotracking not fully supported - pt-r-fov: {pt_r_fov_supported}, MoveStatus: {move_status_capable}"
)
except Exception as e:
logger.debug(f"Failed to check MoveStatus support: {e}")
autotrack_supported = False
except Exception as e:
logger.debug(f"Failed to probe PTZ service: {e}")
result = {
"success": True,
"host": host,
"port": port,
"manufacturer": device_info["manufacturer"],
"model": device_info["model"],
"firmware_version": device_info["firmware_version"],
"profiles_count": profiles_count,
"ptz_supported": ptz_supported,
"presets_count": presets_count,
"autotrack_supported": autotrack_supported,
}
# Gather RTSP candidates
rtsp_candidates: list[dict] = []
try:
media_service = await onvif_camera.create_media_service()
if profiles_count and media_service:
for p in profiles or []:
token = getattr(p, "token", None) or (
p.get("token") if isinstance(p, dict) else None
)
if not token:
continue
try:
stream_setup = {
"Stream": "RTP-Unicast",
"Transport": {"Protocol": "RTSP"},
}
stream_req = {
"ProfileToken": token,
"StreamSetup": stream_setup,
}
stream_uri_resp = await media_service.GetStreamUri(stream_req)
uri = (
stream_uri_resp.get("Uri")
if isinstance(stream_uri_resp, dict)
else getattr(stream_uri_resp, "Uri", None)
)
if uri:
logger.debug(
f"GetStreamUri returned for token {token}: {uri}"
)
# If credentials were provided, do NOT add the unauthenticated URI.
try:
if isinstance(uri, str) and uri.startswith("rtsp://"):
if username and password and "@" not in uri:
# Inject URL-encoded credentials and add only the
# authenticated version.
cred = f"{quote_plus(username)}:{quote_plus(password)}@"
injected = uri.replace(
"rtsp://", f"rtsp://{cred}", 1
)
rtsp_candidates.append(
{
"source": "GetStreamUri",
"profile_token": token,
"uri": injected,
}
)
else:
# No credentials provided or URI already contains
# credentials — add the URI as returned.
rtsp_candidates.append(
{
"source": "GetStreamUri",
"profile_token": token,
"uri": uri,
}
)
else:
# Non-RTSP URIs (e.g., http-flv) — add as returned.
rtsp_candidates.append(
{
"source": "GetStreamUri",
"profile_token": token,
"uri": uri,
}
)
except Exception as e:
logger.debug(
f"Skipping stream URI for token {token} due to processing error: {e}"
)
continue
except Exception:
logger.debug(
f"GetStreamUri failed for token {token}", exc_info=True
)
continue
# Add common RTSP patterns as fallback
if not rtsp_candidates:
common_paths = [
"/h264",
"/live.sdp",
"/media.amp",
"/Streaming/Channels/101",
"/Streaming/Channels/1",
"/stream1",
"/cam/realmonitor?channel=1&subtype=0",
"/11",
]
# Use URL-encoded credentials for pattern fallback URIs when provided
auth_str = (
f"{quote_plus(username)}:{quote_plus(password)}@"
if username and password
else ""
)
rtsp_port = 554
for path in common_paths:
uri = f"rtsp://{auth_str}{host}:{rtsp_port}{path}"
rtsp_candidates.append({"source": "pattern", "uri": uri})
except Exception:
logger.debug("Failed to collect RTSP candidates")
# Optionally test RTSP candidates using ffprobe_stream
tested_candidates = []
if test and rtsp_candidates:
for c in rtsp_candidates:
uri = c["uri"]
to_test = [uri]
try:
if (
username
and password
and isinstance(uri, str)
and uri.startswith("rtsp://")
and "@" not in uri
):
cred = f"{quote_plus(username)}:{quote_plus(password)}@"
cred_uri = uri.replace("rtsp://", f"rtsp://{cred}", 1)
if cred_uri not in to_test:
to_test.append(cred_uri)
except Exception:
pass
for test_uri in to_test:
try:
probe = ffprobe_stream(
request.app.frigate_config.ffmpeg, test_uri, detailed=False
)
print(probe)
ok = probe is not None and getattr(probe, "returncode", 1) == 0
tested_candidates.append(
{
"uri": test_uri,
"source": c.get("source"),
"ok": ok,
"profile_token": c.get("profile_token"),
}
)
except Exception as e:
logger.debug(f"Unable to probe stream: {e}")
tested_candidates.append(
{
"uri": test_uri,
"source": c.get("source"),
"ok": False,
"profile_token": c.get("profile_token"),
}
)
result["rtsp_candidates"] = rtsp_candidates
if test:
result["rtsp_tested"] = tested_candidates
logger.debug(f"ONVIF probe successful: {result}")
return JSONResponse(content=result)
except ONVIFError as e:
logger.warning(f"ONVIF error probing {host}:{port}: {e}")
return JSONResponse(
content={"success": False, "message": "ONVIF error"},
status_code=400,
)
except (Fault, TransportError) as e:
logger.warning(f"Connection error probing {host}:{port}: {e}")
return JSONResponse(
content={"success": False, "message": "Connection error"},
status_code=503,
)
except Exception as e:
logger.warning(f"Error probing ONVIF device at {host}:{port}, {e}")
return JSONResponse(
content={"success": False, "message": "Probe failed"},
status_code=500,
)
finally:
# Best-effort cleanup of ONVIF camera client session
if onvif_camera is not None:
try:
# Check if the camera has a close method and call it
if hasattr(onvif_camera, "close"):
await onvif_camera.close()
except Exception as e:
logger.debug(f"Error closing ONVIF camera session: {e}")