frigate/frigate/api/camera.py
2026-06-16 13:07:20 -04:00

1410 lines
52 KiB
Python

"""Camera apis."""
import asyncio
import json
import logging
import re
from importlib.util import find_spec
from pathlib import Path
from urllib.parse import quote_plus
import httpx
import requests
from fastapi import APIRouter, Depends, Query, Request, Response
from fastapi.responses import JSONResponse
from filelock import FileLock, Timeout
from onvif import ONVIFCamera, ONVIFError
from ruamel.yaml import YAML
from zeep.exceptions import Fault, TransportError
from zeep.transports import AsyncTransport
from frigate.api.auth import (
_get_stream_owner_cameras,
allow_any_authenticated,
get_current_user,
require_go2rtc_stream_access,
require_role,
)
from frigate.api.defs.request.app_body import CameraSetBody
from frigate.api.defs.tags import Tags
from frigate.config import FrigateConfig
from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateTopic,
)
from frigate.config.env import substitute_frigate_vars
from frigate.models import User
from frigate.util.builtin import clean_camera_user_pass, get_record_segment_time
from frigate.util.camera_cleanup import cleanup_camera_db, cleanup_camera_files
from frigate.util.config import find_config_file
from frigate.util.image import run_ffmpeg_snapshot
from frigate.util.services import (
analyze_record_keyframes,
ffprobe_stream,
is_restricted_go2rtc_source,
)
logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.camera])
def _is_valid_host(host: str) -> bool:
"""
Validate that the host is in a valid format.
Allows private IPs since cameras are typically on local networks.
Only blocks obviously malicious input to prevent injection attacks.
"""
try:
# Remove port if present
host_without_port = host.split(":")[0] if ":" in host else host
# Block whitespace, newlines, and control characters
if not host_without_port or re.search(r"[\s\x00-\x1f]", host_without_port):
return False
# Allow standard hostname/IP characters: alphanumeric, dots, hyphens
if not re.match(r"^[a-zA-Z0-9.-]+$", host_without_port):
return False
return True
except Exception:
return False
@router.get("/go2rtc/streams", dependencies=[Depends(allow_any_authenticated())])
async def go2rtc_streams(request: Request):
r = requests.get("http://127.0.0.1:1984/api/streams")
if not r.ok:
logger.error("Failed to fetch streams from go2rtc")
return JSONResponse(
content=({"success": False, "message": "Error fetching stream data"}),
status_code=500,
)
stream_data = r.json()
# Roles with an explicit camera list see only streams owned by an allowed
# camera. Admin and full-access roles (no list / empty list) see all streams.
current_user = await get_current_user(request)
if not isinstance(current_user, JSONResponse):
role = current_user["role"]
roles_dict = request.app.frigate_config.auth.roles
if role != "admin" and roles_dict.get(role):
all_camera_names = set(request.app.frigate_config.cameras.keys())
allowed_cameras = set(
User.get_allowed_cameras(role, roles_dict, all_camera_names)
)
stream_data = {
name: data
for name, data in stream_data.items()
if _get_stream_owner_cameras(request, name) & allowed_cameras
}
for data in stream_data.values():
for producer in data.get("producers") or []:
producer["url"] = clean_camera_user_pass(producer.get("url", ""))
return JSONResponse(content=stream_data)
@router.get(
"/go2rtc/streams/{stream_name}",
dependencies=[Depends(require_go2rtc_stream_access)],
)
def go2rtc_camera_stream(request: Request, stream_name: str):
r = requests.get(
"http://127.0.0.1:1984/api/streams",
params={
"src": stream_name,
"video": "all",
"audio": "all",
"microphone": "",
},
)
if not r.ok:
camera_config = request.app.frigate_config.cameras.get(stream_name)
if camera_config is None:
for camera_name, camera in request.app.frigate_config.cameras.items():
if stream_name in camera.live.streams.values():
camera_config = request.app.frigate_config.cameras.get(camera_name)
break
if camera_config and camera_config.enabled:
logger.error("Failed to fetch streams from go2rtc")
return JSONResponse(
content=({"success": False, "message": "Error fetching stream data"}),
status_code=500,
)
stream_data = r.json()
for producer in stream_data.get("producers", []):
producer["url"] = clean_camera_user_pass(producer.get("url", ""))
return JSONResponse(content=stream_data)
@router.put(
"/go2rtc/streams/{stream_name}", dependencies=[Depends(require_role(["admin"]))]
)
def go2rtc_add_stream(request: Request, stream_name: str, src: str = ""):
"""Add or update a go2rtc stream configuration."""
try:
params = {"name": stream_name}
if src:
try:
resolved_src = substitute_frigate_vars(src)
except KeyError:
resolved_src = src
if is_restricted_go2rtc_source(resolved_src):
logger.warning(
"Rejected go2rtc stream '%s' with restricted source type (echo/expr/exec)",
stream_name,
)
return JSONResponse(
content={
"success": False,
"message": "Restricted stream source type",
},
status_code=400,
)
params["src"] = resolved_src
r = requests.put(
"http://127.0.0.1:1984/api/streams",
params=params,
timeout=10,
)
if not r.ok:
logger.error(f"Failed to add go2rtc stream {stream_name}: {r.text}")
return JSONResponse(
content=(
{"success": False, "message": f"Failed to add stream: {r.text}"}
),
status_code=r.status_code,
)
return JSONResponse(
content={"success": True, "message": "Stream added successfully"}
)
except requests.RequestException as e:
logger.error(f"Error communicating with go2rtc: {e}")
return JSONResponse(
content=(
{
"success": False,
"message": "Error communicating with go2rtc",
}
),
status_code=500,
)
@router.delete(
"/go2rtc/streams/{stream_name}", dependencies=[Depends(require_role(["admin"]))]
)
def go2rtc_delete_stream(stream_name: str):
"""Delete a go2rtc stream."""
try:
r = requests.delete(
"http://127.0.0.1:1984/api/streams",
params={"src": stream_name},
timeout=10,
)
if not r.ok:
logger.error(f"Failed to delete go2rtc stream {stream_name}: {r.text}")
return JSONResponse(
content=(
{"success": False, "message": f"Failed to delete stream: {r.text}"}
),
status_code=r.status_code,
)
return JSONResponse(
content={"success": True, "message": "Stream deleted successfully"}
)
except requests.RequestException as e:
logger.error(f"Error communicating with go2rtc: {e}")
return JSONResponse(
content=(
{
"success": False,
"message": "Error communicating with go2rtc",
}
),
status_code=500,
)
@router.get("/ffprobe", dependencies=[Depends(require_role(["admin"]))])
def ffprobe(request: Request, paths: str = "", detailed: bool = False):
path_param = paths
if not path_param:
return JSONResponse(
content=({"success": False, "message": "Path needs to be provided."}),
status_code=404,
)
if path_param.startswith("camera"):
camera = path_param[7:]
if camera not in request.app.frigate_config.cameras.keys():
return JSONResponse(
content=(
{"success": False, "message": f"{camera} is not a valid camera."}
),
status_code=404,
)
if not request.app.frigate_config.cameras[camera].enabled:
return JSONResponse(
content=({"success": False, "message": f"{camera} is not enabled."}),
status_code=404,
)
paths = map(
lambda input: input.path,
request.app.frigate_config.cameras[camera].ffmpeg.inputs,
)
elif "," in clean_camera_user_pass(path_param):
paths = path_param.split(",")
else:
paths = [path_param]
# user has multiple streams
output = []
for path in paths:
ffprobe = ffprobe_stream(
request.app.frigate_config.ffmpeg, path.strip(), detailed=detailed
)
if ffprobe.returncode != 0:
try:
stderr_decoded = ffprobe.stderr.decode("utf-8")
except UnicodeDecodeError:
try:
stderr_decoded = ffprobe.stderr.decode("unicode_escape")
except Exception:
stderr_decoded = str(ffprobe.stderr)
stderr_lines = [
line.strip() for line in stderr_decoded.split("\n") if line.strip()
]
result = {
"return_code": ffprobe.returncode,
"stderr": stderr_lines,
"stdout": "",
}
else:
result = {
"return_code": ffprobe.returncode,
"stderr": [],
"stdout": json.loads(ffprobe.stdout.decode("unicode_escape").strip()),
}
# Add detailed metadata if requested and probe was successful
if detailed and ffprobe.returncode == 0 and result["stdout"]:
try:
probe_data = result["stdout"]
metadata = {}
# Extract video stream information
video_stream = None
audio_stream = None
for stream in probe_data.get("streams", []):
if stream.get("codec_type") == "video":
video_stream = stream
elif stream.get("codec_type") == "audio":
audio_stream = stream
# Video metadata
if video_stream:
metadata["video"] = {
"codec": video_stream.get("codec_name"),
"width": video_stream.get("width"),
"height": video_stream.get("height"),
"fps": _extract_fps(video_stream.get("avg_frame_rate")),
"pixel_format": video_stream.get("pix_fmt"),
"profile": video_stream.get("profile"),
"level": video_stream.get("level"),
}
# Calculate resolution string
if video_stream.get("width") and video_stream.get("height"):
metadata["video"]["resolution"] = (
f"{video_stream['width']}x{video_stream['height']}"
)
# Audio metadata
if audio_stream:
metadata["audio"] = {
"codec": audio_stream.get("codec_name"),
"channels": audio_stream.get("channels"),
"sample_rate": audio_stream.get("sample_rate"),
"channel_layout": audio_stream.get("channel_layout"),
}
# Container/format metadata
if probe_data.get("format"):
format_info = probe_data["format"]
metadata["container"] = {
"format": format_info.get("format_name"),
"duration": format_info.get("duration"),
"size": format_info.get("size"),
}
result["metadata"] = metadata
except Exception as e:
logger.warning(f"Failed to extract detailed metadata: {e}")
# Continue without metadata if parsing fails
output.append(result)
return JSONResponse(content=output)
@router.get("/keyframe_analysis", dependencies=[Depends(require_role(["admin"]))])
async def keyframe_analysis(request: Request, camera: str = ""):
"""Probe a camera's record stream and classify its keyframe spacing.
Detects smart/+ codecs and long/variable GOPs that degrade recording.
"""
config: FrigateConfig = request.app.frigate_config
if camera not in config.cameras:
return JSONResponse(
content={"success": False, "message": f"{camera} is not a valid camera."},
status_code=404,
)
camera_config = config.cameras[camera]
if not camera_config.enabled:
return JSONResponse(
content={"success": False, "message": f"{camera} is not enabled."},
status_code=404,
)
# keyframe spacing only matters when this camera is recording
if not camera_config.record.enabled:
return JSONResponse(content={"severity": "record_disabled"})
# recording guarantees an input carries the record role; its index matches
# the "Stream N" numbering the ffprobe endpoint surfaces (same input order)
record_index, record_input = next(
(idx, i)
for idx, i in enumerate(camera_config.ffmpeg.inputs)
if "record" in i.roles
)
segment_time = get_record_segment_time(camera_config)
result = await analyze_record_keyframes(
config.ffmpeg, record_input.path, segment_time
)
result["stream_index"] = record_index
return JSONResponse(content=result)
@router.get("/ffprobe/snapshot", dependencies=[Depends(require_role(["admin"]))])
def ffprobe_snapshot(request: Request, url: str = "", timeout: int = 10):
"""Get a snapshot from a stream URL using ffmpeg."""
if not url:
return JSONResponse(
content={"success": False, "message": "URL parameter is required"},
status_code=400,
)
config: FrigateConfig = request.app.frigate_config
image_data, error = run_ffmpeg_snapshot(
config.ffmpeg, url, "mjpeg", timeout=timeout
)
if image_data:
return Response(
image_data,
media_type="image/jpeg",
headers={"Cache-Control": "no-store"},
)
elif error == "timeout":
return JSONResponse(
content={"success": False, "message": "Timeout capturing snapshot"},
status_code=408,
)
else:
logger.error(f"ffmpeg failed: {error}")
return JSONResponse(
content={"success": False, "message": "Failed to capture snapshot"},
status_code=500,
)
@router.get("/reolink/detect", dependencies=[Depends(require_role(["admin"]))])
def reolink_detect(host: str = "", username: str = "", password: str = ""):
"""
Detect Reolink camera capabilities and recommend optimal protocol.
Queries the Reolink camera API to determine the camera's resolution
and recommends either http-flv (for 5MP and below) or rtsp (for higher resolutions).
"""
if not host:
return JSONResponse(
content={"success": False, "message": "Host parameter is required"},
status_code=400,
)
if not username:
return JSONResponse(
content={"success": False, "message": "Username parameter is required"},
status_code=400,
)
if not password:
return JSONResponse(
content={"success": False, "message": "Password parameter is required"},
status_code=400,
)
# Validate host format to prevent injection attacks
if not _is_valid_host(host):
return JSONResponse(
content={"success": False, "message": "Invalid host format"},
status_code=400,
)
try:
# URL-encode credentials to prevent injection
encoded_user = quote_plus(username)
encoded_password = quote_plus(password)
api_url = f"http://{host}/api.cgi?cmd=GetEnc&user={encoded_user}&password={encoded_password}"
response = requests.get(api_url, timeout=5)
if not response.ok:
return JSONResponse(
content={
"success": False,
"protocol": None,
"message": f"Failed to connect to camera API: HTTP {response.status_code}",
},
status_code=200,
)
data = response.json()
enc_data = data[0] if isinstance(data, list) and len(data) > 0 else data
stream_info = None
if isinstance(enc_data, dict):
if enc_data.get("value", {}).get("Enc"):
stream_info = enc_data["value"]["Enc"]
elif enc_data.get("Enc"):
stream_info = enc_data["Enc"]
if not stream_info or not stream_info.get("mainStream"):
return JSONResponse(
content={
"success": False,
"protocol": None,
"message": "Could not find stream information in API response",
}
)
main_stream = stream_info["mainStream"]
width = main_stream.get("width", 0)
height = main_stream.get("height", 0)
if not width or not height:
return JSONResponse(
content={
"success": False,
"protocol": None,
"message": "Could not determine camera resolution",
}
)
megapixels = (width * height) / 1_000_000
protocol = "http-flv" if megapixels <= 5.0 else "rtsp"
return JSONResponse(
content={
"success": True,
"protocol": protocol,
"resolution": f"{width}x{height}",
"megapixels": round(megapixels, 2),
}
)
except requests.exceptions.Timeout:
return JSONResponse(
content={
"success": False,
"protocol": None,
"message": "Connection timeout - camera did not respond",
}
)
except requests.exceptions.RequestException:
return JSONResponse(
content={
"success": False,
"protocol": None,
"message": "Failed to connect to camera",
}
)
except Exception:
logger.exception(f"Error detecting Reolink camera at {host}")
return JSONResponse(
content={
"success": False,
"protocol": None,
"message": "Unable to detect camera capabilities",
}
)
def _extract_fps(r_frame_rate: str) -> float | None:
"""Extract FPS from ffprobe avg_frame_rate / r_frame_rate string (e.g., '30/1' -> 30.0)"""
if not r_frame_rate:
return None
try:
num, den = r_frame_rate.split("/")
return round(float(num) / float(den), 2)
except (ValueError, ZeroDivisionError):
return None
def _build_digest_transport(username: str, password: str) -> AsyncTransport:
"""Build a zeep transport backed by an httpx client using HTTP digest auth."""
auth = httpx.DigestAuth(username, password)
client = httpx.AsyncClient(auth=auth, timeout=10.0)
return AsyncTransport(client=client)
async def _connect_onvif_camera(
host: str,
port: int,
username: str,
password: str,
wsdl_base: str | None,
auth_type: str,
) -> ONVIFCamera:
"""Connect to an ONVIF device, trying both WS-Security password encodings.
Cameras disagree on whether the WS-Security UsernameToken should carry a
hashed PasswordDigest or a plaintext PasswordText. The wizard can't know
which a given camera expects, so we try PasswordDigest first (the common
case) and fall back to PasswordText when the device rejects the token. This
is independent of auth_type, which controls HTTP transport-level auth.
"""
first_error: Fault | None = None
# encrypt=True -> PasswordDigest, encrypt=False -> PasswordText
for encrypt in (True, False):
onvif_camera = ONVIFCamera(
host,
port,
username or "",
password or "",
wsdl_dir=wsdl_base,
encrypt=encrypt,
)
try:
await onvif_camera.update_xaddrs()
except Fault as e:
# A SOAP fault here is how a camera signals the wrong password
# encoding, so retry with the other encoding before giving up.
logger.debug(
"ONVIF connect with %s rejected, trying alternate encoding",
"PasswordDigest" if encrypt else "PasswordText",
)
if first_error is None:
first_error = e
continue
if auth_type == "digest" and username and password:
transport = _build_digest_transport(username, password)
for service in ("devicemgmt", "media", "ptz"):
if hasattr(onvif_camera, service):
getattr(onvif_camera, service).zeep_client.transport = transport
logger.debug("Configured digest authentication")
return onvif_camera
# Both encodings failed authentication; surface the original fault.
raise first_error
def _supports_continuous_pan_tilt(nodes) -> bool:
"""Whether any PTZ node advertises continuous pan/tilt velocity.
The web UI's directional controls issue ContinuousMove with a PanTilt
velocity, so continuous pan/tilt is what makes those controls usable. This
is intentionally narrower than ptz_supported, which is true for any device
exposing the ONVIF PTZ service - including zoom/focus-only varifocal lenses.
"""
for node in nodes or []:
spaces = getattr(node, "SupportedPTZSpaces", None) or (
node.get("SupportedPTZSpaces") if isinstance(node, dict) else None
)
if spaces is None:
continue
continuous = getattr(spaces, "ContinuousPanTiltVelocitySpace", None) or (
spaces.get("ContinuousPanTiltVelocitySpace")
if isinstance(spaces, dict)
else None
)
if continuous:
return True
return False
@router.get(
"/onvif/probe",
dependencies=[Depends(require_role(["admin"]))],
summary="Probe ONVIF device",
description=(
"Probe an ONVIF device to determine capabilities and optionally test available stream URIs. "
"Query params: host (required), port (default 80), username, password, test (boolean), "
"auth_type (basic or digest, default basic)."
),
)
async def onvif_probe(
request: Request,
host: str = Query(None),
port: int = Query(80),
username: str = Query(""),
password: str = Query(""),
test: bool = Query(False),
auth_type: str = Query("basic"), # Add auth_type parameter
):
"""
Probe a single ONVIF device to determine capabilities.
Connects to an ONVIF device and queries for:
- Device information (manufacturer, model)
- Media profiles count
- PTZ support
- Available presets
- Autotracking support
Query Parameters:
host: Device host/IP address (required)
port: Device port (default 80)
username: ONVIF username (optional)
password: ONVIF password (optional)
test: run ffprobe on the stream (optional)
auth_type: Authentication type - "basic" or "digest" (default "basic")
Returns:
JSON with device capabilities information
"""
if not host:
return JSONResponse(
content={"success": False, "message": "host parameter is required"},
status_code=400,
)
# Validate host format
if not _is_valid_host(host):
return JSONResponse(
content={"success": False, "message": "Invalid host format"},
status_code=400,
)
# Validate auth_type
if auth_type not in ["basic", "digest"]:
return JSONResponse(
content={
"success": False,
"message": "auth_type must be 'basic' or 'digest'",
},
status_code=400,
)
onvif_camera = None
try:
logger.debug(f"Probing ONVIF device at {host}:{port} with {auth_type} auth")
try:
wsdl_base = None
spec = find_spec("onvif")
if spec and getattr(spec, "origin", None):
wsdl_base = str(Path(spec.origin).parent / "wsdl")
except Exception:
wsdl_base = None
onvif_camera = await _connect_onvif_camera(
host, port, username, password, wsdl_base, auth_type
)
# Get device information
device_info = {
"manufacturer": "Unknown",
"model": "Unknown",
"firmware_version": "Unknown",
}
try:
device_service = await onvif_camera.create_devicemgmt_service()
# Update transport for device service if digest auth
if auth_type == "digest" and username and password:
device_service.zeep_client.transport = _build_digest_transport(
username, password
)
device_info_resp = await device_service.GetDeviceInformation()
manufacturer = getattr(device_info_resp, "Manufacturer", None) or (
device_info_resp.get("Manufacturer")
if isinstance(device_info_resp, dict)
else None
)
model = getattr(device_info_resp, "Model", None) or (
device_info_resp.get("Model")
if isinstance(device_info_resp, dict)
else None
)
firmware = getattr(device_info_resp, "FirmwareVersion", None) or (
device_info_resp.get("FirmwareVersion")
if isinstance(device_info_resp, dict)
else None
)
device_info.update(
{
"manufacturer": manufacturer or "Unknown",
"model": model or "Unknown",
"firmware_version": firmware or "Unknown",
}
)
except Exception as e:
logger.debug(f"Failed to get device info: {e}")
# Get media profiles
profiles = []
profiles_count = 0
first_profile_token = None
ptz_config_token = None
try:
media_service = await onvif_camera.create_media_service()
# Update transport for media service if digest auth
if auth_type == "digest" and username and password:
media_service.zeep_client.transport = _build_digest_transport(
username, password
)
profiles = await media_service.GetProfiles()
profiles_count = len(profiles) if profiles else 0
if profiles and len(profiles) > 0:
p = profiles[0]
first_profile_token = getattr(p, "token", None) or (
p.get("token") if isinstance(p, dict) else None
)
# Get PTZ configuration token from the profile
ptz_configuration = getattr(p, "PTZConfiguration", None) or (
p.get("PTZConfiguration") if isinstance(p, dict) else None
)
if ptz_configuration:
ptz_config_token = getattr(ptz_configuration, "token", None) or (
ptz_configuration.get("token")
if isinstance(ptz_configuration, dict)
else None
)
except Exception as e:
logger.debug(f"Failed to get media profiles: {e}")
# Check PTZ support and capabilities
ptz_supported = False
pan_tilt_supported = False
presets_count = 0
autotrack_supported = False
try:
ptz_service = await onvif_camera.create_ptz_service()
# Update transport for PTZ service if digest auth
if auth_type == "digest" and username and password:
ptz_service.zeep_client.transport = _build_digest_transport(
username, password
)
# Check if PTZ service is available
try:
await ptz_service.GetServiceCapabilities()
ptz_supported = True
logger.debug("PTZ service is available")
except Exception as e:
logger.debug(f"PTZ service not available: {e}")
ptz_supported = False
# Try to get presets if PTZ is supported and we have a profile
if ptz_supported and first_profile_token:
try:
presets_resp = await ptz_service.GetPresets(
{"ProfileToken": first_profile_token}
)
presets_count = len(presets_resp) if presets_resp else 0
logger.debug(f"Found {presets_count} presets")
except Exception as e:
logger.debug(f"Failed to get presets: {e}")
presets_count = 0
# Check for real (continuous) pan/tilt, which the UI controls need
if ptz_supported:
try:
nodes = await ptz_service.GetNodes()
pan_tilt_supported = _supports_continuous_pan_tilt(nodes)
logger.debug(f"Continuous pan/tilt supported: {pan_tilt_supported}")
except Exception as e:
logger.debug(f"Failed to read PTZ nodes for pan/tilt support: {e}")
# Check for autotracking support - requires both FOV relative movement and MoveStatus
if ptz_supported and first_profile_token and ptz_config_token:
# First check for FOV relative movement support
pt_r_fov_supported = False
try:
config_request = ptz_service.create_type("GetConfigurationOptions")
config_request.ConfigurationToken = ptz_config_token
ptz_config = await ptz_service.GetConfigurationOptions(
config_request
)
if ptz_config:
# Check for pt-r-fov support
spaces = getattr(ptz_config, "Spaces", None) or (
ptz_config.get("Spaces")
if isinstance(ptz_config, dict)
else None
)
if spaces:
rel_pan_tilt_space = getattr(
spaces, "RelativePanTiltTranslationSpace", None
) or (
spaces.get("RelativePanTiltTranslationSpace")
if isinstance(spaces, dict)
else None
)
if rel_pan_tilt_space:
# Look for FOV space
for i, space in enumerate(rel_pan_tilt_space):
uri = None
if isinstance(space, dict):
uri = space.get("URI")
else:
uri = getattr(space, "URI", None)
if uri and "TranslationSpaceFov" in uri:
pt_r_fov_supported = True
logger.debug(
"FOV relative movement (pt-r-fov) supported"
)
break
logger.debug(f"PTZ config spaces: {ptz_config}")
except Exception as e:
logger.debug(f"Failed to check FOV relative movement: {e}")
pt_r_fov_supported = False
# Now check for MoveStatus support via GetServiceCapabilities
if pt_r_fov_supported:
try:
service_capabilities_request = ptz_service.create_type(
"GetServiceCapabilities"
)
service_capabilities = await ptz_service.GetServiceCapabilities(
service_capabilities_request
)
# Look for MoveStatus in the capabilities
move_status_capable = False
if service_capabilities:
# Try to find MoveStatus key recursively
def find_move_status(obj, key="MoveStatus"):
if isinstance(obj, dict):
if key in obj:
return obj[key]
for v in obj.values():
result = find_move_status(v, key)
if result is not None:
return result
elif hasattr(obj, key):
return getattr(obj, key)
elif hasattr(obj, "__dict__"):
for v in vars(obj).values():
result = find_move_status(v, key)
if result is not None:
return result
return None
move_status_value = find_move_status(service_capabilities)
# MoveStatus should return "true" if supported
if isinstance(move_status_value, bool):
move_status_capable = move_status_value
elif isinstance(move_status_value, str):
move_status_capable = (
move_status_value.lower() == "true"
)
logger.debug(f"MoveStatus capability: {move_status_value}")
# Autotracking is supported if both conditions are met
autotrack_supported = pt_r_fov_supported and move_status_capable
if autotrack_supported:
logger.debug(
"Autotracking fully supported (pt-r-fov + MoveStatus)"
)
else:
logger.debug(
f"Autotracking not fully supported - pt-r-fov: {pt_r_fov_supported}, MoveStatus: {move_status_capable}"
)
except Exception as e:
logger.debug(f"Failed to check MoveStatus support: {e}")
autotrack_supported = False
except Exception as e:
logger.debug(f"Failed to probe PTZ service: {e}")
result = {
"success": True,
"host": host,
"port": port,
"manufacturer": device_info["manufacturer"],
"model": device_info["model"],
"firmware_version": device_info["firmware_version"],
"profiles_count": profiles_count,
"ptz_supported": ptz_supported,
"pan_tilt_supported": pan_tilt_supported,
"presets_count": presets_count,
"autotrack_supported": autotrack_supported,
}
# Gather RTSP candidates
rtsp_candidates: list[dict] = []
try:
media_service = await onvif_camera.create_media_service()
# Update transport for media service if digest auth
if auth_type == "digest" and username and password:
media_service.zeep_client.transport = _build_digest_transport(
username, password
)
if profiles_count and media_service:
for p in profiles or []:
token = getattr(p, "token", None) or (
p.get("token") if isinstance(p, dict) else None
)
if not token:
continue
try:
stream_setup = {
"Stream": "RTP-Unicast",
"Transport": {"Protocol": "RTSP"},
}
stream_req = {
"ProfileToken": token,
"StreamSetup": stream_setup,
}
stream_uri_resp = await media_service.GetStreamUri(stream_req)
uri = (
stream_uri_resp.get("Uri")
if isinstance(stream_uri_resp, dict)
else getattr(stream_uri_resp, "Uri", None)
)
if uri:
logger.debug(
f"GetStreamUri returned for token {token}: {uri}"
)
# If credentials were provided, do NOT add the unauthenticated URI.
try:
if isinstance(uri, str) and uri.startswith("rtsp://"):
if username and password and "@" not in uri:
# Inject raw credentials and add only the
# authenticated version. The credentials will be encoded
# later by ffprobe_stream or the config system.
cred = f"{username}:{password}@"
injected = uri.replace(
"rtsp://", f"rtsp://{cred}", 1
)
rtsp_candidates.append(
{
"source": "GetStreamUri",
"profile_token": token,
"uri": injected,
}
)
else:
# No credentials provided or URI already contains
# credentials — add the URI as returned.
rtsp_candidates.append(
{
"source": "GetStreamUri",
"profile_token": token,
"uri": uri,
}
)
else:
# Non-RTSP URIs (e.g., http-flv) — add as returned.
rtsp_candidates.append(
{
"source": "GetStreamUri",
"profile_token": token,
"uri": uri,
}
)
except Exception as e:
logger.debug(
f"Skipping stream URI for token {token} due to processing error: {e}"
)
continue
except Exception:
logger.debug(
f"GetStreamUri failed for token {token}", exc_info=True
)
continue
# Add common RTSP patterns as fallback
if not rtsp_candidates:
common_paths = [
"/h264",
"/live.sdp",
"/media.amp",
"/Streaming/Channels/101",
"/Streaming/Channels/1",
"/stream1",
"/cam/realmonitor?channel=1&subtype=0",
"/11",
]
# Use raw credentials for pattern fallback URIs when provided
auth_str = f"{username}:{password}@" if username and password else ""
rtsp_port = 554
for path in common_paths:
uri = f"rtsp://{auth_str}{host}:{rtsp_port}{path}"
rtsp_candidates.append({"source": "pattern", "uri": uri})
except Exception:
logger.debug("Failed to collect RTSP candidates")
# Optionally test RTSP candidates using ffprobe_stream
tested_candidates = []
if test and rtsp_candidates:
for c in rtsp_candidates:
uri = c["uri"]
to_test = [uri]
try:
if (
username
and password
and isinstance(uri, str)
and uri.startswith("rtsp://")
and "@" not in uri
):
cred = f"{username}:{password}@"
cred_uri = uri.replace("rtsp://", f"rtsp://{cred}", 1)
if cred_uri not in to_test:
to_test.append(cred_uri)
except Exception:
pass
for test_uri in to_test:
try:
probe = ffprobe_stream(
request.app.frigate_config.ffmpeg, test_uri, detailed=False
)
ok = probe is not None and getattr(probe, "returncode", 1) == 0
tested_candidates.append(
{
"uri": test_uri,
"source": c.get("source"),
"ok": ok,
"profile_token": c.get("profile_token"),
}
)
except Exception as e:
logger.debug(f"Unable to probe stream: {e}")
tested_candidates.append(
{
"uri": test_uri,
"source": c.get("source"),
"ok": False,
"profile_token": c.get("profile_token"),
}
)
result["rtsp_candidates"] = rtsp_candidates
if test:
result["rtsp_tested"] = tested_candidates
logger.debug(f"ONVIF probe successful: {result}")
return JSONResponse(content=result)
except ONVIFError as e:
logger.warning(f"ONVIF error probing {host}:{port}: {e}")
return JSONResponse(
content={"success": False, "message": "ONVIF error"},
status_code=400,
)
except (Fault, TransportError) as e:
logger.warning(f"Connection error probing {host}:{port}: {e}")
return JSONResponse(
content={"success": False, "message": "Connection error"},
status_code=503,
)
except Exception as e:
logger.warning(f"Error probing ONVIF device at {host}:{port}, {e}")
return JSONResponse(
content={"success": False, "message": "Probe failed"},
status_code=500,
)
finally:
# Best-effort cleanup of ONVIF camera client session
if onvif_camera is not None:
try:
# Check if the camera has a close method and call it
if hasattr(onvif_camera, "close"):
await onvif_camera.close()
except Exception as e:
logger.debug(f"Error closing ONVIF camera session: {e}")
@router.delete(
"/cameras/{camera_name}",
dependencies=[Depends(require_role(["admin"]))],
)
async def delete_camera(
request: Request,
camera_name: str,
delete_exports: bool = Query(default=False),
):
"""Delete a camera and all its associated data.
Removes the camera from config, stops processes, and cleans up
all database entries and media files.
Args:
camera_name: Name of the camera to delete
delete_exports: Whether to also delete exports for this camera
"""
frigate_config: FrigateConfig = request.app.frigate_config
if camera_name not in frigate_config.cameras:
return JSONResponse(
content={
"success": False,
"message": f"Camera {camera_name} not found",
},
status_code=404,
)
old_camera_config = frigate_config.cameras[camera_name]
config_file = find_config_file()
lock = FileLock(f"{config_file}.lock", timeout=5)
try:
with lock:
with open(config_file, "r") as f:
old_raw_config = f.read()
try:
yaml = YAML()
yaml.indent(mapping=2, sequence=4, offset=2)
with open(config_file, "r") as f:
data = yaml.load(f)
# Remove camera from config
if "cameras" in data and camera_name in data["cameras"]:
del data["cameras"][camera_name]
# Remove camera from auth roles
auth = data.get("auth", {})
if auth and "roles" in auth:
empty_roles = []
for role_name, cameras_list in auth["roles"].items():
if (
isinstance(cameras_list, list)
and camera_name in cameras_list
):
cameras_list.remove(camera_name)
# Custom roles can't be empty; mark for removal
if not cameras_list and role_name not in (
"admin",
"viewer",
):
empty_roles.append(role_name)
for role_name in empty_roles:
del auth["roles"][role_name]
with open(config_file, "w") as f:
yaml.dump(data, f)
with open(config_file, "r") as f:
new_raw_config = f.read()
try:
config = FrigateConfig.parse(new_raw_config)
except Exception:
with open(config_file, "w") as f:
f.write(old_raw_config)
logger.exception(
"Config error after removing camera %s",
camera_name,
)
return JSONResponse(
content={
"success": False,
"message": "Error parsing config after camera removal",
},
status_code=400,
)
except Exception as e:
logger.error(
"Error updating config to remove camera %s: %s", camera_name, e
)
return JSONResponse(
content={
"success": False,
"message": "Error updating config",
},
status_code=500,
)
# Update runtime config
request.app.frigate_config = config
request.app.genai_manager.update_config(config)
# Publish removal to stop ffmpeg processes and clean up runtime state
request.app.config_publisher.publish_update(
CameraConfigUpdateTopic(CameraConfigUpdateEnum.remove, camera_name),
old_camera_config,
)
except Timeout:
return JSONResponse(
content={
"success": False,
"message": "Another process is currently updating the config",
},
status_code=409,
)
# Clean up database entries
counts, export_paths = await asyncio.to_thread(
cleanup_camera_db, camera_name, delete_exports
)
# Clean up media files in background thread
await asyncio.to_thread(
cleanup_camera_files, camera_name, export_paths if delete_exports else None
)
# Best-effort go2rtc stream removal
try:
requests.delete(
"http://127.0.0.1:1984/api/streams",
params={"src": camera_name},
timeout=5,
)
except Exception:
logger.debug("Failed to remove go2rtc stream for %s", camera_name)
return JSONResponse(
content={
"success": True,
"message": f"Camera {camera_name} has been deleted",
"cleanup": counts,
},
status_code=200,
)
_SUB_COMMAND_FEATURES = {"motion_mask", "object_mask", "zone"}
@router.put(
"/camera/{camera_name}/set/{feature}",
dependencies=[Depends(require_role(["admin"]))],
)
@router.put(
"/camera/{camera_name}/set/{feature}/{sub_command}",
dependencies=[Depends(require_role(["admin"]))],
)
def camera_set(
request: Request,
camera_name: str,
feature: str,
body: CameraSetBody,
sub_command: str | None = None,
):
"""Set a camera feature state. Use camera_name='*' to target all cameras."""
dispatcher = request.app.dispatcher
frigate_config: FrigateConfig = request.app.frigate_config
if feature == "profile":
if camera_name != "*":
return JSONResponse(
content={
"success": False,
"message": "Profile feature requires camera_name='*'",
},
status_code=400,
)
dispatcher._receive("profile/set", body.value)
return JSONResponse(content={"success": True})
if feature not in dispatcher._camera_settings_handlers:
return JSONResponse(
content={"success": False, "message": f"Unknown feature: {feature}"},
status_code=400,
)
if sub_command and feature not in _SUB_COMMAND_FEATURES:
return JSONResponse(
content={
"success": False,
"message": f"Feature '{feature}' does not support sub-commands",
},
status_code=400,
)
if not sub_command and feature in _SUB_COMMAND_FEATURES:
return JSONResponse(
content={
"success": False,
"message": f"Feature '{feature}' requires a sub-command (e.g. mask or zone name)",
},
status_code=400,
)
if camera_name == "*":
cameras = list(frigate_config.cameras.keys())
elif camera_name not in frigate_config.cameras:
return JSONResponse(
content={
"success": False,
"message": f"Camera '{camera_name}' not found",
},
status_code=404,
)
else:
cameras = [camera_name]
for cam in cameras:
topic = (
f"{cam}/{feature}/{sub_command}/set"
if sub_command
else f"{cam}/{feature}/set"
)
dispatcher._receive(topic, body.value)
return JSONResponse(content={"success": True})