mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-07-02 18:11:13 +03:00
Compare commits
4 Commits
5ade3ad577
...
4428b40440
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4428b40440 | ||
|
|
d7ad3ba699 | ||
|
|
e6601d50a6 | ||
|
|
b420efdebd |
@ -198,6 +198,46 @@ When the skip threshold is exceeded, **no motion is reported** for that frame, m
|
||||
|
||||
:::
|
||||
|
||||
## Using Camera-Side ONVIF Motion Detection
|
||||
|
||||
For cameras that publish their own ONVIF cell-motion analytics (e.g. OpenIPC firmware for HiSilicon, Ingenic and SigmaStar SoCs, plus most ONVIF Profile-M devices from Hikvision, Reolink, Foscam, Amcrest, etc.), Frigate can use the camera's hardware motion engine instead of running per-frame analysis on the host CPU. This both removes CPU load from the Frigate machine and gives a more accurate motion signal than encoded-stream analysis can produce.
|
||||
|
||||
Frigate consumes the two standard ONVIF transports:
|
||||
|
||||
- **PullPoint** event subscription on `tns1:RuleEngine/CellMotionDetector/Motion` carries the binary on/off state (the legacy `tns1:VideoSource/MotionAlarm` payload is also accepted).
|
||||
- **RTSP analytics metadata stream** (the `application/vnd.onvif.metadata` track on the primary RTSP profile) carries the per-frame cell grid (`tt:MotionInCells`) which Frigate decodes (base64 + PackBits) and maps through the `CellLayout` transformation into Frigate's detect-frame pixel coordinates.
|
||||
|
||||
```yaml
|
||||
cameras:
|
||||
back_door:
|
||||
onvif:
|
||||
host: 10.0.0.10
|
||||
port: 80
|
||||
user: root
|
||||
password: "secret"
|
||||
events:
|
||||
# Subscribe to camera-side motion events.
|
||||
enabled: true
|
||||
# Seconds before the PullPoint subscription expires (we renew at half this).
|
||||
subscription_timeout: 60
|
||||
# Open the RTSP analytics metadata stream for per-cell motion coordinates.
|
||||
# Disable if your camera only publishes the binary event topic.
|
||||
use_metadata_stream: true
|
||||
motion:
|
||||
# Use the camera's ONVIF events as Frigate's motion signal. The internal
|
||||
# CPU motion detector is skipped.
|
||||
source: onvif
|
||||
detect:
|
||||
enabled: true
|
||||
```
|
||||
|
||||
When `motion.source: onvif`:
|
||||
|
||||
- Frigate's internal `ImprovedMotionDetector` is **not** run on the camera's frames.
|
||||
- Object detection still runs every detection frame; motion boxes are used for region clustering exactly as with the internal detector.
|
||||
- If `use_metadata_stream: true` but the camera doesn't advertise the metadata track (or PackBits decoding fails for a frame), Frigate falls back to a full-frame motion box while the binary event signal is active.
|
||||
- The validator requires `onvif.events.enabled: true` whenever `motion.source: onvif`.
|
||||
|
||||
## Reviewing Detected Motion
|
||||
|
||||
To review what the detector picked up — or to search past recordings for motion in a specific region — see [Reviewing Motion](/usage/review#reviewing-motion) on the Review page.
|
||||
|
||||
@ -121,6 +121,12 @@ If segments are only ~1 second instead of ~10 seconds, the camera is sending cor
|
||||
- **Changing codec, bitrate, or resolution mid-stream** — Any encoding changes during an active stream can cause unpredictable segment splitting.
|
||||
- **Camera firmware bugs** — Check for firmware updates from your camera manufacturer.
|
||||
|
||||
:::tip
|
||||
|
||||
You don't have to run `ffprobe` by hand to catch this. Open a camera's **Camera Probe Info** dialog (the info icon on the System → Metrics → Cameras page) and check the **Keyframe analysis** section. It probes the record stream and flags sparse or variable keyframes, which is what smart/"+" codecs (H.264+/H.265+) and long keyframe intervals produce.
|
||||
|
||||
:::
|
||||
|
||||
### Step 4: Check for a stuck detector
|
||||
|
||||
If the detect stream is not processing frames, segments will accumulate. Common causes:
|
||||
|
||||
29
docs/static/frigate-api.yaml
vendored
29
docs/static/frigate-api.yaml
vendored
@ -400,6 +400,35 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/HTTPValidationError"
|
||||
/keyframe_analysis:
|
||||
get:
|
||||
tags:
|
||||
- Camera
|
||||
summary: Keyframe Analysis
|
||||
description: >-
|
||||
Probe a camera's record stream and classify its keyframe spacing.
|
||||
Detects smart/+ codecs and long/variable GOPs that degrade recording.
|
||||
operationId: keyframe_analysis_keyframe_analysis_get
|
||||
parameters:
|
||||
- name: camera
|
||||
in: query
|
||||
required: false
|
||||
schema:
|
||||
type: string
|
||||
default: ""
|
||||
title: Camera
|
||||
responses:
|
||||
"200":
|
||||
description: Successful Response
|
||||
content:
|
||||
application/json:
|
||||
schema: {}
|
||||
"422":
|
||||
description: Validation Error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/HTTPValidationError"
|
||||
/ffprobe/snapshot:
|
||||
get:
|
||||
tags:
|
||||
|
||||
@ -34,11 +34,15 @@ from frigate.config.camera.updater import (
|
||||
)
|
||||
from frigate.config.env import substitute_frigate_vars
|
||||
from frigate.models import User
|
||||
from frigate.util.builtin import clean_camera_user_pass
|
||||
from frigate.util.builtin import clean_camera_user_pass, get_record_segment_time
|
||||
from frigate.util.camera_cleanup import cleanup_camera_db, cleanup_camera_files
|
||||
from frigate.util.config import find_config_file
|
||||
from frigate.util.image import run_ffmpeg_snapshot
|
||||
from frigate.util.services import ffprobe_stream, is_restricted_go2rtc_source
|
||||
from frigate.util.services import (
|
||||
analyze_record_keyframes,
|
||||
ffprobe_stream,
|
||||
is_restricted_go2rtc_source,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -362,6 +366,48 @@ def ffprobe(request: Request, paths: str = "", detailed: bool = False):
|
||||
return JSONResponse(content=output)
|
||||
|
||||
|
||||
@router.get("/keyframe_analysis", dependencies=[Depends(require_role(["admin"]))])
async def keyframe_analysis(request: Request, camera: str = ""):
    """Probe a camera's record stream and classify its keyframe spacing.

    Detects smart/+ codecs and long/variable GOPs that degrade recording.

    Args:
        request: Incoming request; the active config is read from
            ``request.app.frigate_config``.
        camera: Name of the camera whose record input should be analysed.

    Returns:
        A JSONResponse. 404 with ``success``/``message`` when the camera is
        unknown, not enabled, or has no input carrying the record role;
        ``{"severity": "record_disabled"}`` when recording is off; otherwise
        the value returned by ``analyze_record_keyframes`` with a
        ``stream_index`` entry added.
    """
    config: FrigateConfig = request.app.frigate_config

    if camera not in config.cameras:
        return JSONResponse(
            content={"success": False, "message": f"{camera} is not a valid camera."},
            status_code=404,
        )

    camera_config = config.cameras[camera]

    if not camera_config.enabled:
        return JSONResponse(
            content={"success": False, "message": f"{camera} is not enabled."},
            status_code=404,
        )

    # keyframe spacing only matters when this camera is recording
    if not camera_config.record.enabled:
        return JSONResponse(content={"severity": "record_disabled"})

    # recording is expected to guarantee an input carries the record role; its
    # index matches the "Stream N" numbering the ffprobe endpoint surfaces
    # (same input order)
    record_match = next(
        (
            (idx, i)
            for idx, i in enumerate(camera_config.ffmpeg.inputs)
            if "record" in i.roles
        ),
        None,
    )

    # A bare next() would raise StopIteration here, which an async function
    # turns into an opaque RuntimeError / HTTP 500. Answer explicitly instead.
    if record_match is None:
        return JSONResponse(
            content={
                "success": False,
                "message": f"{camera} has no input with the record role.",
            },
            status_code=404,
        )

    record_index, record_input = record_match

    segment_time = get_record_segment_time(camera_config)
    result = await analyze_record_keyframes(
        config.ffmpeg, record_input.path, segment_time
    )
    result["stream_index"] = record_index
    return JSONResponse(content=result)
|
||||
|
||||
|
||||
@router.get("/ffprobe/snapshot", dependencies=[Depends(require_role(["admin"]))])
|
||||
def ffprobe_snapshot(request: Request, url: str = "", timeout: int = 10):
|
||||
"""Get a snapshot from a stream URL using ffmpeg."""
|
||||
|
||||
@ -7,7 +7,7 @@ import operator
|
||||
import time
|
||||
from datetime import datetime
|
||||
from functools import reduce
|
||||
from typing import Any, Dict, List, Optional
|
||||
from typing import Any, Optional
|
||||
|
||||
import cv2
|
||||
from fastapi import APIRouter, Body, Depends, HTTPException, Request
|
||||
@ -59,7 +59,7 @@ class ToolExecuteRequest(BaseModel):
|
||||
"""Request model for tool execution."""
|
||||
|
||||
tool_name: str
|
||||
arguments: Dict[str, Any]
|
||||
arguments: dict[str, Any]
|
||||
|
||||
|
||||
class VLMMonitorRequest(BaseModel):
|
||||
@ -68,8 +68,8 @@ class VLMMonitorRequest(BaseModel):
|
||||
camera: str
|
||||
condition: str
|
||||
max_duration_minutes: int = 60
|
||||
labels: List[str] = []
|
||||
zones: List[str] = []
|
||||
labels: list[str] = []
|
||||
zones: list[str] = []
|
||||
|
||||
|
||||
@router.get(
|
||||
@ -91,10 +91,10 @@ def get_tools(request: Request) -> JSONResponse:
|
||||
|
||||
|
||||
def _resolve_zones(
|
||||
zones: List[str],
|
||||
zones: list[str],
|
||||
config: FrigateConfig,
|
||||
target_cameras: List[str],
|
||||
) -> List[str]:
|
||||
target_cameras: list[str],
|
||||
) -> list[str]:
|
||||
"""Map zone names to their canonical config keys, case-insensitively.
|
||||
|
||||
LLMs frequently echo a user's casing ("Front Yard") instead of the
|
||||
@ -107,7 +107,7 @@ def _resolve_zones(
|
||||
if not zones:
|
||||
return zones
|
||||
|
||||
lookup: Dict[str, str] = {}
|
||||
lookup: dict[str, str] = {}
|
||||
for camera_id in target_cameras:
|
||||
camera_config = config.cameras.get(camera_id)
|
||||
if camera_config is None:
|
||||
@ -120,8 +120,8 @@ def _resolve_zones(
|
||||
|
||||
async def _execute_search_objects(
|
||||
request: Request,
|
||||
arguments: Dict[str, Any],
|
||||
allowed_cameras: List[str],
|
||||
arguments: dict[str, Any],
|
||||
allowed_cameras: list[str],
|
||||
) -> JSONResponse:
|
||||
"""
|
||||
Execute the search_objects tool.
|
||||
@ -213,8 +213,8 @@ async def _execute_search_objects(
|
||||
|
||||
async def _execute_search_objects_semantic(
|
||||
request: Request,
|
||||
arguments: Dict[str, Any],
|
||||
allowed_cameras: List[str],
|
||||
arguments: dict[str, Any],
|
||||
allowed_cameras: list[str],
|
||||
semantic_query: str,
|
||||
) -> JSONResponse:
|
||||
"""Search objects via fused thumbnail + description embeddings.
|
||||
@ -263,8 +263,8 @@ async def _execute_search_objects_semantic(
|
||||
limit = int(arguments.get("limit", 25))
|
||||
limit = max(1, min(limit, 100))
|
||||
|
||||
visual_distances: Dict[str, float] = {}
|
||||
description_distances: Dict[str, float] = {}
|
||||
visual_distances: dict[str, float] = {}
|
||||
description_distances: dict[str, float] = {}
|
||||
try:
|
||||
rows = context.search_thumbnail(semantic_query)
|
||||
visual_distances = {row[0]: row[1] for row in rows}
|
||||
@ -305,7 +305,7 @@ async def _execute_search_objects_semantic(
|
||||
|
||||
eligible = {e.id: e for e in Event.select().where(reduce(operator.and_, clauses))}
|
||||
|
||||
scored: List[tuple[str, float]] = []
|
||||
scored: list[tuple[str, float]] = []
|
||||
for eid in eligible:
|
||||
v_score = (
|
||||
distance_to_score(visual_distances[eid], context.thumb_stats)
|
||||
@ -331,9 +331,9 @@ async def _execute_search_objects_semantic(
|
||||
|
||||
async def _execute_find_similar_objects(
|
||||
request: Request,
|
||||
arguments: Dict[str, Any],
|
||||
allowed_cameras: List[str],
|
||||
) -> Dict[str, Any]:
|
||||
arguments: dict[str, Any],
|
||||
allowed_cameras: list[str],
|
||||
) -> dict[str, Any]:
|
||||
"""Execute the find_similar_objects tool.
|
||||
|
||||
Returns a plain dict (not JSONResponse) so the chat loop can embed it
|
||||
@ -403,8 +403,8 @@ async def _execute_find_similar_objects(
|
||||
# version (see frigate/embeddings/__init__.py). Mirror the pattern used by
|
||||
# frigate/api/event.py events_search: fetch top-k globally, then intersect
|
||||
# with the structured filters via Peewee.
|
||||
visual_distances: Dict[str, float] = {}
|
||||
description_distances: Dict[str, float] = {}
|
||||
visual_distances: dict[str, float] = {}
|
||||
description_distances: dict[str, float] = {}
|
||||
|
||||
try:
|
||||
if similarity_mode in ("visual", "fused"):
|
||||
@ -462,7 +462,7 @@ async def _execute_find_similar_objects(
|
||||
eligible = {e.id: e for e in Event.select().where(reduce(operator.and_, clauses))}
|
||||
|
||||
# 6. Fuse and rank.
|
||||
scored: List[tuple[str, float]] = []
|
||||
scored: list[tuple[str, float]] = []
|
||||
for eid in eligible:
|
||||
v_score = (
|
||||
distance_to_score(visual_distances[eid], context.thumb_stats)
|
||||
@ -503,7 +503,7 @@ async def _execute_find_similar_objects(
|
||||
async def execute_tool(
|
||||
request: Request,
|
||||
body: ToolExecuteRequest = Body(...),
|
||||
allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter),
|
||||
allowed_cameras: list[str] = Depends(get_allowed_cameras_for_filter),
|
||||
) -> JSONResponse:
|
||||
"""
|
||||
Execute a tool function call.
|
||||
@ -545,8 +545,8 @@ async def execute_tool(
|
||||
async def _execute_get_live_context(
|
||||
request: Request,
|
||||
camera: str,
|
||||
allowed_cameras: List[str],
|
||||
) -> Dict[str, Any]:
|
||||
allowed_cameras: list[str],
|
||||
) -> dict[str, Any]:
|
||||
# Reject wildcards explicitly so models retry with a real camera name
|
||||
# instead of silently fanning out across every camera.
|
||||
if camera in ("*", "all"):
|
||||
@ -593,7 +593,7 @@ async def _execute_get_live_context(
|
||||
"stationary": obj_dict.get("stationary", False),
|
||||
}
|
||||
|
||||
result: Dict[str, Any] = {
|
||||
result: dict[str, Any] = {
|
||||
"camera": camera,
|
||||
"timestamp": frame_time,
|
||||
"detections": list(tracked_objects_dict.values()),
|
||||
@ -620,7 +620,7 @@ async def _execute_get_live_context(
|
||||
async def _get_live_frame_image_url(
|
||||
request: Request,
|
||||
camera: str,
|
||||
allowed_cameras: List[str],
|
||||
allowed_cameras: list[str],
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
Fetch the current live frame for a camera as a base64 data URL.
|
||||
@ -659,8 +659,8 @@ async def _get_live_frame_image_url(
|
||||
|
||||
async def _execute_set_camera_state(
|
||||
request: Request,
|
||||
arguments: Dict[str, Any],
|
||||
) -> Dict[str, Any]:
|
||||
arguments: dict[str, Any],
|
||||
) -> dict[str, Any]:
|
||||
role = request.headers.get("remote-role", "")
|
||||
if "admin" not in [r.strip() for r in role.split(",")]:
|
||||
return {"error": "Admin privileges required to change camera settings."}
|
||||
@ -699,10 +699,10 @@ async def _execute_set_camera_state(
|
||||
|
||||
async def _execute_tool_internal(
|
||||
tool_name: str,
|
||||
arguments: Dict[str, Any],
|
||||
arguments: dict[str, Any],
|
||||
request: Request,
|
||||
allowed_cameras: List[str],
|
||||
) -> Dict[str, Any]:
|
||||
allowed_cameras: list[str],
|
||||
) -> dict[str, Any]:
|
||||
"""
|
||||
Internal helper to execute a tool and return the result as a dict.
|
||||
|
||||
@ -763,8 +763,8 @@ async def _execute_tool_internal(
|
||||
|
||||
async def _execute_start_camera_watch(
|
||||
request: Request,
|
||||
arguments: Dict[str, Any],
|
||||
) -> Dict[str, Any]:
|
||||
arguments: dict[str, Any],
|
||||
) -> dict[str, Any]:
|
||||
camera = arguments.get("camera", "").strip()
|
||||
condition = arguments.get("condition", "").strip()
|
||||
max_duration_minutes = int(arguments.get("max_duration_minutes", 60))
|
||||
@ -814,14 +814,14 @@ async def _execute_start_camera_watch(
|
||||
}
|
||||
|
||||
|
||||
def _execute_stop_camera_watch() -> Dict[str, Any]:
|
||||
def _execute_stop_camera_watch() -> dict[str, Any]:
|
||||
cancelled = stop_vlm_watch_job()
|
||||
if cancelled:
|
||||
return {"success": True, "message": "Watch job cancelled."}
|
||||
return {"success": False, "message": "No active watch job to cancel."}
|
||||
|
||||
|
||||
def _execute_get_profile_status(request: Request) -> Dict[str, Any]:
|
||||
def _execute_get_profile_status(request: Request) -> dict[str, Any]:
|
||||
"""Return profile status including active profile and activation timestamps."""
|
||||
profile_manager = getattr(request.app, "profile_manager", None)
|
||||
if profile_manager is None:
|
||||
@ -846,9 +846,9 @@ def _execute_get_profile_status(request: Request) -> Dict[str, Any]:
|
||||
|
||||
|
||||
def _execute_get_recap(
|
||||
arguments: Dict[str, Any],
|
||||
allowed_cameras: List[str],
|
||||
) -> Dict[str, Any]:
|
||||
arguments: dict[str, Any],
|
||||
allowed_cameras: list[str],
|
||||
) -> dict[str, Any]:
|
||||
"""Fetch review segments with GenAI metadata for a time period."""
|
||||
from functools import reduce
|
||||
|
||||
@ -909,7 +909,7 @@ def _execute_get_recap(
|
||||
.iterator()
|
||||
)
|
||||
|
||||
events: List[Dict[str, Any]] = []
|
||||
events: list[dict[str, Any]] = []
|
||||
|
||||
for row in rows:
|
||||
data = row.get("data") or {}
|
||||
@ -920,7 +920,7 @@ def _execute_get_recap(
|
||||
data = {}
|
||||
|
||||
camera = row["camera"]
|
||||
event: Dict[str, Any] = {
|
||||
event: dict[str, Any] = {
|
||||
"camera": camera.replace("_", " ").title(),
|
||||
"severity": row.get("severity", "detection"),
|
||||
}
|
||||
@ -984,10 +984,10 @@ def _execute_get_recap(
|
||||
|
||||
|
||||
async def _execute_pending_tools(
|
||||
pending_tool_calls: List[Dict[str, Any]],
|
||||
pending_tool_calls: list[dict[str, Any]],
|
||||
request: Request,
|
||||
allowed_cameras: List[str],
|
||||
) -> tuple[List[ToolCall], List[Dict[str, Any]], List[Dict[str, Any]]]:
|
||||
allowed_cameras: list[str],
|
||||
) -> tuple[list[ToolCall], list[dict[str, Any]], list[dict[str, Any]]]:
|
||||
"""
|
||||
Execute a list of tool calls.
|
||||
|
||||
@ -996,9 +996,9 @@ async def _execute_pending_tools(
|
||||
tool result dicts for conversation,
|
||||
extra messages to inject after tool results — e.g. user messages with images)
|
||||
"""
|
||||
tool_calls_out: List[ToolCall] = []
|
||||
tool_results: List[Dict[str, Any]] = []
|
||||
extra_messages: List[Dict[str, Any]] = []
|
||||
tool_calls_out: list[ToolCall] = []
|
||||
tool_results: list[dict[str, Any]] = []
|
||||
extra_messages: list[dict[str, Any]] = []
|
||||
for tool_call in pending_tool_calls:
|
||||
tool_name = tool_call["name"]
|
||||
tool_args = tool_call.get("arguments") or {}
|
||||
@ -1106,7 +1106,7 @@ async def _execute_pending_tools(
|
||||
async def chat_completion(
|
||||
request: Request,
|
||||
body: ChatCompletionRequest = Body(...),
|
||||
allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter),
|
||||
allowed_cameras: list[str] = Depends(get_allowed_cameras_for_filter),
|
||||
):
|
||||
"""
|
||||
Chat completion endpoint with tool calling support.
|
||||
@ -1138,19 +1138,23 @@ async def chat_completion(
|
||||
)
|
||||
conversation = []
|
||||
|
||||
system_prompt = build_chat_system_prompt(
|
||||
config=config,
|
||||
allowed_cameras=allowed_cameras,
|
||||
semantic_search_enabled=semantic_search_enabled,
|
||||
attribute_classifications=attribute_classifications,
|
||||
)
|
||||
|
||||
conversation.append(
|
||||
{
|
||||
"role": "system",
|
||||
"content": system_prompt,
|
||||
}
|
||||
)
|
||||
# Build the system message only when the client hasn't already pinned one.
|
||||
# The first turn has no system message; we generate it (with the current
|
||||
# timestamp) and return the whole chain so the client persists it. Later
|
||||
# turns send it back verbatim, freezing the timestamp so the prompt prefix
|
||||
# stays byte-identical and the model server's prompt cache keeps hitting.
|
||||
if not body.messages or body.messages[0].role != "system":
|
||||
conversation.append(
|
||||
{
|
||||
"role": "system",
|
||||
"content": build_chat_system_prompt(
|
||||
config=config,
|
||||
allowed_cameras=allowed_cameras,
|
||||
semantic_search_enabled=semantic_search_enabled,
|
||||
attribute_classifications=attribute_classifications,
|
||||
),
|
||||
}
|
||||
)
|
||||
|
||||
for msg in body.messages:
|
||||
msg_dict = {
|
||||
@ -1161,11 +1165,13 @@ async def chat_completion(
|
||||
msg_dict["tool_call_id"] = msg.tool_call_id
|
||||
if msg.name:
|
||||
msg_dict["name"] = msg.name
|
||||
if msg.tool_calls is not None:
|
||||
msg_dict["tool_calls"] = msg.tool_calls
|
||||
|
||||
conversation.append(msg_dict)
|
||||
|
||||
tool_iterations = 0
|
||||
tool_calls: List[ToolCall] = []
|
||||
tool_calls: list[ToolCall] = []
|
||||
max_iterations = body.max_tool_iterations
|
||||
|
||||
logger.debug(
|
||||
@ -1175,11 +1181,20 @@ async def chat_completion(
|
||||
|
||||
# True LLM streaming when client supports it and stream requested
|
||||
if body.stream and hasattr(genai_client, "chat_with_tools_stream"):
|
||||
stream_tool_calls: List[ToolCall] = []
|
||||
stream_iterations = 0
|
||||
|
||||
async def stream_body_llm():
|
||||
nonlocal conversation, stream_tool_calls, stream_iterations
|
||||
nonlocal conversation, stream_iterations
|
||||
|
||||
def _emit_chain(extra: Optional[list[dict[str, Any]]] = None):
|
||||
# Return the full conversation (including the system message) so
|
||||
# the client persists and replays it verbatim next turn.
|
||||
chain = conversation + (extra or [])
|
||||
return (
|
||||
json.dumps({"type": "messages", "messages": chain}).encode("utf-8")
|
||||
+ b"\n"
|
||||
)
|
||||
|
||||
while stream_iterations < max_iterations:
|
||||
if await request.is_disconnected():
|
||||
logger.debug("Client disconnected, stopping chat stream")
|
||||
@ -1244,31 +1259,33 @@ async def chat_completion(
|
||||
)
|
||||
return
|
||||
(
|
||||
executed_calls,
|
||||
_executed_calls,
|
||||
tool_results,
|
||||
extra_msgs,
|
||||
) = await _execute_pending_tools(
|
||||
pending, request, allowed_cameras
|
||||
)
|
||||
stream_tool_calls.extend(executed_calls)
|
||||
conversation.extend(tool_results)
|
||||
conversation.extend(extra_msgs)
|
||||
yield (
|
||||
json.dumps(
|
||||
{
|
||||
"type": "tool_calls",
|
||||
"tool_calls": [
|
||||
tc.model_dump() for tc in stream_tool_calls
|
||||
],
|
||||
}
|
||||
).encode("utf-8")
|
||||
+ b"\n"
|
||||
)
|
||||
# Emit the running chain so the client can render tool
|
||||
# calls live and replay them verbatim next turn.
|
||||
yield _emit_chain()
|
||||
break
|
||||
else:
|
||||
# Streaming never appends the final assistant message
|
||||
# to the conversation, so add it to the chain.
|
||||
yield _emit_chain(
|
||||
extra=[
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": msg.get("content"),
|
||||
}
|
||||
]
|
||||
)
|
||||
yield (json.dumps({"type": "done"}).encode("utf-8") + b"\n")
|
||||
return
|
||||
else:
|
||||
yield _emit_chain()
|
||||
yield json.dumps({"type": "done"}).encode("utf-8") + b"\n"
|
||||
|
||||
return StreamingResponse(
|
||||
@ -1315,19 +1332,15 @@ async def chat_completion(
|
||||
if body.stream:
|
||||
final_reasoning = response.get("reasoning")
|
||||
|
||||
chain = list(conversation)
|
||||
|
||||
async def stream_body() -> Any:
|
||||
if tool_calls:
|
||||
yield (
|
||||
json.dumps(
|
||||
{
|
||||
"type": "tool_calls",
|
||||
"tool_calls": [
|
||||
tc.model_dump() for tc in tool_calls
|
||||
],
|
||||
}
|
||||
).encode("utf-8")
|
||||
+ b"\n"
|
||||
yield (
|
||||
json.dumps({"type": "messages", "messages": chain}).encode(
|
||||
"utf-8"
|
||||
)
|
||||
+ b"\n"
|
||||
)
|
||||
# Emit the full reasoning trace up front when the
|
||||
# underlying client did not stream it
|
||||
if final_reasoning:
|
||||
@ -1363,6 +1376,7 @@ async def chat_completion(
|
||||
finish_reason=response.get("finish_reason", "stop"),
|
||||
tool_iterations=tool_iterations,
|
||||
tool_calls=tool_calls,
|
||||
messages=list(conversation),
|
||||
).model_dump(),
|
||||
)
|
||||
|
||||
@ -1395,6 +1409,7 @@ async def chat_completion(
|
||||
finish_reason="length",
|
||||
tool_iterations=tool_iterations,
|
||||
tool_calls=tool_calls,
|
||||
messages=list(conversation),
|
||||
).model_dump(),
|
||||
)
|
||||
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
"""Chat API request models."""
|
||||
|
||||
from typing import Optional
|
||||
from typing import Any, Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
@ -11,13 +11,29 @@ class ChatMessage(BaseModel):
|
||||
role: str = Field(
|
||||
description="Message role: 'user', 'assistant', 'system', or 'tool'"
|
||||
)
|
||||
content: str = Field(description="Message content")
|
||||
content: Optional[Any] = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"Message content. Usually a string, but may be a multimodal content "
|
||||
"list (e.g. text + image_url) or null for assistant turns that only "
|
||||
"request tool calls."
|
||||
),
|
||||
)
|
||||
tool_call_id: Optional[str] = Field(
|
||||
default=None, description="For tool messages, the ID of the tool call"
|
||||
)
|
||||
name: Optional[str] = Field(
|
||||
default=None, description="For tool messages, the tool name"
|
||||
)
|
||||
tool_calls: Optional[list[dict[str, Any]]] = Field(
|
||||
default=None,
|
||||
description=(
|
||||
"For assistant messages replayed from prior turns, the OpenAI-format "
|
||||
"tool calls the model previously requested. Replaying these verbatim "
|
||||
"keeps the conversation prefix byte-for-byte identical so the model "
|
||||
"server's prompt cache hits on follow-up turns."
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class ChatCompletionRequest(BaseModel):
|
||||
|
||||
@ -56,3 +56,12 @@ class ChatCompletionResponse(BaseModel):
|
||||
default_factory=list,
|
||||
description="List of tool calls that were executed during this completion",
|
||||
)
|
||||
messages: list[dict[str, Any]] = Field(
|
||||
default_factory=list,
|
||||
description=(
|
||||
"The full conversation chain, including the system message. Persist "
|
||||
"and replay this verbatim on the next request so the prompt prefix "
|
||||
"stays byte-identical and the model server's prompt cache keeps "
|
||||
"hitting."
|
||||
),
|
||||
)
|
||||
|
||||
@ -309,7 +309,9 @@ class FrigateApp:
|
||||
self.detection_proxy = DetectorProxy()
|
||||
|
||||
def init_onvif(self) -> None:
|
||||
self.onvif_controller = OnvifController(self.config, self.ptz_metrics)
|
||||
self.onvif_controller = OnvifController(
|
||||
self.config, self.ptz_metrics, self.camera_metrics
|
||||
)
|
||||
|
||||
def init_dispatcher(self) -> None:
|
||||
comms: list[Communicator] = []
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import multiprocessing as mp
|
||||
import queue
|
||||
from multiprocessing.managers import SyncManager, ValueProxy
|
||||
from multiprocessing.managers import ListProxy, SyncManager, ValueProxy
|
||||
from multiprocessing.sharedctypes import Synchronized
|
||||
from multiprocessing.synchronize import Event
|
||||
|
||||
@ -23,6 +23,14 @@ class CameraMetrics:
|
||||
reconnects_last_hour: ValueProxy[int]
|
||||
stalls_last_hour: ValueProxy[int]
|
||||
|
||||
# External motion published by OnvifController when motion.source=onvif.
|
||||
# external_motion_active mirrors the PullPoint IsMotion state.
|
||||
# external_motion_boxes carries the per-frame cell-derived rectangles in
|
||||
# detect-frame pixel coordinates; empty list means no current spatial
|
||||
# data (consumer should fall back to a full-frame box when active=1).
|
||||
external_motion_active: ValueProxy[int]
|
||||
external_motion_boxes: ListProxy
|
||||
|
||||
def __init__(self, manager: SyncManager):
|
||||
self.camera_fps = manager.Value("d", 0)
|
||||
self.detection_fps = manager.Value("d", 0)
|
||||
@ -41,6 +49,9 @@ class CameraMetrics:
|
||||
self.reconnects_last_hour = manager.Value("i", 0)
|
||||
self.stalls_last_hour = manager.Value("i", 0)
|
||||
|
||||
self.external_motion_active = manager.Value("b", 0)
|
||||
self.external_motion_boxes = manager.list()
|
||||
|
||||
|
||||
class PTZMetrics:
|
||||
autotracker_enabled: Synchronized
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
from enum import Enum
|
||||
from typing import Any, Optional
|
||||
|
||||
from pydantic import Field, field_serializer
|
||||
@ -5,7 +6,12 @@ from pydantic import Field, field_serializer
|
||||
from ..base import FrigateBaseModel
|
||||
from .mask import MotionMaskConfig
|
||||
|
||||
__all__ = ["MotionConfig"]
|
||||
__all__ = ["MotionConfig", "MotionSourceEnum"]
|
||||
|
||||
|
||||
class MotionSourceEnum(str, Enum):
|
||||
internal = "internal"
|
||||
onvif = "onvif"
|
||||
|
||||
|
||||
class MotionConfig(FrigateBaseModel):
|
||||
@ -14,6 +20,11 @@ class MotionConfig(FrigateBaseModel):
|
||||
title="Enable motion detection",
|
||||
description="Enable or disable motion detection for all cameras; can be overridden per-camera.",
|
||||
)
|
||||
source: MotionSourceEnum = Field(
|
||||
default=MotionSourceEnum.internal,
|
||||
title="Motion source",
|
||||
description="Where motion state comes from: Frigate's internal frame analyser, or the camera's ONVIF cell-motion events (requires onvif.events.enabled).",
|
||||
)
|
||||
threshold: int = Field(
|
||||
default=30,
|
||||
title="Motion threshold",
|
||||
|
||||
@ -7,7 +7,12 @@ from ..base import FrigateBaseModel
|
||||
from ..env import EnvString
|
||||
from .objects import DEFAULT_TRACKED_OBJECTS
|
||||
|
||||
__all__ = ["OnvifConfig", "PtzAutotrackConfig", "ZoomingModeEnum"]
|
||||
__all__ = [
|
||||
"OnvifConfig",
|
||||
"OnvifEventsConfig",
|
||||
"PtzAutotrackConfig",
|
||||
"ZoomingModeEnum",
|
||||
]
|
||||
|
||||
|
||||
class ZoomingModeEnum(str, Enum):
|
||||
@ -91,6 +96,26 @@ class PtzAutotrackConfig(FrigateBaseModel):
|
||||
return weights
|
||||
|
||||
|
||||
class OnvifEventsConfig(FrigateBaseModel):
|
||||
enabled: bool = Field(
|
||||
default=False,
|
||||
title="Enable ONVIF events",
|
||||
description="Subscribe to the camera's ONVIF cell-motion notifications and use them as Frigate's motion signal.",
|
||||
)
|
||||
subscription_timeout: int = Field(
|
||||
default=60,
|
||||
ge=10,
|
||||
le=600,
|
||||
title="Subscription timeout",
|
||||
description="Seconds before the PullPoint subscription expires and is renewed.",
|
||||
)
|
||||
use_metadata_stream: bool = Field(
|
||||
default=True,
|
||||
title="Use metadata stream",
|
||||
description="Open the ONVIF analytics RTSP metadata stream to receive per-cell motion coordinates. Falls back to a full-frame box when disabled or when the camera does not advertise the track.",
|
||||
)
|
||||
|
||||
|
||||
class OnvifConfig(FrigateBaseModel):
|
||||
host: EnvString = Field(
|
||||
default="",
|
||||
@ -127,6 +152,11 @@ class OnvifConfig(FrigateBaseModel):
|
||||
title="Autotracking",
|
||||
description="Automatically track moving objects and keep them centered in the frame using PTZ camera movements.",
|
||||
)
|
||||
events: OnvifEventsConfig = Field(
|
||||
default_factory=OnvifEventsConfig,
|
||||
title="ONVIF events",
|
||||
description="Consume camera-side ONVIF motion notifications instead of Frigate's CPU motion detector.",
|
||||
)
|
||||
ignore_time_mismatch: bool = Field(
|
||||
default=False,
|
||||
title="Ignore time mismatch",
|
||||
|
||||
@ -47,7 +47,7 @@ from .camera.detect import DetectConfig
|
||||
from .camera.ffmpeg import FfmpegConfig
|
||||
from .camera.genai import GenAIConfig, GenAIRoleEnum
|
||||
from .camera.mask import ObjectMaskConfig
|
||||
from .camera.motion import MotionConfig
|
||||
from .camera.motion import MotionConfig, MotionSourceEnum
|
||||
from .camera.notification import NotificationConfig
|
||||
from .camera.objects import FilterConfig, ObjectConfig
|
||||
from .camera.record import RecordConfig
|
||||
@ -380,10 +380,19 @@ def verify_autotrack_zones(camera_config: CameraConfig) -> ValueError | None:
|
||||
|
||||
def verify_motion_and_detect(camera_config: CameraConfig) -> ValueError | None:
|
||||
"""Verify that motion detection is not disabled and object detection is enabled."""
|
||||
if camera_config.detect.enabled and not camera_config.motion.enabled:
|
||||
motion_via_onvif = camera_config.motion.source == MotionSourceEnum.onvif
|
||||
if (
|
||||
camera_config.detect.enabled
|
||||
and not camera_config.motion.enabled
|
||||
and not motion_via_onvif
|
||||
):
|
||||
raise ValueError(
|
||||
f"Camera {camera_config.name} has motion detection disabled and object detection enabled but object detection requires motion detection."
|
||||
)
|
||||
if motion_via_onvif and not camera_config.onvif.events.enabled:
|
||||
raise ValueError(
|
||||
f"Camera {camera_config.name} has motion.source=onvif but onvif.events.enabled is false; enable ONVIF events to use them as the motion source."
|
||||
)
|
||||
|
||||
|
||||
def verify_objects_track(
|
||||
|
||||
@ -13,17 +13,39 @@ import numpy
|
||||
from onvif import ONVIFCamera, ONVIFError, ONVIFService
|
||||
from zeep.exceptions import Fault, TransportError
|
||||
|
||||
from frigate.camera import PTZMetrics
|
||||
from frigate.camera import CameraMetrics, PTZMetrics
|
||||
from frigate.config import FrigateConfig, ZoomingModeEnum
|
||||
from frigate.config.camera.updater import (
|
||||
CameraConfigUpdateEnum,
|
||||
CameraConfigUpdateSubscriber,
|
||||
)
|
||||
from frigate.ptz.onvif_events import run_pullpoint_subscription
|
||||
from frigate.ptz.onvif_metadata import run_metadata_stream
|
||||
from frigate.util.builtin import find_by_key
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _inject_rtsp_credentials(url: str, user: str | None, password: str | None) -> str:
|
||||
"""Insert user:password into an rtsp:// URL if not already present.
|
||||
|
||||
The ONVIF GetStreamUri response typically returns an rtsp URL without
|
||||
credentials, but downstream consumers (ffmpeg, RTSP libs) need them in
|
||||
the URL because the camera challenges Basic/Digest on DESCRIBE.
|
||||
"""
|
||||
if not user or not password:
|
||||
return url
|
||||
if "@" in url.split("://", 1)[-1].split("/", 1)[0]:
|
||||
# URL already has user:pass — don't touch it.
|
||||
return url
|
||||
if "://" not in url:
|
||||
return url
|
||||
scheme, rest = url.split("://", 1)
|
||||
from urllib.parse import quote
|
||||
|
||||
return f"{scheme}://{quote(user, safe='')}:{quote(password, safe='')}@{rest}"
|
||||
|
||||
|
||||
class OnvifCommandEnum(str, Enum):
|
||||
"""Holds all possible move commands"""
|
||||
|
||||
@ -45,7 +67,10 @@ class OnvifController:
|
||||
ptz_metrics: dict[str, PTZMetrics]
|
||||
|
||||
def __init__(
|
||||
self, config: FrigateConfig, ptz_metrics: dict[str, PTZMetrics]
|
||||
self,
|
||||
config: FrigateConfig,
|
||||
ptz_metrics: dict[str, PTZMetrics],
|
||||
camera_metrics: dict[str, CameraMetrics] | None = None,
|
||||
) -> None:
|
||||
self.cams: dict[str, dict] = {}
|
||||
self.failed_cams: dict[str, dict] = {}
|
||||
@ -53,6 +78,7 @@ class OnvifController:
|
||||
self.reset_timeout = 900 # 15 minutes
|
||||
self.config = config
|
||||
self.ptz_metrics = ptz_metrics
|
||||
self.camera_metrics = camera_metrics or {}
|
||||
|
||||
self.status_locks: dict[str, asyncio.Lock] = {}
|
||||
|
||||
@ -107,7 +133,28 @@ class OnvifController:
|
||||
async def _close_camera(self, cam_name: str) -> None:
|
||||
"""Close the ONVIF client session for a camera."""
|
||||
cam_state = self.cams.get(cam_name)
|
||||
if cam_state and "onvif" in cam_state:
|
||||
if not cam_state:
|
||||
return
|
||||
# Stop any long-running event-consumption tasks first so they release
|
||||
# any resources held against the ONVIFCamera session before we close it.
|
||||
for key in ("pullpoint", "metadata"):
|
||||
handle = cam_state.get(key)
|
||||
if not handle:
|
||||
continue
|
||||
task, stop_event = handle
|
||||
try:
|
||||
stop_event.set()
|
||||
except Exception:
|
||||
pass
|
||||
task.cancel()
|
||||
try:
|
||||
await asyncio.wait_for(task, timeout=5.0)
|
||||
except (asyncio.CancelledError, asyncio.TimeoutError):
|
||||
pass
|
||||
except Exception:
|
||||
logger.debug(f"Error awaiting {key} task for {cam_name}")
|
||||
cam_state.pop(key, None)
|
||||
if "onvif" in cam_state:
|
||||
try:
|
||||
await cam_state["onvif"].close()
|
||||
except Exception:
|
||||
@ -187,6 +234,172 @@ class OnvifController:
|
||||
logger.error(f"Onvif connection failed for {camera_name}: {e}")
|
||||
return False
|
||||
|
||||
# Events init runs first, independent of PTZ capability. Many ONVIF
|
||||
# cameras don't expose PTZ and would otherwise be skipped at the
|
||||
# get_definition("ptz") check below.
|
||||
await self._init_onvif_events(camera_name)
|
||||
|
||||
return await self._init_onvif_ptz(camera_name)
|
||||
|
||||
async def _init_onvif_events(self, camera_name: str) -> None:
    """Start the camera-side motion consumers for one camera.

    Always starts a PullPoint subscriber task when ONVIF events are enabled
    and CameraMetrics are available; additionally starts the analytics
    metadata stream task when `use_metadata_stream` is set and a cell layout
    was discovered. Task handles are stored on `self.cams[camera_name]` under
    the "pullpoint" and "metadata" keys as `(task, stop_event)` tuples.

    Returns early (without raising) when events are disabled, when no
    CameraMetrics entry exists for the camera, or when the metadata stream
    cannot be set up.
    """
    cam_cfg = self.config.cameras[camera_name]
    if not cam_cfg.onvif.events.enabled:
        return

    cm = self.camera_metrics.get(camera_name)
    if cm is None:
        logger.warning(
            f"ONVIF events enabled for {camera_name} but no CameraMetrics "
            "available; external motion will not be published"
        )
        return

    onvif: ONVIFCamera = self.cams[camera_name]["onvif"]

    # The layout may be None; it is stored either way so the state dict
    # records the outcome of discovery.
    cell_layout = await self._discover_cell_layout(onvif, camera_name)
    self.cams[camera_name]["cell_layout"] = cell_layout

    def on_state(active: bool) -> None:
        cm.external_motion_active.value = 1 if active else 0
        if not active:
            # Drop spatial data when motion ends — keep the consumer's
            # snapshot consistent with the binary state.
            try:
                cm.external_motion_boxes[:] = []
            except Exception:
                # Best effort, but leave a trace like on_boxes does instead
                # of swallowing the failure silently.
                logger.debug(f"Failed to clear boxes for {camera_name}")

    pp_stop = asyncio.Event()
    pp_task = asyncio.create_task(
        run_pullpoint_subscription(
            onvif,
            camera_name,
            cam_cfg.onvif.events.subscription_timeout,
            on_state,
            pp_stop,
        )
    )
    self.cams[camera_name]["pullpoint"] = (pp_task, pp_stop)
    logger.info(f"ONVIF events: PullPoint subscriber started for {camera_name}")

    # Without a layout the cell bitmap cannot be mapped to pixels, so the
    # metadata stream is only opened when one was discovered.
    if not cam_cfg.onvif.events.use_metadata_stream or cell_layout is None:
        return

    rtsp_url = await self._discover_primary_rtsp_url(onvif, camera_name)
    if not rtsp_url:
        logger.warning(
            f"ONVIF events for {camera_name}: no primary RTSP URL "
            "available; skipping metadata stream"
        )
        return
    rtsp_url = _inject_rtsp_credentials(
        rtsp_url, cam_cfg.onvif.user, cam_cfg.onvif.password
    )

    detect_size = (cam_cfg.detect.width, cam_cfg.detect.height)

    def on_boxes(boxes: list[tuple[int, int, int, int]]) -> None:
        try:
            cm.external_motion_boxes[:] = boxes
        except Exception:
            logger.debug(f"Failed to publish boxes for {camera_name}")

    md_stop = asyncio.Event()
    md_task = asyncio.create_task(
        run_metadata_stream(
            rtsp_url,
            camera_name,
            cell_layout,
            detect_size,
            on_boxes,
            md_stop,
        )
    )
    self.cams[camera_name]["metadata"] = (md_task, md_stop)
    logger.info(f"ONVIF events: metadata stream consumer started for {camera_name}")
|
||||
|
||||
async def _discover_cell_layout(
    self, onvif: ONVIFCamera, camera_name: str
) -> tuple[int, int, tuple[float, float], tuple[float, float]] | None:
    """Query AnalyticsService.GetAnalyticsModules and extract the
    CellMotionEngine's CellLayout.

    Returns:
        `(columns, rows, (tx, ty), (sx, sy))` for the first module that
        carries a usable "Layout" element, or None when the analytics
        service is unavailable, no layout is found, or parsing fails.

    When the layout has no usable Scale (missing element or a zero factor),
    a full-frame mapping is substituted — translate (-1, -1) and scale
    (2/columns, 2/rows) — because a zero scale would collapse every cell to
    a single point and no motion box could ever be produced from it.
    """
    try:
        analytics = await onvif.create_analytics_service()
        # NOTE(review): the configuration token is hard-coded; cameras that
        # name their analytics configuration differently will presumably
        # fail this call and fall through to the None return — confirm.
        modules = await analytics.GetAnalyticsModules(
            {"ConfigurationToken": "VA_CFG_000"}
        )
    except Exception as e:
        logger.debug(f"ONVIF analytics service unavailable for {camera_name}: {e}")
        return None

    try:
        for mod in modules or []:
            mod_type = getattr(mod, "Type", None) or getattr(mod, "_attr_1", None)
            # Modules with no discernible type are still inspected.
            if mod_type and "CellMotionEngine" not in str(mod_type):
                continue
            element_items = getattr(mod.Parameters, "ElementItem", None) or []
            for item in element_items:
                if item.Name != "Layout":
                    continue
                raw = item._value_1
                if raw is None or not hasattr(raw, "attrib"):
                    continue
                cols = int(raw.attrib.get("Columns", 0))
                rows = int(raw.attrib.get("Rows", 0))
                if cols <= 0 or rows <= 0:
                    continue
                tx = ty = 0.0
                sx = sy = 0.0
                for child in raw.iter():
                    tag = child.tag
                    if not isinstance(tag, str):
                        # Comments / processing instructions have a
                        # non-string tag; skip them instead of aborting.
                        continue
                    if tag.endswith("}Translate"):
                        tx = float(child.attrib.get("x", 0))
                        ty = float(child.attrib.get("y", 0))
                    elif tag.endswith("}Scale"):
                        sx = float(child.attrib.get("x", 0))
                        sy = float(child.attrib.get("y", 0))
                if sx == 0.0 or sy == 0.0:
                    # Unusable transformation: spread the grid over the
                    # whole normalized [-1, +1] frame instead.
                    tx, ty = -1.0, -1.0
                    sx, sy = 2.0 / cols, 2.0 / rows
                    logger.debug(
                        f"ONVIF cell layout for {camera_name} has no usable "
                        "Scale; assuming the grid covers the full frame"
                    )
                logger.info(
                    f"ONVIF cell layout for {camera_name}: {cols}x{rows} "
                    f"translate=({tx},{ty}) scale=({sx},{sy})"
                )
                return (cols, rows, (tx, ty), (sx, sy))
    except Exception as e:
        logger.debug(
            f"Failed parsing CellMotionEngine layout for {camera_name}: {e}"
        )
    return None
|
||||
|
||||
async def _discover_primary_rtsp_url(
    self, onvif: ONVIFCamera, camera_name: str
) -> str | None:
    """Look up the RTSP URI of the first media profile the camera reports.

    The ONVIF analytics metadata track is typically bound to the primary
    media profile only (sub-streams may omit it), so only that profile is
    queried.

    Returns:
        The URI from GetStreamUri, or None when the camera reports no
        profiles or any of the media-service calls fails.
    """
    try:
        media_service = await onvif.create_media_service()
        available_profiles = await media_service.GetProfiles()
        if not available_profiles:
            return None

        primary_token = available_profiles[0].token
        stream_setup = {
            "Stream": "RTP-Unicast",
            "Transport": {"Protocol": "RTSP"},
        }
        response = await media_service.GetStreamUri(
            {"StreamSetup": stream_setup, "ProfileToken": primary_token}
        )
        return response.Uri
    except Exception as e:
        logger.debug(f"GetStreamUri failed for {camera_name}: {e}")
        return None
|
||||
|
||||
async def _init_onvif_ptz(self, camera_name: str) -> bool:
|
||||
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
|
||||
|
||||
# create init services
|
||||
media: ONVIFService = await onvif.create_media_service()
|
||||
logger.debug(f"Onvif media xaddr for {camera_name}: {media.xaddr}")
|
||||
|
||||
138
frigate/ptz/onvif_events.py
Normal file
138
frigate/ptz/onvif_events.py
Normal file
@ -0,0 +1,138 @@
|
||||
"""ONVIF PullPoint subscriber for camera-side motion events.
|
||||
|
||||
Long-running per-camera coroutine that subscribes to the camera's PullPoint
|
||||
service via `onvif-zeep-async`'s `PullPointManager` (which owns subscription
|
||||
creation, renewal, and lifecycle), pulls notification messages, parses
|
||||
IsMotion/State on each round-trip, and invokes a callback on transitions.
|
||||
|
||||
Lives on the OnvifController's dedicated asyncio loop (see
|
||||
`frigate/ptz/onvif.py` for the loop setup).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import datetime as dt
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Awaitable, Callable
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from onvif import ONVIFCamera
|
||||
|
||||
try:
|
||||
from zeep.exceptions import Fault
|
||||
except ImportError: # tests can run without zeep installed
|
||||
|
||||
class Fault(Exception): # type: ignore[no-redef]
|
||||
pass
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Names of the boolean state SimpleItem we accept inside the message Data
|
||||
# block. Spec calls it IsMotion; the legacy MotionAlarm topic uses State.
|
||||
_STATE_NAMES = ("IsMotion", "State")
|
||||
|
||||
# Bounds on backoff between subscription failures.
|
||||
_BACKOFF_INITIAL_S = 1.0
|
||||
_BACKOFF_MAX_S = 60.0
|
||||
|
||||
|
||||
def _parse_motion_state(msg) -> bool | None:
|
||||
"""Walk a NotificationMessage and return the IsMotion/State value, or
|
||||
None if not present. The Message body is often an `lxml.etree._Element`
|
||||
that python-onvif-zeep returns for ##any wildcards — walk via .iter()."""
|
||||
body = getattr(msg, "Message", None)
|
||||
if body is None:
|
||||
return None
|
||||
raw = getattr(body, "_value_1", body)
|
||||
if not hasattr(raw, "iter"):
|
||||
return None
|
||||
for el in raw.iter():
|
||||
if not el.tag.endswith("}SimpleItem"):
|
||||
continue
|
||||
name = el.attrib.get("Name", "")
|
||||
if name not in _STATE_NAMES:
|
||||
continue
|
||||
val = el.attrib.get("Value", "").strip().lower()
|
||||
if val in ("true", "1"):
|
||||
return True
|
||||
if val in ("false", "0"):
|
||||
return False
|
||||
return None
|
||||
|
||||
|
||||
async def run_pullpoint_subscription(
    onvif_cam: "ONVIFCamera",
    cam_name: str,
    timeout_seconds: int,
    on_state: Callable[[bool], None] | Callable[[bool], Awaitable[None]],
    stop_event: asyncio.Event,
) -> None:
    """Pull ONVIF motion notifications until `stop_event` is set.

    Each outer iteration creates a PullPointManager, long-polls
    PullMessages and invokes `on_state` whenever the parsed motion state
    differs from the last one seen. Any error tears the manager down and
    retries after an exponential backoff.

    Args:
        onvif_cam: Connected camera used to create the PullPoint manager.
        cam_name: Camera name, used in log messages only.
        timeout_seconds: Subscription lifetime passed to the manager.
        on_state: Sync or async callback receiving the new motion state.
        stop_event: Set by the owner to request a clean shutdown.

    Raises:
        asyncio.CancelledError: Re-raised so task cancellation propagates.
    """
    backoff = _BACKOFF_INITIAL_S
    last_state: bool | None = None

    while not stop_event.is_set():
        manager = None
        # A fresh event per attempt; its bound `set` is handed to the manager
        # directly so the callback can only ever flag this attempt's event.
        sub_lost = asyncio.Event()

        try:
            manager = await onvif_cam.create_pullpoint_manager(
                dt.timedelta(seconds=timeout_seconds),
                sub_lost.set,
            )
            service = manager.get_service()
            logger.info(f"ONVIF PullPoint subscribed for {cam_name}")

            while not stop_event.is_set() and not sub_lost.is_set():
                # Long-poll up to 10s. The subscription manager keeps the
                # subscription itself alive in the background — we just pull.
                msgs = await service.PullMessages(
                    {"Timeout": "PT10S", "MessageLimit": 32}
                )
                # A completed pull proves the link is healthy again, so the
                # next failure starts from the initial delay instead of
                # inheriting delays accumulated by earlier outages.
                backoff = _BACKOFF_INITIAL_S
                for m in msgs.NotificationMessage or []:
                    state = _parse_motion_state(m)
                    if state is None or state == last_state:
                        continue
                    last_state = state
                    try:
                        result = on_state(state)
                        if asyncio.iscoroutine(result):
                            await result
                    except Exception:
                        logger.exception(f"on_state callback error for {cam_name}")

            if sub_lost.is_set():
                raise Fault("PullPoint subscription lost")

            # Clean exit (stop_event set) — leave the loop.
            backoff = _BACKOFF_INITIAL_S
            break
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.warning(
                f"ONVIF PullPoint subscription error for {cam_name}: {e!r}; "
                f"reconnecting in {backoff:.1f}s"
            )
        finally:
            if manager is not None:
                try:
                    await manager.shutdown()
                except Exception:
                    # Best effort: the camera may already be unreachable.
                    pass

        if stop_event.is_set():
            return
        try:
            # Sleep for the backoff, but wake immediately on shutdown.
            await asyncio.wait_for(stop_event.wait(), timeout=backoff)
            return
        except asyncio.TimeoutError:
            pass
        backoff = min(backoff * 2, _BACKOFF_MAX_S)
|
||||
357
frigate/ptz/onvif_metadata.py
Normal file
357
frigate/ptz/onvif_metadata.py
Normal file
@ -0,0 +1,357 @@
|
||||
"""ONVIF analytics metadata stream consumer.
|
||||
|
||||
Per-camera asyncio task that opens an RTSP connection to the camera's
|
||||
primary profile, extracts the `application/vnd.onvif.metadata` data track
|
||||
via an ffmpeg subprocess, and converts the per-frame `<tt:MotionInCells>`
|
||||
bitmap into a list of motion rectangles in Frigate detect-frame pixels.
|
||||
|
||||
Why ffmpeg rather than an in-process RTSP client: Frigate already ships
|
||||
ffmpeg and uses it heavily for video/recording; there is no async RTSP
|
||||
client in the existing dependency set that handles the `vnd.onvif.metadata`
|
||||
payload cleanly. The data track is low-bandwidth (~1 packet/sec at idle,
|
||||
≤300 bytes XML each), so the subprocess cost is negligible.
|
||||
|
||||
Wire format (ONVIF Analytics Service Spec, Annex B "Cell Motion Detection"):
|
||||
- Each RTP packet payload is one complete <tt:MetadataStream> XML doc.
|
||||
- Cells attribute = base64(PackBits(bit-packed row-major bitmap)).
|
||||
- Bits: cols*rows total, MSB-first within bytes, zero-padded.
|
||||
|
||||
Cell → detect-frame mapping uses the CellLayout transformation discovered
|
||||
at OnvifController init: Translate(tx, ty) + Scale(sx, sy) maps cell index
|
||||
(c, r) to normalized ONVIF coords [-1, +1]. We convert that to detect-frame
|
||||
pixels.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import logging
|
||||
from typing import Awaitable, Callable
|
||||
from xml.etree import ElementTree as ET
|
||||
|
||||
import numpy as np
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_TT_NS = "http://www.onvif.org/ver10/schema"
|
||||
_MIC_TAG = f"{{{_TT_NS}}}MotionInCells"
|
||||
|
||||
# ffmpeg's -map 0:d:0 selects the first data track from the input. -c copy
|
||||
# bypasses any transcode. -f data writes raw packet payloads to stdout.
|
||||
# -flush_packets 1 disables muxer-side buffering so each metadata frame
|
||||
# reaches us within ~1 packet of being received from the camera.
|
||||
_FFMPEG_ARGS_TEMPLATE = (
|
||||
"-nostdin",
|
||||
"-loglevel",
|
||||
"error",
|
||||
"-rtsp_transport",
|
||||
"tcp",
|
||||
"-i",
|
||||
"{url}",
|
||||
"-map",
|
||||
"0:d:0?",
|
||||
"-c",
|
||||
"copy",
|
||||
"-flush_packets",
|
||||
"1",
|
||||
"-f",
|
||||
"data",
|
||||
"pipe:1",
|
||||
)
|
||||
|
||||
# Each metadata document ends with this closing tag — we split incoming
|
||||
# stdout on it to recover packet boundaries (no other framing on a `-f data`
|
||||
# stream).
|
||||
_DOC_TERMINATOR = b"</tt:MetadataStream>"
|
||||
|
||||
_BACKOFF_INITIAL_S = 1.0
|
||||
_BACKOFF_MAX_S = 60.0
|
||||
|
||||
# Stop reading at this many bytes per single document — guards against a
|
||||
# misbehaving stream filling memory if the terminator never arrives.
|
||||
_MAX_DOC_BYTES = 64 * 1024
|
||||
|
||||
|
||||
def _packbits_decode(packed: bytes) -> bytes:
|
||||
"""ISO 12639 / TIFF 6.0 PackBits decoder."""
|
||||
out = bytearray()
|
||||
i = 0
|
||||
n = len(packed)
|
||||
while i < n:
|
||||
h = packed[i]
|
||||
i += 1
|
||||
if h <= 0x7F:
|
||||
count = h + 1
|
||||
out += packed[i : i + count]
|
||||
i += count
|
||||
elif h == 0x80:
|
||||
continue # no-op header
|
||||
else:
|
||||
count = 257 - h
|
||||
if i >= n:
|
||||
break
|
||||
out += bytes([packed[i]]) * count
|
||||
i += 1
|
||||
return bytes(out)
|
||||
|
||||
|
||||
def _decode_cells(cells_b64: str, cols: int, rows: int) -> np.ndarray | None:
|
||||
"""Decode the Cells attribute into a 2-D uint8 array shape (rows, cols).
|
||||
|
||||
Returns None if the decoded length doesn't match what the layout
|
||||
expects — caller should treat that as "no spatial data this frame"
|
||||
and fall back to whatever default (e.g. full-frame box)."""
|
||||
if not cells_b64:
|
||||
return None
|
||||
try:
|
||||
packed = base64.b64decode(cells_b64, validate=False)
|
||||
except Exception:
|
||||
return None
|
||||
raw = _packbits_decode(packed)
|
||||
needed_bytes = (cols * rows + 7) // 8
|
||||
if len(raw) < needed_bytes:
|
||||
return None
|
||||
bits = np.unpackbits(np.frombuffer(raw[:needed_bytes], dtype=np.uint8))
|
||||
bits = bits[: cols * rows]
|
||||
return bits.reshape((rows, cols)).astype(np.uint8)
|
||||
|
||||
|
||||
def _connected_component_bboxes(
|
||||
cells: np.ndarray,
|
||||
) -> list[tuple[int, int, int, int]]:
|
||||
"""4-connectivity flood fill over a small 0/1 grid; returns list of
|
||||
(c_left, c_top, c_right, c_bottom) inclusive cell-index bounding boxes
|
||||
for each connected region.
|
||||
|
||||
cv2.connectedComponentsWithStats would be faster, but the cell grid is
|
||||
tiny (typically 22x18 = 396 cells) and avoiding the cv2 import keeps
|
||||
this module testable without OpenCV installed.
|
||||
"""
|
||||
rows, cols = cells.shape
|
||||
visited = np.zeros_like(cells, dtype=bool)
|
||||
out: list[tuple[int, int, int, int]] = []
|
||||
for r0 in range(rows):
|
||||
for c0 in range(cols):
|
||||
if not cells[r0, c0] or visited[r0, c0]:
|
||||
continue
|
||||
stack = [(r0, c0)]
|
||||
cmin = cmax = c0
|
||||
rmin = rmax = r0
|
||||
while stack:
|
||||
r, c = stack.pop()
|
||||
if r < 0 or r >= rows or c < 0 or c >= cols:
|
||||
continue
|
||||
if visited[r, c] or not cells[r, c]:
|
||||
continue
|
||||
visited[r, c] = True
|
||||
if r < rmin:
|
||||
rmin = r
|
||||
if r > rmax:
|
||||
rmax = r
|
||||
if c < cmin:
|
||||
cmin = c
|
||||
if c > cmax:
|
||||
cmax = c
|
||||
stack.append((r + 1, c))
|
||||
stack.append((r - 1, c))
|
||||
stack.append((r, c + 1))
|
||||
stack.append((r, c - 1))
|
||||
out.append((cmin, rmin, cmax, rmax))
|
||||
return out
|
||||
|
||||
|
||||
def _cells_to_boxes(
|
||||
cells: np.ndarray,
|
||||
cell_layout: tuple[int, int, tuple[float, float], tuple[float, float]],
|
||||
detect_size: tuple[int, int],
|
||||
) -> list[tuple[int, int, int, int]]:
|
||||
"""Connected-components on the cell grid → list of detect-frame boxes.
|
||||
|
||||
cell_layout = (cols, rows, (tx, ty), (sx, sy)) — the Translate + Scale
|
||||
from CellLayout.Transformation. detect_size = (width, height) in
|
||||
detect-frame pixels.
|
||||
"""
|
||||
if cells is None or cells.size == 0 or not cells.any():
|
||||
return []
|
||||
|
||||
cols, rows, (tx, ty), (sx, sy) = cell_layout
|
||||
det_w, det_h = detect_size
|
||||
if det_w <= 0 or det_h <= 0:
|
||||
return []
|
||||
|
||||
boxes: list[tuple[int, int, int, int]] = []
|
||||
|
||||
# Map cell index → detect-frame pixel via the CellLayout transformation:
|
||||
# cell (c, r) covers normalized [tx + c*sx, tx + (c+1)*sx] horizontally
|
||||
# and similarly vertically. Convert normalized [-1, +1] → pixel.
|
||||
def cell_to_px(
|
||||
c: int, r: int, *, right_edge: bool, bottom_edge: bool
|
||||
) -> tuple[int, int]:
|
||||
cx_idx = c + 1 if right_edge else c
|
||||
cy_idx = r + 1 if bottom_edge else r
|
||||
nx = tx + cx_idx * sx
|
||||
ny = ty + cy_idx * sy
|
||||
px = int(round((nx + 1.0) * 0.5 * det_w))
|
||||
py = int(round((ny + 1.0) * 0.5 * det_h))
|
||||
return px, py
|
||||
|
||||
for c_left, c_top, c_right, c_bottom in _connected_component_bboxes(cells):
|
||||
x1, y1 = cell_to_px(c_left, c_top, right_edge=False, bottom_edge=False)
|
||||
x2, y2 = cell_to_px(c_right, c_bottom, right_edge=True, bottom_edge=True)
|
||||
x1 = max(0, min(det_w - 1, x1))
|
||||
y1 = max(0, min(det_h - 1, y1))
|
||||
x2 = max(0, min(det_w - 1, x2))
|
||||
y2 = max(0, min(det_h - 1, y2))
|
||||
if x2 <= x1 or y2 <= y1:
|
||||
continue
|
||||
boxes.append((x1, y1, x2, y2))
|
||||
|
||||
return boxes
|
||||
|
||||
|
||||
def _extract_cells_from_doc(doc_bytes: bytes) -> tuple[str | None, int, int]:
|
||||
"""Parse a <tt:MetadataStream> XML doc, return (cells_b64, cols, rows).
|
||||
|
||||
Returns (None, 0, 0) if no MotionInCells element is found."""
|
||||
try:
|
||||
root = ET.fromstring(doc_bytes)
|
||||
except ET.ParseError:
|
||||
return None, 0, 0
|
||||
for el in root.iter(_MIC_TAG):
|
||||
cells_b64 = el.attrib.get("Cells")
|
||||
try:
|
||||
cols = int(el.attrib.get("Columns", "0"))
|
||||
rows = int(el.attrib.get("Rows", "0"))
|
||||
except ValueError:
|
||||
return None, 0, 0
|
||||
return cells_b64, cols, rows
|
||||
return None, 0, 0
|
||||
|
||||
|
||||
async def run_metadata_stream(
    rtsp_url: str,
    cam_name: str,
    cell_layout: tuple[int, int, tuple[float, float], tuple[float, float]],
    detect_size: tuple[int, int],
    on_boxes: Callable[[list[tuple[int, int, int, int]]], None]
    | Callable[[list[tuple[int, int, int, int]]], Awaitable[None]],
    stop_event: asyncio.Event,
) -> None:
    """Loop until stop_event: spawn ffmpeg → read XML docs → decode → on_boxes.

    Each iteration starts one ffmpeg process on `rtsp_url` and hands its
    stdout to `_consume_ffmpeg`. When that fails, the process is terminated
    (killed after 2 s) and a new one is started after an exponential backoff.

    Args:
        rtsp_url: Stream URL substituted into the ffmpeg argument template.
        cam_name: Camera name, used in log messages only.
        cell_layout: (cols, rows, (tx, ty), (sx, sy)) passed to the consumer.
        detect_size: (width, height) of the detect frame in pixels.
        on_boxes: Sync or async callback receiving the decoded boxes.
        stop_event: Set by the owner to request a clean shutdown.

    Raises:
        asyncio.CancelledError: Re-raised so task cancellation propagates.
    """
    backoff = _BACKOFF_INITIAL_S
    loop = asyncio.get_running_loop()

    while not stop_event.is_set():
        proc = None
        started_at = loop.time()
        try:
            args = [a.format(url=rtsp_url) for a in _FFMPEG_ARGS_TEMPLATE]
            proc = await asyncio.create_subprocess_exec(
                "ffmpeg",
                *args,
                stdin=asyncio.subprocess.DEVNULL,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            logger.info(
                f"ONVIF metadata stream: ffmpeg started for {cam_name} pid={proc.pid}"
            )
            await _consume_ffmpeg(
                proc, cam_name, cell_layout, detect_size, on_boxes, stop_event
            )
            backoff = _BACKOFF_INITIAL_S
        except asyncio.CancelledError:
            raise
        except Exception as e:
            # The consumer reports a stream end by raising, so the reset
            # above is not reached after an outage. Treat an attempt that
            # stayed up for at least the maximum delay as healthy, so one
            # drop after hours of streaming retries promptly.
            if loop.time() - started_at >= _BACKOFF_MAX_S:
                backoff = _BACKOFF_INITIAL_S
            logger.warning(
                f"ONVIF metadata stream error for {cam_name}: {e!r}; "
                f"reconnecting in {backoff:.1f}s"
            )
        finally:
            # Never leave an ffmpeg child behind, whatever ended the attempt.
            if proc is not None and proc.returncode is None:
                proc.terminate()
                try:
                    await asyncio.wait_for(proc.wait(), timeout=2.0)
                except asyncio.TimeoutError:
                    proc.kill()
                    await proc.wait()

        if stop_event.is_set():
            return
        try:
            # Sleep for the backoff, but wake immediately on shutdown.
            await asyncio.wait_for(stop_event.wait(), timeout=backoff)
            return
        except asyncio.TimeoutError:
            pass
        backoff = min(backoff * 2, _BACKOFF_MAX_S)
|
||||
|
||||
|
||||
async def _consume_ffmpeg(
    proc: asyncio.subprocess.Process,
    cam_name: str,
    cell_layout: tuple[int, int, tuple[float, float], tuple[float, float]],
    detect_size: tuple[int, int],
    on_boxes,
    stop_event: asyncio.Event,
) -> None:
    """Read XML docs from ffmpeg stdout and dispatch boxes.

    Bytes read from `proc.stdout` are accumulated in a buffer that is cut at
    every `_DOC_TERMINATOR`; each complete document is parsed, its cell
    bitmap decoded and converted to boxes, and the boxes passed to
    `on_boxes` (sync or async).

    Returns when `stop_event` is observed set between reads.

    Raises:
        RuntimeError: stdout reached end-of-file; the message carries the
            return code and up to 200 characters of stderr.
    """
    layout_cols, layout_rows, _, _ = cell_layout
    assert proc.stdout is not None
    buf = bytearray()

    while not stop_event.is_set():
        chunk = await proc.stdout.read(4096)
        if not chunk:
            # ffmpeg exited or stream ended.
            stderr_tail = b""
            if proc.stderr is not None:
                try:
                    # Bounded wait so a silent stderr cannot stall the report.
                    stderr_tail = await asyncio.wait_for(
                        proc.stderr.read(4096), timeout=0.5
                    )
                except asyncio.TimeoutError:
                    pass
            raise RuntimeError(
                f"ffmpeg exited for {cam_name} rc={proc.returncode} "
                f"stderr={stderr_tail.decode('utf-8', 'replace').strip()[:200]}"
            )

        buf.extend(chunk)
        if len(buf) > _MAX_DOC_BYTES * 4:
            # Drop the head to avoid unbounded growth on a wedged stream.
            buf = buf[-_MAX_DOC_BYTES:]

        # Drain every complete document currently in the buffer.
        while True:
            end = buf.find(_DOC_TERMINATOR)
            if end < 0:
                break
            end += len(_DOC_TERMINATOR)
            doc = bytes(buf[:end])
            del buf[:end]

            cells_b64, cols, rows = _extract_cells_from_doc(doc)
            if cells_b64 is None:
                continue
            # Trust the layout we discovered at init; if the camera reports
            # a different grid mid-stream, log it at debug level (don't
            # fail) and use the reported grid with the discovered translate
            # and a 2/cols, 2/rows scale.
            if cols != layout_cols or rows != layout_rows:
                logger.debug(
                    f"{cam_name}: MotionInCells grid {cols}x{rows} differs "
                    f"from discovered layout {layout_cols}x{layout_rows}"
                )
                use_layout = (
                    cols,
                    rows,
                    cell_layout[2],
                    (2.0 / cols if cols else 0, 2.0 / rows if rows else 0),
                )
            else:
                use_layout = cell_layout

            cells = _decode_cells(cells_b64, cols, rows)
            if cells is None:
                continue
            boxes = _cells_to_boxes(cells, use_layout, detect_size)
            try:
                result = on_boxes(boxes)
                if asyncio.iscoroutine(result):
                    await result
            except Exception:
                # A failing callback must not stop the stream.
                logger.exception(f"on_boxes callback error for {cam_name}")
|
||||
58
frigate/test/http_api/test_http_keyframe_analysis.py
Normal file
58
frigate/test/http_api/test_http_keyframe_analysis.py
Normal file
@ -0,0 +1,58 @@
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
from frigate.models import Event, Recordings, ReviewSegment
|
||||
from frigate.test.http_api.base_http_test import AuthTestClient, BaseTestHttp
|
||||
|
||||
|
||||
class TestHttpKeyframeAnalysis(BaseTestHttp):
    """HTTP-level checks for the /keyframe_analysis endpoint."""

    def setUp(self):
        super().setUp([Event, Recordings, ReviewSegment])

    def test_invalid_camera_returns_404(self):
        """An unknown camera name is rejected with 404."""
        application = super().create_app()
        with AuthTestClient(application) as http:
            reply = http.get("/keyframe_analysis?camera=does_not_exist")
            assert reply.status_code == 404

    def test_record_disabled_returns_neutral(self):
        """With recording off the endpoint answers 200 / record_disabled."""
        # default minimal_config has recording disabled
        application = super().create_app()
        with AuthTestClient(application) as http:
            reply = http.get("/keyframe_analysis?camera=front_door")
            assert reply.status_code == 200
            assert reply.json()["severity"] == "record_disabled"

    def test_probes_record_input_and_returns_severity(self):
        """The record-role input is probed and the probe result is returned."""
        front_door = self.minimal_config["cameras"]["front_door"]
        front_door["ffmpeg"]["inputs"] = [
            {
                "path": "rtsp://10.0.0.1:554/record",
                "roles": ["detect", "record"],
            }
        ]
        front_door["record"] = {"enabled": True}
        application = super().create_app()

        probe_result = {
            "severity": "ok",
            "keyframe_count": 5,
            "max_gap": 1.0,
            "mean_gap": 1.0,
            "min_gap": 1.0,
            "segment_time": 10,
            "duration_observed": 4.0,
            "thresholds": {"warning": 4.0, "error": 10},
        }

        with patch(
            "frigate.api.camera.analyze_record_keyframes",
            AsyncMock(return_value=probe_result),
        ) as mock_probe:
            with AuthTestClient(application) as http:
                reply = http.get("/keyframe_analysis?camera=front_door")

            assert reply.status_code == 200
            payload = reply.json()
            assert payload["severity"] == "ok"
            # index matches the input carrying the record role ("Stream 1")
            assert payload["stream_index"] == 0
            # the record-role input path was probed
            assert mock_probe.await_args.args[1] == "rtsp://10.0.0.1:554/record"
|
||||
111
frigate/test/test_keyframe_analysis.py
Normal file
111
frigate/test/test_keyframe_analysis.py
Normal file
@ -0,0 +1,111 @@
|
||||
"""Tests for keyframe-spacing analysis used to detect smart/+ codecs."""
|
||||
|
||||
import asyncio
|
||||
import unittest
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
from frigate.util.services import (
|
||||
analyze_record_keyframes,
|
||||
classify_keyframe_gaps,
|
||||
parse_keyframe_packets,
|
||||
)
|
||||
|
||||
|
||||
class TestClassifyKeyframeGaps(unittest.TestCase):
    """Severity classification of keyframe timestamp lists."""

    def test_ok_when_gaps_small(self):
        # keyframes every ~1s
        timestamps = [0.0, 1.0, 2.0, 3.0, 4.0]
        report = classify_keyframe_gaps(timestamps, segment_time=10)
        self.assertEqual(report["severity"], "ok")
        self.assertEqual(report["max_gap"], 1.0)
        self.assertEqual(report["keyframe_count"], 5)
        self.assertEqual(report["thresholds"], {"warning": 4.0, "error": 10})

    def test_warning_when_gap_exceeds_four_seconds(self):
        # 5.5s gap between the last two keyframes
        report = classify_keyframe_gaps([0.0, 1.0, 6.5], segment_time=10)
        self.assertEqual(report["severity"], "warning")
        self.assertEqual(report["max_gap"], 5.5)

    def test_error_when_gap_exceeds_segment_time(self):
        # 12s gap > 10s segment
        report = classify_keyframe_gaps([0.0, 12.0], segment_time=10)
        self.assertEqual(report["severity"], "error")

    def test_error_threshold_tracks_segment_time(self):
        # 6s gap, segment_time=5 -> error
        report = classify_keyframe_gaps([0.0, 6.0], segment_time=5)
        self.assertEqual(report["severity"], "error")

    def test_unknown_with_single_keyframe(self):
        report = classify_keyframe_gaps([1.0], segment_time=10)
        self.assertEqual(report["severity"], "unknown")
        self.assertIsNone(report["max_gap"])
        self.assertEqual(report["keyframe_count"], 1)

    def test_unknown_with_no_keyframes(self):
        report = classify_keyframe_gaps([], segment_time=10)
        self.assertEqual(report["severity"], "unknown")
        self.assertEqual(report["keyframe_count"], 0)
|
||||
|
||||
|
||||
class TestParseKeyframePackets(unittest.TestCase):
    """Parsing of "pts,flags" packet lines into keyframe timestamps."""

    def test_extracts_keyframe_pts_and_max(self):
        packet_lines = "0.000000,K__\n0.033333,___\n1.000000,K__\n1.500000,___\n"
        key_times, last_pts = parse_keyframe_packets(packet_lines)
        self.assertEqual(key_times, [0.0, 1.0])
        self.assertEqual(last_pts, 1.5)

    def test_skips_unparseable_and_empty_lines(self):
        packet_lines = "N/A,K__\n\n2.0,K__\nbad line\n"
        key_times, last_pts = parse_keyframe_packets(packet_lines)
        self.assertEqual(key_times, [2.0])
        self.assertEqual(last_pts, 2.0)

    def test_empty_output(self):
        key_times, last_pts = parse_keyframe_packets("")
        self.assertEqual(key_times, [])
        self.assertIsNone(last_pts)
|
||||
|
||||
|
||||
class TestAnalyzeRecordKeyframes(unittest.IsolatedAsyncioTestCase):
    """End-to-end probe tests with the ffprobe subprocess mocked out."""

    # Patch target for the subprocess factory used by the probe.
    _EXEC_TARGET = "frigate.util.services.asyncio.create_subprocess_exec"

    async def _run_probe(self, fake_proc):
        """Run the analyzer against `fake_proc` in place of a real ffprobe."""
        fake_ffmpeg = MagicMock()
        fake_ffmpeg.ffprobe_path = "/usr/bin/ffprobe"

        with patch(self._EXEC_TARGET, AsyncMock(return_value=fake_proc)):
            return await analyze_record_keyframes(
                fake_ffmpeg, "rtsp://cam/stream", segment_time=10
            )

    async def test_merges_duration_and_classification(self):
        fake_proc = MagicMock()
        fake_proc.communicate = AsyncMock(
            return_value=(b"0.0,K__\n1.0,___\n6.0,K__\n7.0,___\n", b"")
        )

        outcome = await self._run_probe(fake_proc)

        # Keyframes at 0s and 6s: the 6s gap exceeds the 4s warning threshold.
        self.assertEqual(outcome["severity"], "warning")
        self.assertEqual(outcome["max_gap"], 6.0)
        self.assertEqual(outcome["duration_observed"], 7.0)

    async def test_timeout_returns_unknown(self):
        fake_proc = MagicMock()
        fake_proc.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
        fake_proc.kill = MagicMock()

        outcome = await self._run_probe(fake_proc)

        self.assertEqual(outcome["severity"], "unknown")
        fake_proc.kill.assert_called_once()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
64
frigate/test/test_motion_source_validator.py
Normal file
64
frigate/test/test_motion_source_validator.py
Normal file
@ -0,0 +1,64 @@
|
||||
"""Validator tests for `motion.source=onvif` interactions with
|
||||
`onvif.events.enabled` and `motion.enabled`. Exercises `verify_motion_and_detect`
|
||||
directly so we don't need the full FrigateConfig path (which mounts /config)."""
|
||||
|
||||
import unittest
|
||||
|
||||
from frigate.config.config import verify_motion_and_detect
|
||||
|
||||
|
||||
class _Dummy:
|
||||
"""Light shim for the nested config attributes the validator reads."""
|
||||
|
||||
def __init__(self, **kw):
|
||||
for k, v in kw.items():
|
||||
setattr(self, k, v)
|
||||
|
||||
|
||||
def _camera(*, name, detect, motion, onvif):
    """Build a fake camera config carrying the four attributes passed in."""
    fields = {"name": name, "detect": detect, "motion": motion, "onvif": onvif}
    return _Dummy(**fields)
|
||||
|
||||
|
||||
class TestVerifyMotionAndDetect(unittest.TestCase):
    """Checks `verify_motion_and_detect` across motion source / events combos."""

    @staticmethod
    def _detecting_camera(*, motion_enabled, source, events_enabled):
        """Fake camera named "c" with detect on and the given motion/ONVIF flags."""
        return _camera(
            name="c",
            detect=_Dummy(enabled=True),
            motion=_Dummy(enabled=motion_enabled, source=source),
            onvif=_Dummy(events=_Dummy(enabled=events_enabled)),
        )

    def test_internal_motion_with_detect_passes(self):
        camera = self._detecting_camera(
            motion_enabled=True, source="internal", events_enabled=False
        )

        # The validator returns None and raises nothing.
        self.assertIsNone(verify_motion_and_detect(camera))

    def test_detect_with_motion_disabled_rejected(self):
        camera = self._detecting_camera(
            motion_enabled=False, source="internal", events_enabled=False
        )

        with self.assertRaisesRegex(ValueError, "object detection requires motion"):
            verify_motion_and_detect(camera)

    def test_source_onvif_requires_events_enabled(self):
        camera = self._detecting_camera(
            motion_enabled=False, source="onvif", events_enabled=False
        )

        with self.assertRaisesRegex(ValueError, "onvif.events.enabled is false"):
            verify_motion_and_detect(camera)

    def test_source_onvif_with_events_passes_even_with_motion_disabled(self):
        camera = self._detecting_camera(
            motion_enabled=False, source="onvif", events_enabled=True
        )

        self.assertIsNone(verify_motion_and_detect(camera))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
152
frigate/test/test_onvif_metadata.py
Normal file
152
frigate/test/test_onvif_metadata.py
Normal file
@ -0,0 +1,152 @@
|
||||
"""Unit tests for the ONVIF analytics metadata decoder + cell→box mapper."""
|
||||
|
||||
import base64
|
||||
import unittest
|
||||
|
||||
import numpy as np
|
||||
|
||||
from frigate.ptz.onvif_metadata import (
|
||||
_cells_to_boxes,
|
||||
_decode_cells,
|
||||
_extract_cells_from_doc,
|
||||
_packbits_decode,
|
||||
)
|
||||
|
||||
|
||||
class TestPackBits(unittest.TestCase):
    """PackBits decoding of the compressed cell bitmap."""

    def test_spec_example(self):
        # ONVIF Analytics Annex B worked example:
        #   packed (PackBits) = fe ff fe f0
        #   raw               = ff ff ff f0 f0 f0
        compressed = bytes.fromhex("fefffef0")
        expected = bytes.fromhex("fffffff0f0f0")

        self.assertEqual(_packbits_decode(compressed), expected)

    def test_idle_frame_from_live_camera(self):
        # `zwA=` is a representative idle-frame payload from a 22×18 grid:
        # 396 bits pad to 50 bytes, and PackBits compresses 50 zero bytes
        # to `cf 00`, whose base64 form is `zwA=`.
        compressed = base64.b64decode("zwA=")
        self.assertEqual(compressed, bytes.fromhex("cf00"))

        decoded = _packbits_decode(compressed)
        self.assertEqual(len(decoded), 50)
        self.assertEqual(decoded, b"\x00" * 50)

    def test_literal_run(self):
        # Header 0x03 announces four literal bytes.
        decoded = _packbits_decode(b"\x03ABCD")
        self.assertEqual(decoded, b"ABCD")

    def test_noop_header(self):
        # 0x80 is a no-op per spec, so the literal run after it (header 0x02,
        # three bytes) must still decode normally.
        decoded = _packbits_decode(b"\x80\x02ABC")
        self.assertEqual(decoded, b"ABC")
|
||||
|
||||
|
||||
class TestDecodeCells(unittest.TestCase):
    """Decoding of the base64 + PackBits `Cells` payload into a grid."""

    def test_idle(self):
        grid = _decode_cells("zwA=", 22, 18)

        self.assertIsNotNone(grid)
        # The grid is indexed (rows, columns).
        self.assertEqual(grid.shape, (18, 22))
        self.assertEqual(int(grid.sum()), 0)

    def test_invalid_base64(self):
        grid = _decode_cells("not-base64!@", 22, 18)
        self.assertIsNone(grid)

    def test_short_payload_returns_none(self):
        # `cf 00` expands to 50 bytes, but a 100×100 grid needs 1250 bytes,
        # so the decoder must return None.
        grid = _decode_cells("zwA=", 100, 100)
        self.assertIsNone(grid)

    def test_top_left_active(self):
        # 22×18 grid with only cell (0,0) active. The raw bitmap is byte 0 =
        # 0x80 (MSB set) followed by 49 zero bytes. As PackBits that is a
        # one-byte literal (header 0x00, value 0x80) and then a replicate run
        # of 49 zeros (header 257-49=208=0xD0, value 0x00).
        payload = base64.b64encode(b"\x00\x80\xd0\x00").decode()

        grid = _decode_cells(payload, 22, 18)

        self.assertIsNotNone(grid)
        self.assertEqual(int(grid[0, 0]), 1)
        self.assertEqual(int(grid.sum()), 1)
|
||||
|
||||
|
||||
class TestCellsToBoxes(unittest.TestCase):
    """Cell-grid to detect-frame pixel mapping, checked against a representative
    CellLayout (22x18, Translate(-1,-1), Scale(2/22, 2/18))."""

    LAYOUT = (22, 18, (-1.0, -1.0), (2.0 / 22, 2.0 / 18))
    DETECT = (1280, 720)

    @staticmethod
    def _blank_grid():
        """All-inactive 18-row × 22-column grid."""
        return np.zeros((18, 22), dtype=np.uint8)

    def _boxes_for(self, grid):
        """Map `grid` with the class-level layout and detect size."""
        return _cells_to_boxes(grid, self.LAYOUT, self.DETECT)

    def test_empty(self):
        self.assertEqual(self._boxes_for(self._blank_grid()), [])

    def test_top_left_cell(self):
        grid = self._blank_grid()
        grid[0, 0] = 1

        found = self._boxes_for(grid)
        self.assertEqual(len(found), 1)

        left, top, right, bottom = found[0]
        # Cell (0,0) spans normalized [-1, -1+2/22] × [-1, -1+2/18], which is
        # detect px [0, 1280/22] × [0, 720/18] = [0, ~58] × [0, 40].
        self.assertEqual(left, 0)
        self.assertEqual(top, 0)
        self.assertAlmostEqual(right, round(1280 / 22), delta=2)
        self.assertAlmostEqual(bottom, round(720 / 18), delta=2)

    def test_bottom_right_cell(self):
        grid = self._blank_grid()
        grid[17, 21] = 1

        found = self._boxes_for(grid)
        self.assertEqual(len(found), 1)

        left, top, right, bottom = found[0]
        # The far edges clamp to detect_size - 1.
        self.assertEqual(right, self.DETECT[0] - 1)
        self.assertEqual(bottom, self.DETECT[1] - 1)
        self.assertAlmostEqual(left, round(21 * 1280 / 22), delta=2)
        self.assertAlmostEqual(top, round(17 * 720 / 18), delta=2)

    def test_two_separated_regions(self):
        grid = self._blank_grid()
        # Region A: 2×2 block in the top-left corner.
        grid[0:2, 0:2] = 1
        # Region B: 2×2 block towards the bottom-right, separated from A by
        # inactive cells.
        grid[15:17, 18:20] = 1

        self.assertEqual(len(self._boxes_for(grid)), 2)
|
||||
|
||||
|
||||
class TestExtractCellsFromDoc(unittest.TestCase):
    """Extraction of the `tt:MotionInCells` attributes from a metadata doc."""

    def test_typical_frame(self):
        frame_doc = b"".join(
            [
                b'<tt:MetadataStream xmlns:tt="http://www.onvif.org/ver10/schema">',
                b"<tt:VideoAnalytics>",
                b'<tt:Frame UtcTime="2026-05-29T14:12:20Z">',
                b"<tt:Extension>",
                b'<tt:MotionInCells Columns="22" Rows="18" Cells="zwA="/>',
                b"</tt:Extension></tt:Frame></tt:VideoAnalytics></tt:MetadataStream>",
            ]
        )

        payload, columns, row_count = _extract_cells_from_doc(frame_doc)

        self.assertEqual(payload, "zwA=")
        self.assertEqual(columns, 22)
        self.assertEqual(row_count, 18)

    def test_malformed_xml(self):
        extracted = _extract_cells_from_doc(b"not-xml")
        self.assertEqual(extracted, (None, 0, 0))

    def test_doc_without_motioncells(self):
        frame_doc = b"".join(
            [
                b'<tt:MetadataStream xmlns:tt="http://www.onvif.org/ver10/schema">',
                b"<tt:VideoAnalytics><tt:Frame/></tt:VideoAnalytics>",
                b"</tt:MetadataStream>",
            ]
        )

        self.assertEqual(_extract_cells_from_doc(frame_doc), (None, 0, 0))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
72
frigate/test/test_onvif_pullpoint.py
Normal file
72
frigate/test/test_onvif_pullpoint.py
Normal file
@ -0,0 +1,72 @@
|
||||
"""Unit tests for the ONVIF PullPoint motion-state parser."""
|
||||
|
||||
import unittest
|
||||
from xml.etree import ElementTree as ET
|
||||
|
||||
from frigate.ptz.onvif_events import _parse_motion_state
|
||||
|
||||
|
||||
class FakeMessage:
    """Stand-in for a zeep NotificationMessage.

    Exposes a `Message` attribute whose `_value_1` is the element parsed from
    the XML string given to the constructor.
    """

    class _Body:
        """Holder for the parsed element, mirroring zeep's `_value_1` slot."""

        def __init__(self, element):
            self._value_1 = element

    def __init__(self, xml: str):
        root = ET.fromstring(xml)
        self.Message = self._Body(root)
|
||||
|
||||
|
||||
_NS = 'xmlns:tt="http://www.onvif.org/ver10/schema"'
|
||||
|
||||
|
||||
def _build_msg(name: str, value: str) -> FakeMessage:
    """Build a fake notification whose Data section has one SimpleItem.

    The SimpleItem carries the given `name` and `value`; the Source section is
    fixed.
    """
    pieces = [
        f"<tt:Message {_NS}>",
        "<tt:Source>",
        '<tt:SimpleItem Name="Source" Value="VideoSourceToken"/>',
        "</tt:Source>",
        "<tt:Data>",
        f'<tt:SimpleItem Name="{name}" Value="{value}"/>',
        "</tt:Data>",
        "</tt:Message>",
    ]
    return FakeMessage("".join(pieces))
|
||||
|
||||
|
||||
class TestParseMotionState(unittest.TestCase):
    """Parsing of the motion flag out of PullPoint notification messages."""

    def test_is_motion_true(self):
        state = _parse_motion_state(_build_msg("IsMotion", "true"))
        self.assertTrue(state)

    def test_is_motion_false(self):
        state = _parse_motion_state(_build_msg("IsMotion", "false"))
        self.assertFalse(state)

    def test_legacy_state_topic_name(self):
        # The legacy tns1:VideoSource/MotionAlarm payload names the item
        # "State" rather than the spec-compliant "IsMotion"; both are accepted.
        active = _parse_motion_state(_build_msg("State", "true"))
        self.assertTrue(active)

        idle = _parse_motion_state(_build_msg("State", "false"))
        self.assertFalse(idle)

    def test_boolean_aliases(self):
        active = _parse_motion_state(_build_msg("IsMotion", "1"))
        self.assertTrue(active)

        idle = _parse_motion_state(_build_msg("IsMotion", "0"))
        self.assertFalse(idle)

    def test_no_state_returns_none(self):
        # The Data section has neither a State nor an IsMotion SimpleItem.
        body = "".join(
            [
                f"<tt:Message {_NS}>",
                '<tt:Data><tt:SimpleItem Name="Other" Value="yes"/></tt:Data>',
                "</tt:Message>",
            ]
        )

        self.assertIsNone(_parse_motion_state(FakeMessage(body)))

    def test_no_message_returns_none(self):
        # An object with no `Message` attribute at all.
        class Empty:
            pass

        blank = Empty()
        self.assertIsNone(_parse_motion_state(blank))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@ -14,13 +14,16 @@ import urllib.parse
|
||||
from collections.abc import Mapping
|
||||
from multiprocessing.managers import ValueProxy
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional, Tuple, Union
|
||||
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union
|
||||
|
||||
import numpy as np
|
||||
from ruamel.yaml import YAML
|
||||
|
||||
from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from frigate.config import CameraConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@ -132,6 +135,24 @@ def get_ffmpeg_arg_list(arg: Any) -> list:
|
||||
return arg if isinstance(arg, list) else shlex.split(arg)
|
||||
|
||||
|
||||
# all built-in record presets use this segment_time
|
||||
DEFAULT_RECORD_SEGMENT_TIME = 10
|
||||
|
||||
|
||||
def get_record_segment_time(config: "CameraConfig") -> int:
    """Return the `-segment_time` value from the camera's record output args.

    Falls back to DEFAULT_RECORD_SEGMENT_TIME when the args start with a
    preset name, when `-segment_time` is absent or is the last argument, or
    when its value is not an integer.
    """
    args = get_ffmpeg_arg_list(config.ffmpeg.output_args.record)

    uses_preset = bool(args) and args[0].startswith("preset")
    if not uses_preset:
        try:
            flag_position = args.index("-segment_time")
            return int(args[flag_position + 1])
        except (ValueError, IndexError):
            # Flag missing, value missing, or value not an int.
            pass

    return DEFAULT_RECORD_SEGMENT_TIME
|
||||
|
||||
|
||||
def load_labels(
|
||||
path: Optional[str], encoding="utf-8", prefill=91, indexed: bool | None = None
|
||||
):
|
||||
|
||||
@ -879,6 +879,131 @@ def ffprobe_stream(ffmpeg, path: str, detailed: bool = False) -> sp.CompletedPro
|
||||
return result
|
||||
|
||||
|
||||
KEYFRAME_PROBE_WINDOW_SECONDS = 20
|
||||
KEYFRAME_GAP_WARNING_SECONDS = 4.0
|
||||
|
||||
|
||||
def parse_keyframe_packets(output: str) -> Tuple[List[float], Optional[float]]:
    """Parse ffprobe CSV rows of the form `pts_time,flags`.

    Rows with fewer than two fields, or whose first field is not a float, are
    skipped.

    Returns:
        A `(keyframe_pts, max_pts)` tuple. `keyframe_pts` lists, in input
        order, the timestamps of rows whose flags field contains "K".
        `max_pts` is the largest timestamp across all parsed rows, or None
        when no row could be parsed.
    """
    every_pts: List[float] = []
    keyframe_pts: List[float] = []

    for row in output.splitlines():
        # Only the first two fields matter; anything after stays unsplit.
        fields = row.split(",", 2)
        if len(fields) < 2:
            continue

        try:
            stamp = float(fields[0])
        except ValueError:
            continue

        every_pts.append(stamp)
        if "K" in fields[1]:
            keyframe_pts.append(stamp)

    return keyframe_pts, max(every_pts, default=None)
|
||||
|
||||
|
||||
def classify_keyframe_gaps(
    keyframe_pts: List[float], segment_time: int
) -> dict[str, Any]:
    """Rate keyframe spacing for recording suitability.

    Cameras using a smart/+ codec or a long/variable GOP emit large or
    irregular gaps between keyframes, which breaks time-based recording
    segmentation. The returned dict carries the keyframe count, the
    max/mean/min gap rounded to two decimals (None when unmeasurable), the
    segment length, the thresholds used and a `severity`:

    - "unknown" with fewer than two keyframes
    - "error" when the longest gap exceeds `segment_time`
    - "warning" when the longest gap exceeds the warning threshold
    - "ok" otherwise
    """
    # Start from the "unknown" shape and fill in the measurements if possible.
    report: dict[str, Any] = {
        "keyframe_count": len(keyframe_pts),
        "max_gap": None,
        "mean_gap": None,
        "min_gap": None,
        "segment_time": segment_time,
        "severity": "unknown",
        "thresholds": {
            "warning": KEYFRAME_GAP_WARNING_SECONDS,
            "error": segment_time,
        },
    }

    if len(keyframe_pts) < 2:
        return report

    spacing = [
        later - earlier for earlier, later in zip(keyframe_pts, keyframe_pts[1:])
    ]
    longest = max(spacing)

    verdict = "ok"
    if longest > segment_time:
        verdict = "error"
    elif longest > KEYFRAME_GAP_WARNING_SECONDS:
        verdict = "warning"

    report["max_gap"] = round(longest, 2)
    report["mean_gap"] = round(sum(spacing) / len(spacing), 2)
    report["min_gap"] = round(min(spacing), 2)
    report["severity"] = verdict
    return report
|
||||
|
||||
|
||||
async def analyze_record_keyframes(
    ffmpeg, url: str, segment_time: int, window: int = KEYFRAME_PROBE_WINDOW_SECONDS
) -> dict[str, Any]:
    """Probe a stream for ~`window` seconds and classify its keyframe spacing.

    Reads video packet flags via ffprobe to find keyframes, then measures the
    gaps between them. On timeout or failure returns an "unknown" result rather
    than a false all-clear.

    Args:
        ffmpeg: object exposing `ffprobe_path`, the ffprobe binary to run.
        url: stream URL to probe; it is passed through
            `escape_special_characters` before being handed to ffprobe.
        segment_time: record segment length, used as the "error" threshold.
        window: value given to ffprobe's `-read_intervals` as `%+<window>`;
            the probe is abandoned after `window + 15` seconds.

    Returns:
        The `classify_keyframe_gaps` result plus a `duration_observed` key:
        the largest packet timestamp rounded to two decimals, or None when no
        packet was parsed or the probe failed. The key is present on every
        path so callers always see the same result shape.
    """

    def _unknown_result() -> dict[str, Any]:
        # Failure paths use the same result shape as a successful probe.
        failed = classify_keyframe_gaps([], segment_time)
        failed["duration_observed"] = None
        return failed

    clean_url = escape_special_characters(url)
    cmd = [
        ffmpeg.ffprobe_path,
        "-v",
        "error",
        "-select_streams",
        "v:0",
        "-read_intervals",
        f"%+{window}",
        "-show_entries",
        "packet=pts_time,flags",
        "-of",
        "csv=p=0",
        clean_url,
    ]

    # Spawn separately from the read below so the timeout handler can never
    # run without a bound `proc` (TimeoutError is itself an OSError subclass).
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except OSError as err:
        logger.error("Keyframe probe failed: %s", err)
        return _unknown_result()

    try:
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=window + 15)
    except asyncio.TimeoutError:
        logger.warning("Keyframe probe timed out for record stream")
        try:
            proc.kill()
        except ProcessLookupError:
            # The process exited between the timeout and the kill.
            pass
        return _unknown_result()
    except OSError as err:
        logger.error("Keyframe probe failed: %s", err)
        return _unknown_result()

    keyframe_pts, max_pts = parse_keyframe_packets(stdout.decode("utf-8", "replace"))
    result = classify_keyframe_gaps(keyframe_pts, segment_time)
    result["duration_observed"] = round(max_pts, 2) if max_pts is not None else None
    return result
|
||||
|
||||
|
||||
def vainfo_hwaccel(device_name: Optional[str] = None) -> sp.CompletedProcess:
|
||||
"""Run vainfo."""
|
||||
if not device_name:
|
||||
|
||||
@ -14,6 +14,7 @@ from frigate.camera import CameraMetrics, PTZMetrics
|
||||
from frigate.comms.inter_process import InterProcessRequestor
|
||||
from frigate.config import CameraConfig, DetectConfig, LoggerConfig, ModelConfig
|
||||
from frigate.config.camera.camera import CameraTypeEnum
|
||||
from frigate.config.camera.motion import MotionSourceEnum
|
||||
from frigate.config.camera.updater import (
|
||||
CameraConfigUpdateEnum,
|
||||
CameraConfigUpdateSubscriber,
|
||||
@ -300,7 +301,22 @@ def process_frames(
|
||||
continue
|
||||
|
||||
# look for motion if enabled
|
||||
motion_boxes = motion_detector.detect(frame)
|
||||
if camera_config.motion.source == MotionSourceEnum.onvif:
|
||||
# Motion is supplied by an external ONVIF cell-motion subscriber
|
||||
# writing to camera_metrics. Skip the per-frame internal detector.
|
||||
if camera_metrics.external_motion_active.value:
|
||||
boxes = list(camera_metrics.external_motion_boxes)
|
||||
if boxes:
|
||||
motion_boxes = [tuple(b) for b in boxes]
|
||||
else:
|
||||
# Active but no spatial data yet — fall back to full frame
|
||||
# so downstream region clustering still has something to
|
||||
# scan.
|
||||
motion_boxes = [(0, 0, frame_shape[1] - 1, frame_shape[0] - 1)]
|
||||
else:
|
||||
motion_boxes = []
|
||||
else:
|
||||
motion_boxes = motion_detector.detect(frame)
|
||||
|
||||
regions = []
|
||||
consolidated_detections = []
|
||||
|
||||
@ -24,7 +24,7 @@ from frigate.config.camera.updater import (
|
||||
)
|
||||
from frigate.const import PROCESS_PRIORITY_HIGH
|
||||
from frigate.log import LogPipe
|
||||
from frigate.util.builtin import EventsPerSecond, get_ffmpeg_arg_list
|
||||
from frigate.util.builtin import EventsPerSecond, get_record_segment_time
|
||||
from frigate.util.ffmpeg import start_or_restart_ffmpeg, stop_ffmpeg
|
||||
from frigate.util.image import (
|
||||
FrameManager,
|
||||
@ -34,23 +34,6 @@ from frigate.util.process import FrigateProcess
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# all built-in record presets use this segment_time
|
||||
DEFAULT_RECORD_SEGMENT_TIME = 10
|
||||
|
||||
|
||||
def _get_record_segment_time(config: CameraConfig) -> int:
|
||||
"""Extract -segment_time from the camera's record output args."""
|
||||
record_args = get_ffmpeg_arg_list(config.ffmpeg.output_args.record)
|
||||
|
||||
if record_args and record_args[0].startswith("preset"):
|
||||
return DEFAULT_RECORD_SEGMENT_TIME
|
||||
|
||||
try:
|
||||
idx = record_args.index("-segment_time")
|
||||
return int(record_args[idx + 1])
|
||||
except (ValueError, IndexError):
|
||||
return DEFAULT_RECORD_SEGMENT_TIME
|
||||
|
||||
|
||||
def capture_frames(
|
||||
ffmpeg_process: sp.Popen[Any],
|
||||
@ -185,7 +168,7 @@ class CameraWatchdog(threading.Thread):
|
||||
# `valid` segments are published with the segment's start time, so the
|
||||
# gap between consecutive publishes can reach 2 * segment_time. Pad the
|
||||
# staleness threshold so it's never tighter than that worst case.
|
||||
segment_time = _get_record_segment_time(self.config)
|
||||
segment_time = get_record_segment_time(self.config)
|
||||
self.record_stale_threshold = max(120, 2 * segment_time + 30)
|
||||
|
||||
# Stall tracking (based on last processed frame)
|
||||
|
||||
4
web/.gitignore
vendored
4
web/.gitignore
vendored
@ -12,6 +12,10 @@ dist
|
||||
dist-ssr
|
||||
*.local
|
||||
|
||||
# Playwright
|
||||
playwright-report
|
||||
test-results
|
||||
|
||||
# Editor directories and files
|
||||
.vscode/*
|
||||
!.vscode/extensions.json
|
||||
|
||||
@ -92,6 +92,15 @@ test.describe("Chat — streaming @medium", () => {
|
||||
await installChatStreamOverride(frigateApp, [
|
||||
{ type: "content", delta: "Hel" },
|
||||
{ type: "content", delta: "lo" },
|
||||
{
|
||||
type: "messages",
|
||||
messages: [
|
||||
{ role: "system", content: "sys" },
|
||||
{ role: "user", content: "hello chat" },
|
||||
{ role: "assistant", content: "Hello" },
|
||||
],
|
||||
},
|
||||
{ type: "done" },
|
||||
]);
|
||||
await frigateApp.goto("/chat");
|
||||
const input = frigateApp.page.getByPlaceholder(/ask/i);
|
||||
@ -137,6 +146,15 @@ test.describe("Chat — streaming @medium", () => {
|
||||
{ type: "content", delta: "Hel" },
|
||||
{ type: "content", delta: "lo, " },
|
||||
{ type: "content", delta: "world!" },
|
||||
{
|
||||
type: "messages",
|
||||
messages: [
|
||||
{ role: "system", content: "sys" },
|
||||
{ role: "user", content: "greet me" },
|
||||
{ role: "assistant", content: "Hello, world!" },
|
||||
],
|
||||
},
|
||||
{ type: "done" },
|
||||
],
|
||||
{ chunkDelayMs: 50 },
|
||||
);
|
||||
@ -151,19 +169,39 @@ test.describe("Chat — streaming @medium", () => {
|
||||
});
|
||||
});
|
||||
|
||||
test("tool_calls chunks render a ToolCallsGroup", async ({ frigateApp }) => {
|
||||
await installChatStreamOverride(frigateApp, [
|
||||
test("tool calls in the chain render a ToolCallsGroup", async ({
|
||||
frigateApp,
|
||||
}) => {
|
||||
const toolTurn = [
|
||||
{ role: "system", content: "sys" },
|
||||
{ role: "user", content: "find people" },
|
||||
{
|
||||
type: "tool_calls",
|
||||
role: "assistant",
|
||||
content: null,
|
||||
tool_calls: [
|
||||
{
|
||||
id: "call_1",
|
||||
name: "search_objects",
|
||||
arguments: { label: "person" },
|
||||
type: "function",
|
||||
function: {
|
||||
name: "search_objects",
|
||||
arguments: '{"label":"person"}',
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
{ role: "tool", tool_call_id: "call_1", content: "[]" },
|
||||
];
|
||||
await installChatStreamOverride(frigateApp, [
|
||||
{ type: "messages", messages: toolTurn },
|
||||
{ type: "content", delta: "Searching for people." },
|
||||
{
|
||||
type: "messages",
|
||||
messages: [
|
||||
...toolTurn,
|
||||
{ role: "assistant", content: "Searching for people." },
|
||||
],
|
||||
},
|
||||
{ type: "done" },
|
||||
]);
|
||||
await frigateApp.goto("/chat");
|
||||
const input = frigateApp.page.getByPlaceholder(/ask/i);
|
||||
@ -253,6 +291,15 @@ test.describe("Chat — attachment chip @medium", () => {
|
||||
// We use the stream override so the first message completes quickly.
|
||||
await installChatStreamOverride(frigateApp, [
|
||||
{ type: "content", delta: "Done." },
|
||||
{
|
||||
type: "messages",
|
||||
messages: [
|
||||
{ role: "system", content: "sys" },
|
||||
{ role: "user", content: "hello" },
|
||||
{ role: "assistant", content: "Done." },
|
||||
],
|
||||
},
|
||||
{ type: "done" },
|
||||
]);
|
||||
await frigateApp.goto("/chat");
|
||||
|
||||
|
||||
@ -262,6 +262,10 @@
|
||||
"label": "Enable motion detection",
|
||||
"description": "Enable or disable motion detection for this camera."
|
||||
},
|
||||
"source": {
|
||||
"label": "Motion source",
|
||||
"description": "Where motion state comes from: Frigate's internal frame analyser, or the camera's ONVIF cell-motion events (requires onvif.events.enabled)."
|
||||
},
|
||||
"threshold": {
|
||||
"label": "Motion threshold",
|
||||
"description": "Pixel difference threshold used by the motion detector; higher values reduce sensitivity (range 1-255)."
|
||||
@ -843,6 +847,22 @@
|
||||
"description": "Internal field to track whether autotracking was enabled in configuration."
|
||||
}
|
||||
},
|
||||
"events": {
|
||||
"label": "ONVIF events",
|
||||
"description": "Consume camera-side ONVIF motion notifications instead of Frigate's CPU motion detector.",
|
||||
"enabled": {
|
||||
"label": "Enable ONVIF events",
|
||||
"description": "Subscribe to the camera's ONVIF cell-motion notifications and use them as Frigate's motion signal."
|
||||
},
|
||||
"subscription_timeout": {
|
||||
"label": "Subscription timeout",
|
||||
"description": "Seconds before the PullPoint subscription expires and is renewed."
|
||||
},
|
||||
"use_metadata_stream": {
|
||||
"label": "Use metadata stream",
|
||||
"description": "Open the ONVIF analytics RTSP metadata stream to receive per-cell motion coordinates. Falls back to a full-frame box when disabled or when the camera does not advertise the track."
|
||||
}
|
||||
},
|
||||
"ignore_time_mismatch": {
|
||||
"label": "Ignore time mismatch",
|
||||
"description": "Ignore time synchronization differences between camera and Frigate server for ONVIF communication."
|
||||
|
||||
@ -769,6 +769,10 @@
|
||||
"label": "Enable motion detection",
|
||||
"description": "Enable or disable motion detection for all cameras; can be overridden per-camera."
|
||||
},
|
||||
"source": {
|
||||
"label": "Motion source",
|
||||
"description": "Where motion state comes from: Frigate's internal frame analyser, or the camera's ONVIF cell-motion events (requires onvif.events.enabled)."
|
||||
},
|
||||
"threshold": {
|
||||
"label": "Motion threshold",
|
||||
"description": "Pixel difference threshold used by the motion detector; higher values reduce sensitivity (range 1-255)."
|
||||
@ -1623,6 +1627,22 @@
|
||||
"description": "Internal field to track whether autotracking was enabled in configuration."
|
||||
}
|
||||
},
|
||||
"events": {
|
||||
"label": "ONVIF events",
|
||||
"description": "Consume camera-side ONVIF motion notifications instead of Frigate's CPU motion detector.",
|
||||
"enabled": {
|
||||
"label": "Enable ONVIF events",
|
||||
"description": "Subscribe to the camera's ONVIF cell-motion notifications and use them as Frigate's motion signal."
|
||||
},
|
||||
"subscription_timeout": {
|
||||
"label": "Subscription timeout",
|
||||
"description": "Seconds before the PullPoint subscription expires and is renewed."
|
||||
},
|
||||
"use_metadata_stream": {
|
||||
"label": "Use metadata stream",
|
||||
"description": "Open the ONVIF analytics RTSP metadata stream to receive per-cell motion coordinates. Falls back to a full-frame box when disabled or when the camera does not advertise the track."
|
||||
}
|
||||
},
|
||||
"ignore_time_mismatch": {
|
||||
"label": "Ignore time mismatch",
|
||||
"description": "Ignore time synchronization differences between camera and Frigate server for ONVIF communication."
|
||||
|
||||
@ -174,6 +174,21 @@
|
||||
"error": "Error: {{error}}",
|
||||
"tips": {
|
||||
"title": "Camera Probe Info"
|
||||
},
|
||||
"keyframes": {
|
||||
"title": "Keyframe analysis",
|
||||
"analyzing": "Analyzing keyframes... {{seconds}} seconds remaining",
|
||||
"stillAnalyzing": "Still analyzing keyframes...",
|
||||
"recordStream": "Record stream:",
|
||||
"keyframeCount": "Keyframes observed:",
|
||||
"observedDuration": "Observed duration:",
|
||||
"gap": "Keyframe gap (min / avg / max):",
|
||||
"segmentLength": "Recording segment length:",
|
||||
"ok": "Keyframes every ~{{seconds}}s, good for recording and playback.",
|
||||
"warning": "Sparse or variable keyframes (longest gap ~{{seconds}}s), likely caused by a smart codec (H.264+/H.265+). This is not recommended.",
|
||||
"error": "Keyframe gap (~{{seconds}}s) exceeds the recording segment length ({{segmentTime}}s). Some segments may have no keyframe, which breaks playback. Disable the smart/+ codec on the camera or shorten its keyframe interval.",
|
||||
"unknown": "Couldn't determine keyframe spacing.",
|
||||
"recordDisabled": "Recording is disabled for this camera."
|
||||
}
|
||||
},
|
||||
"framesAndDetections": "Frames / Detections",
|
||||
|
||||
@ -7,7 +7,8 @@ import {
|
||||
DialogTitle,
|
||||
} from "../ui/dialog";
|
||||
import ActivityIndicator from "../indicators/activity-indicator";
|
||||
import { Ffprobe } from "@/types/stats";
|
||||
import KeyframeAnalysisSection from "./KeyframeAnalysisSection";
|
||||
import { Ffprobe, KeyframeAnalysis } from "@/types/stats";
|
||||
import { Button } from "../ui/button";
|
||||
import copy from "copy-to-clipboard";
|
||||
import { CameraConfig } from "@/types/frigateConfig";
|
||||
@ -30,6 +31,7 @@ export default function CameraInfoDialog({
|
||||
}: CameraInfoDialogProps) {
|
||||
const { t } = useTranslation(["views/system"]);
|
||||
const [ffprobeInfo, setFfprobeInfo] = useState<Ffprobe[]>();
|
||||
const [keyframeInfo, setKeyframeInfo] = useState<KeyframeAnalysis>();
|
||||
|
||||
useEffect(() => {
|
||||
axios
|
||||
@ -67,7 +69,12 @@ export default function CameraInfoDialog({
|
||||
}, []);
|
||||
|
||||
const onCopyFfprobe = async () => {
|
||||
copy(JSON.stringify(ffprobeInfo));
|
||||
copy(
|
||||
JSON.stringify({
|
||||
ffprobe: ffprobeInfo,
|
||||
keyframe_analysis: keyframeInfo,
|
||||
}),
|
||||
);
|
||||
toast.success(t("cameras.toast.success.copyToClipboard"));
|
||||
};
|
||||
|
||||
@ -96,7 +103,7 @@ export default function CameraInfoDialog({
|
||||
<Trans ns="views/system">cameras.info.streamDataFromFFPROBE</Trans>
|
||||
</DialogDescription>
|
||||
|
||||
<div className="mb-2 p-4">
|
||||
<div className="mb-2 p-4 text-sm">
|
||||
{ffprobeInfo ? (
|
||||
<div>
|
||||
{ffprobeInfo.map((stream, idx) => (
|
||||
@ -184,6 +191,10 @@ export default function CameraInfoDialog({
|
||||
)}
|
||||
</div>
|
||||
))}
|
||||
<KeyframeAnalysisSection
|
||||
cameraName={camera.name}
|
||||
onResult={setKeyframeInfo}
|
||||
/>
|
||||
</div>
|
||||
) : (
|
||||
<div className="flex flex-col items-center">
|
||||
|
||||
193
web/src/components/overlay/KeyframeAnalysisSection.tsx
Normal file
193
web/src/components/overlay/KeyframeAnalysisSection.tsx
Normal file
@ -0,0 +1,193 @@
|
||||
import { useEffect, useMemo, useState } from "react";
|
||||
import { useTranslation } from "react-i18next";
|
||||
import axios from "axios";
|
||||
import { FaCircleCheck, FaTriangleExclamation } from "react-icons/fa6";
|
||||
import { LuX } from "react-icons/lu";
|
||||
import ActivityIndicator from "../indicators/activity-indicator";
|
||||
import { KeyframeAnalysis } from "@/types/stats";
|
||||
|
||||
const PROBE_WINDOW_SECONDS = 20;
|
||||
|
||||
type KeyframeAnalysisSectionProps = {
|
||||
cameraName: string;
|
||||
onResult?: (analysis: KeyframeAnalysis) => void;
|
||||
};
|
||||
|
||||
/**
 * Section of the camera info dialog that requests a keyframe analysis for a
 * camera and renders the outcome.
 *
 * While the request is pending a countdown starting at PROBE_WINDOW_SECONDS is
 * shown; once it reaches zero a "still analyzing" message is shown instead.
 * On success the per-stream statistics and a severity summary row are
 * rendered; on request failure the "unknown" row is rendered.
 *
 * @param cameraName camera passed to the `keyframe_analysis` endpoint; a
 *   change of this value starts a fresh probe.
 * @param onResult optional callback invoked with the response body when the
 *   request succeeds.
 */
export default function KeyframeAnalysisSection({
  cameraName,
  onResult,
}: KeyframeAnalysisSectionProps) {
  const { t } = useTranslation(["views/system"]);
  const [analysis, setAnalysis] = useState<KeyframeAnalysis>();
  const [failed, setFailed] = useState(false);
  const [secondsRemaining, setSecondsRemaining] =
    useState(PROBE_WINDOW_SECONDS);

  // fire the probe on mount and again whenever the camera changes
  useEffect(() => {
    let active = true;
    const controller = new AbortController();

    // Clear whatever a previously probed camera left behind so a stale result
    // or an exhausted countdown is never shown for the new camera. On the
    // initial mount these calls are no-ops (state already has these values).
    setAnalysis(undefined);
    setFailed(false);
    setSecondsRemaining(PROBE_WINDOW_SECONDS);

    axios
      .get("keyframe_analysis", {
        params: { camera: cameraName },
        signal: controller.signal,
      })
      .then((res) => {
        if (active) {
          setAnalysis(res.data);
          onResult?.(res.data);
        }
      })
      .catch(() => {
        // an aborted request also lands here, but `active` is already false
        if (active) {
          setFailed(true);
        }
      });

    return () => {
      active = false;
      // stop waiting on a probe nobody is going to read
      controller.abort();
    };
    // re-probing only depends on the camera; onResult is a stable setter
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, [cameraName]);

  // countdown while waiting for the probe to return
  useEffect(() => {
    if (analysis || failed) {
      return;
    }
    const interval = setInterval(() => {
      // clamp at zero; the UI switches to the "still analyzing" text there
      setSecondsRemaining((s) => (s > 0 ? s - 1 : 0));
    }, 1000);
    return () => clearInterval(interval);
  }, [analysis, failed]);

  const content = useMemo(() => {
    if (failed) {
      return <Row icon="unknown">{t("cameras.info.keyframes.unknown")}</Row>;
    }

    if (!analysis) {
      return (
        <div className="flex items-center gap-2 text-muted-foreground">
          <ActivityIndicator className="size-4" />
          <span>
            {secondsRemaining > 0
              ? t("cameras.info.keyframes.analyzing", {
                  seconds: secondsRemaining,
                })
              : t("cameras.info.keyframes.stillAnalyzing")}
          </span>
        </div>
      );
    }

    // one summary row per severity; anything unrecognised renders as unknown
    let summary;
    switch (analysis.severity) {
      case "ok":
        summary = (
          <Row icon="ok">
            {t("cameras.info.keyframes.ok", { seconds: analysis.mean_gap })}
          </Row>
        );
        break;
      case "warning":
        summary = (
          <Row icon="warning">
            {t("cameras.info.keyframes.warning", { seconds: analysis.max_gap })}
          </Row>
        );
        break;
      case "error":
        summary = (
          <Row icon="error">
            {t("cameras.info.keyframes.error", {
              seconds: analysis.max_gap,
              segmentTime: analysis.segment_time,
            })}
          </Row>
        );
        break;
      case "record_disabled":
        summary = (
          <Row icon="unknown">{t("cameras.info.keyframes.recordDisabled")}</Row>
        );
        break;
      default:
        summary = (
          <Row icon="unknown">{t("cameras.info.keyframes.unknown")}</Row>
        );
    }

    // gap statistics are only meaningful once at least two keyframes were seen
    const hasStats = analysis.max_gap != null;
    const hasDetails = hasStats || analysis.stream_index != null;

    return (
      <div className="text-muted-foreground">
        {analysis.stream_index != null && (
          <div>
            {t("cameras.info.keyframes.recordStream")}{" "}
            <span className="text-primary">
              {/* stream_index is zero-based; the label is one-based */}
              {t("cameras.info.stream", { idx: analysis.stream_index + 1 })}
            </span>
          </div>
        )}
        {hasStats && (
          <div>
            <div>
              {t("cameras.info.keyframes.keyframeCount")}{" "}
              <span className="text-primary">{analysis.keyframe_count}</span>
            </div>
            <div>
              {t("cameras.info.keyframes.observedDuration")}{" "}
              <span className="text-primary">
                {analysis.duration_observed}s
              </span>
            </div>
            <div>
              {t("cameras.info.keyframes.gap")}{" "}
              <span className="text-primary">
                {analysis.min_gap}s / {analysis.mean_gap}s / {analysis.max_gap}s
              </span>
            </div>
            <div>
              {t("cameras.info.keyframes.segmentLength")}{" "}
              <span className="text-primary">{analysis.segment_time}s</span>
            </div>
          </div>
        )}
        {/* only add spacing above the summary when detail rows precede it */}
        <div className={hasDetails ? "mt-3" : undefined}>{summary}</div>
      </div>
    );
  }, [analysis, failed, secondsRemaining, t]);

  return (
    <div className="mb-5">
      <div className="mb-1 rounded-md bg-secondary p-2 text-lg text-primary">
        {t("cameras.info.keyframes.title")}
      </div>
      <div className="ml-2">{content}</div>
    </div>
  );
}
|
||||
|
||||
type RowProps = {
  icon: "ok" | "warning" | "error" | "unknown";
  children: React.ReactNode;
};

/**
 * One status line: a small leading icon chosen by `icon`, followed by the
 * given children rendered in the primary text color.
 */
function Row({ icon, children }: RowProps) {
  let glyph: React.ReactNode = null;

  switch (icon) {
    case "ok":
      glyph = (
        <FaCircleCheck className="mt-0.5 size-4 flex-shrink-0 text-success" />
      );
      break;
    case "warning":
      glyph = (
        <FaTriangleExclamation className="mt-0.5 size-4 flex-shrink-0 text-yellow-500" />
      );
      break;
    case "error":
      glyph = <LuX className="mt-0.5 size-4 flex-shrink-0 text-danger" />;
      break;
    case "unknown":
      // same triangle as "warning", but muted instead of yellow
      glyph = (
        <FaTriangleExclamation className="mt-0.5 size-4 flex-shrink-0 text-muted-foreground" />
      );
      break;
  }

  return (
    <div className="flex items-start gap-2">
      {glyph}
      <span className="text-primary">{children}</span>
    </div>
  );
}
|
||||
@ -13,6 +13,7 @@ import { ChatComposer } from "@/components/chat/ChatComposer";
|
||||
import ChatSettings from "@/components/chat/ChatSettings";
|
||||
import type {
|
||||
ChatMessage,
|
||||
ChatStats,
|
||||
GenAIModelsResponse,
|
||||
ShowStatsMode,
|
||||
} from "@/types/chat";
|
||||
@ -22,12 +23,28 @@ import {
|
||||
getFindSimilarObjectsFromToolCalls,
|
||||
prependAttachment,
|
||||
streamChatCompletion,
|
||||
toolCallsForMessage,
|
||||
toolResponsesById,
|
||||
} from "@/utils/chatUtil";
|
||||
|
||||
type StreamingTurn = {
|
||||
content: string;
|
||||
reasoning: string;
|
||||
chain: ChatMessage[];
|
||||
stats?: ChatStats;
|
||||
};
|
||||
|
||||
/**
 * Type guard: true only when `content` is a string that still has at least
 * one character after trimming whitespace.
 */
const hasText = (content: unknown): content is string => {
  if (typeof content !== "string") {
    return false;
  }
  return content.trim() !== "";
};
|
||||
|
||||
/**
 * Copy each message without its `reasoning` and `stats` fields, leaving the
 * input messages untouched. Used for the message list handed to
 * streamChatCompletion.
 */
const toWire = (messages: ChatMessage[]): ChatMessage[] =>
  messages.map((message) => {
    const wire = { ...message };
    delete wire.reasoning;
    delete wire.stats;
    return wire;
  });
|
||||
|
||||
export default function ChatPage() {
|
||||
const { t } = useTranslation(["views/chat"]);
|
||||
const [input, setInput] = useState("");
|
||||
const [messages, setMessages] = useState<ChatMessage[]>([]);
|
||||
const [streaming, setStreaming] = useState<StreamingTurn | null>(null);
|
||||
const [isLoading, setIsLoading] = useState(false);
|
||||
const [error, setError] = useState<string | null>(null);
|
||||
const [attachedEventId, setAttachedEventId] = useState<string | null>(null);
|
||||
@ -72,28 +89,19 @@ export default function ChatPage() {
|
||||
if (isNearBottom) {
|
||||
el.scrollTo({ top: el.scrollHeight, behavior: "smooth" });
|
||||
}
|
||||
}, [messages, autoScroll]);
|
||||
}, [messages, streaming, autoScroll]);
|
||||
|
||||
const submitConversation = useCallback(
|
||||
async (messagesToSend: ChatMessage[]) => {
|
||||
if (isLoading) return;
|
||||
const last = messagesToSend[messagesToSend.length - 1];
|
||||
if (!last || last.role !== "user" || !last.content.trim()) return;
|
||||
if (!last || last.role !== "user" || !hasText(last.content)) return;
|
||||
|
||||
setError(null);
|
||||
const assistantPlaceholder: ChatMessage = {
|
||||
role: "assistant",
|
||||
content: "",
|
||||
toolCalls: undefined,
|
||||
};
|
||||
setMessages([...messagesToSend, assistantPlaceholder]);
|
||||
setMessages(messagesToSend);
|
||||
setStreaming({ content: "", reasoning: "", chain: [] });
|
||||
setIsLoading(true);
|
||||
|
||||
const apiMessages = messagesToSend.map((m) => ({
|
||||
role: m.role,
|
||||
content: m.content,
|
||||
}));
|
||||
|
||||
const baseURL = axios.defaults.baseURL ?? "";
|
||||
const url = `${baseURL}chat/completion`;
|
||||
const headers: Record<string, string> = {
|
||||
@ -104,16 +112,50 @@ export default function ChatPage() {
|
||||
const controller = new AbortController();
|
||||
abortRef.current = controller;
|
||||
|
||||
let chain: ChatMessage[] = [];
|
||||
let stats: ChatStats | undefined;
|
||||
let reasoning = "";
|
||||
let hadError = false;
|
||||
|
||||
await streamChatCompletion(
|
||||
url,
|
||||
headers,
|
||||
apiMessages,
|
||||
toWire(messagesToSend),
|
||||
{
|
||||
updateMessages: (updater) => setMessages(updater),
|
||||
onError: (message) => setError(message),
|
||||
onContentDelta: (delta) =>
|
||||
setStreaming((s) => (s ? { ...s, content: s.content + delta } : s)),
|
||||
onReasoningDelta: (delta) => {
|
||||
reasoning += delta;
|
||||
setStreaming((s) =>
|
||||
s ? { ...s, reasoning: s.reasoning + delta } : s,
|
||||
);
|
||||
},
|
||||
onChain: (fullChain) => {
|
||||
chain = fullChain;
|
||||
setStreaming((s) => (s ? { ...s, chain: fullChain } : s));
|
||||
},
|
||||
onStats: (s) => {
|
||||
stats = s;
|
||||
setStreaming((cur) => (cur ? { ...cur, stats: s } : cur));
|
||||
},
|
||||
onError: (message) => {
|
||||
hadError = true;
|
||||
setError(message);
|
||||
},
|
||||
onDone: () => {
|
||||
abortRef.current = null;
|
||||
setIsLoading(false);
|
||||
setStreaming(null);
|
||||
const lastMsg = chain[chain.length - 1];
|
||||
if (!hadError && lastMsg?.role === "assistant") {
|
||||
setMessages(
|
||||
chain.map((m, i) =>
|
||||
i === chain.length - 1
|
||||
? { ...m, reasoning: reasoning || undefined, stats }
|
||||
: m,
|
||||
),
|
||||
);
|
||||
}
|
||||
},
|
||||
defaultErrorMessage: t("error"),
|
||||
},
|
||||
@ -125,12 +167,14 @@ export default function ChatPage() {
|
||||
);
|
||||
|
||||
const recentEventIds = useMemo(() => {
|
||||
const responses = toolResponsesById(messages);
|
||||
for (let i = messages.length - 1; i >= 0; i--) {
|
||||
const msg = messages[i];
|
||||
if (msg.role !== "assistant" || !msg.toolCalls) continue;
|
||||
const similar = getFindSimilarObjectsFromToolCalls(msg.toolCalls);
|
||||
if (msg.role !== "assistant" || !msg.tool_calls?.length) continue;
|
||||
const calls = toolCallsForMessage(msg, responses);
|
||||
const similar = getFindSimilarObjectsFromToolCalls(calls);
|
||||
if (similar) return similar.results.map((e) => e.id);
|
||||
const events = getEventIdsFromSearchObjectsToolCalls(msg.toolCalls);
|
||||
const events = getEventIdsFromSearchObjectsToolCalls(calls);
|
||||
if (events.length > 0) return events.map((e) => e.id);
|
||||
}
|
||||
return [];
|
||||
@ -154,12 +198,14 @@ export default function ChatPage() {
|
||||
abortRef.current?.abort();
|
||||
abortRef.current = null;
|
||||
setIsLoading(false);
|
||||
setStreaming(null);
|
||||
}, []);
|
||||
|
||||
const startNewChat = useCallback(() => {
|
||||
abortRef.current?.abort();
|
||||
abortRef.current = null;
|
||||
setIsLoading(false);
|
||||
setStreaming(null);
|
||||
setMessages([]);
|
||||
setInput("");
|
||||
setAttachedEventId(null);
|
||||
@ -181,7 +227,83 @@ export default function ChatPage() {
|
||||
setAttachedEventId(null);
|
||||
}, []);
|
||||
|
||||
const hasStarted = messages.length > 0;
|
||||
const hasStarted = messages.length > 0 || streaming != null;
|
||||
|
||||
// While streaming, the backend's in-flight chain is the source of truth;
|
||||
// otherwise the committed conversation is.
|
||||
const renderList =
|
||||
streaming && streaming.chain.length ? streaming.chain : messages;
|
||||
const responses = toolResponsesById(renderList);
|
||||
const renderTail = renderList[renderList.length - 1];
|
||||
const finalShown =
|
||||
renderTail?.role === "assistant" && hasText(renderTail.content);
|
||||
|
||||
const renderMessage = (msg: ChatMessage, i: number) => {
|
||||
if (msg.role === "system" || msg.role === "tool") return null;
|
||||
|
||||
if (msg.role === "user") {
|
||||
if (!hasText(msg.content)) return null;
|
||||
return (
|
||||
<div key={i} className="flex flex-col gap-2">
|
||||
<MessageBubble
|
||||
role="user"
|
||||
content={msg.content}
|
||||
messageIndex={i}
|
||||
onEditSubmit={handleEditSubmit}
|
||||
isComplete
|
||||
showStats={showStats}
|
||||
/>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
const calls = toolCallsForMessage(msg, responses);
|
||||
const contentText = hasText(msg.content) ? msg.content : "";
|
||||
const similar = getFindSimilarObjectsFromToolCalls(calls);
|
||||
const events = similar ? [] : getEventIdsFromSearchObjectsToolCalls(calls);
|
||||
|
||||
return (
|
||||
<div key={i} className="flex flex-col gap-2">
|
||||
{calls.length > 0 && <ToolCallsGroup toolCalls={calls} />}
|
||||
{hasText(msg.reasoning) && (
|
||||
<ReasoningBubble
|
||||
reasoning={msg.reasoning}
|
||||
answerStarted={!!contentText}
|
||||
/>
|
||||
)}
|
||||
{contentText && (
|
||||
<MessageBubble
|
||||
role="assistant"
|
||||
content={contentText}
|
||||
messageIndex={i}
|
||||
isComplete
|
||||
stats={msg.stats}
|
||||
showStats={showStats}
|
||||
/>
|
||||
)}
|
||||
{similar ? (
|
||||
<ChatEventThumbnailsRow
|
||||
events={similar.results}
|
||||
anchor={similar.anchor}
|
||||
onAttach={setAttachedEventId}
|
||||
/>
|
||||
) : (
|
||||
<ChatEventThumbnailsRow
|
||||
events={events}
|
||||
onAttach={setAttachedEventId}
|
||||
/>
|
||||
)}
|
||||
</div>
|
||||
);
|
||||
};
|
||||
|
||||
const processingDots = (
|
||||
<div className="flex items-center gap-2 self-start rounded-2xl bg-muted px-5 py-4">
|
||||
<span className="size-2.5 animate-bounce rounded-full bg-muted-foreground/60 [animation-delay:-0.32s]" />
|
||||
<span className="size-2.5 animate-bounce rounded-full bg-muted-foreground/60 [animation-delay:-0.16s]" />
|
||||
<span className="size-2.5 animate-bounce rounded-full bg-muted-foreground/60" />
|
||||
</div>
|
||||
);
|
||||
|
||||
return (
|
||||
<div className="flex size-full flex-col">
|
||||
@ -212,102 +334,31 @@ export default function ChatPage() {
|
||||
<div className="flex w-full flex-col xl:w-[50%] 3xl:w-[35%]">
|
||||
{hasStarted ? (
|
||||
<div className="flex w-full flex-1 flex-col gap-3 pb-3">
|
||||
{messages.map((msg, i) => {
|
||||
const isLastAssistant =
|
||||
i === messages.length - 1 && msg.role === "assistant";
|
||||
const isComplete =
|
||||
msg.role === "user" || !isLoading || !isLastAssistant;
|
||||
const hasToolCalls =
|
||||
msg.toolCalls && msg.toolCalls.length > 0;
|
||||
const hasContent = !!msg.content?.trim();
|
||||
const hasReasoning = !!msg.reasoning?.trim();
|
||||
const showProcessing =
|
||||
isLastAssistant &&
|
||||
isLoading &&
|
||||
!hasContent &&
|
||||
!hasReasoning;
|
||||
|
||||
// Hide empty placeholder only when there are no tool calls
|
||||
// and no reasoning streaming yet
|
||||
if (
|
||||
isLastAssistant &&
|
||||
isLoading &&
|
||||
!hasContent &&
|
||||
!hasToolCalls &&
|
||||
!hasReasoning
|
||||
)
|
||||
return (
|
||||
<div
|
||||
key={i}
|
||||
className="flex items-center gap-2 self-start rounded-2xl bg-muted px-5 py-4"
|
||||
>
|
||||
<span className="size-2.5 animate-bounce rounded-full bg-muted-foreground/60 [animation-delay:-0.32s]" />
|
||||
<span className="size-2.5 animate-bounce rounded-full bg-muted-foreground/60 [animation-delay:-0.16s]" />
|
||||
<span className="size-2.5 animate-bounce rounded-full bg-muted-foreground/60" />
|
||||
</div>
|
||||
);
|
||||
|
||||
return (
|
||||
<div key={i} className="flex flex-col gap-2">
|
||||
{msg.role === "assistant" && hasToolCalls && (
|
||||
<ToolCallsGroup toolCalls={msg.toolCalls!} />
|
||||
)}
|
||||
{msg.role === "assistant" && hasReasoning && (
|
||||
{renderList.map((msg, i) => renderMessage(msg, i))}
|
||||
{streaming &&
|
||||
!finalShown &&
|
||||
(streaming.content || streaming.reasoning ? (
|
||||
<div className="flex flex-col gap-2">
|
||||
{hasText(streaming.reasoning) && (
|
||||
<ReasoningBubble
|
||||
reasoning={msg.reasoning!}
|
||||
answerStarted={hasContent}
|
||||
reasoning={streaming.reasoning}
|
||||
answerStarted={!!streaming.content}
|
||||
/>
|
||||
)}
|
||||
{showProcessing ? (
|
||||
<div className="flex items-center gap-2 self-start rounded-2xl bg-muted px-5 py-4">
|
||||
<span className="size-2 animate-bounce rounded-full bg-muted-foreground/60 [animation-delay:-0.3s]" />
|
||||
<span className="size-2 animate-bounce rounded-full bg-muted-foreground/60 [animation-delay:-0.15s]" />
|
||||
<span className="size-2 animate-bounce rounded-full bg-muted-foreground/60" />
|
||||
</div>
|
||||
) : msg.role === "assistant" &&
|
||||
!hasContent &&
|
||||
hasReasoning &&
|
||||
!isComplete ? null : (
|
||||
{streaming.content && (
|
||||
<MessageBubble
|
||||
role={msg.role}
|
||||
content={msg.content}
|
||||
messageIndex={i}
|
||||
onEditSubmit={
|
||||
msg.role === "user" ? handleEditSubmit : undefined
|
||||
}
|
||||
isComplete={isComplete}
|
||||
stats={msg.stats}
|
||||
role="assistant"
|
||||
content={streaming.content}
|
||||
messageIndex={-1}
|
||||
isComplete={false}
|
||||
stats={streaming.stats}
|
||||
showStats={showStats}
|
||||
/>
|
||||
)}
|
||||
{msg.role === "assistant" &&
|
||||
isComplete &&
|
||||
(() => {
|
||||
const similar = getFindSimilarObjectsFromToolCalls(
|
||||
msg.toolCalls,
|
||||
);
|
||||
if (similar) {
|
||||
return (
|
||||
<ChatEventThumbnailsRow
|
||||
events={similar.results}
|
||||
anchor={similar.anchor}
|
||||
onAttach={setAttachedEventId}
|
||||
/>
|
||||
);
|
||||
}
|
||||
const events = getEventIdsFromSearchObjectsToolCalls(
|
||||
msg.toolCalls,
|
||||
);
|
||||
return (
|
||||
<ChatEventThumbnailsRow
|
||||
events={events}
|
||||
onAttach={setAttachedEventId}
|
||||
/>
|
||||
);
|
||||
})()}
|
||||
</div>
|
||||
);
|
||||
})}
|
||||
) : (
|
||||
processingDots
|
||||
))}
|
||||
{error && (
|
||||
<p
|
||||
className="flex items-center gap-1.5 self-start text-sm text-destructive"
|
||||
|
||||
@ -1,17 +1,30 @@
|
||||
export type ToolCallFunction = {
|
||||
name: string;
|
||||
arguments: string;
|
||||
};
|
||||
|
||||
export type WireToolCall = {
|
||||
id: string;
|
||||
type?: string;
|
||||
function: ToolCallFunction;
|
||||
};
|
||||
|
||||
export type ChatMessage = {
|
||||
role: "system" | "user" | "assistant" | "tool";
|
||||
content: unknown;
|
||||
tool_call_id?: string;
|
||||
name?: string;
|
||||
tool_calls?: WireToolCall[];
|
||||
reasoning?: string;
|
||||
stats?: ChatStats;
|
||||
};
|
||||
|
||||
export type ToolCall = {
|
||||
name: string;
|
||||
arguments?: Record<string, unknown>;
|
||||
response?: string;
|
||||
};
|
||||
|
||||
export type ChatMessage = {
|
||||
role: "user" | "assistant";
|
||||
content: string;
|
||||
reasoning?: string;
|
||||
toolCalls?: ToolCall[];
|
||||
stats?: ChatStats;
|
||||
};
|
||||
|
||||
export type StartingRequest = {
|
||||
label: string;
|
||||
prompt: string;
|
||||
|
||||
@ -135,3 +135,22 @@ export type Ffprobe = {
|
||||
}[];
|
||||
};
|
||||
};
|
||||
|
||||
/** Severity classes a keyframe analysis result can carry. */
export type KeyframeSeverity =
  | "ok"
  | "warning"
  | "error"
  | "unknown"
  | "record_disabled";

/**
 * A value the UI renders with an "s" suffix; may be null.
 * NOTE(review): presumably null when too few keyframes were observed to
 * compute it — confirm against the API.
 */
type KeyframeSecondsOrNull = number | null;

/** Response body of the `keyframe_analysis` endpoint. */
export type KeyframeAnalysis = {
  /** Overall verdict; selects which summary row is displayed. */
  severity: KeyframeSeverity;
  /** Zero-based index of the analyzed stream (displayed one-based). */
  stream_index?: number;
  /** Number of keyframes counted during the probe. */
  keyframe_count?: number;
  /** Largest gap between keyframes. */
  max_gap?: KeyframeSecondsOrNull;
  /** Mean gap between keyframes. */
  mean_gap?: KeyframeSecondsOrNull;
  /** Smallest gap between keyframes. */
  min_gap?: KeyframeSecondsOrNull;
  /** How much of the stream was observed. */
  duration_observed?: KeyframeSecondsOrNull;
  /** Segment length the gaps are compared with in the error message. */
  segment_time?: number;
  /** NOTE(review): presumably the gap limits behind warning/error — confirm. */
  thresholds?: { warning: number; error: number };
};
|
||||
|
||||
@ -1,16 +1,20 @@
|
||||
import type { ChatMessage, ChatStats, ToolCall } from "@/types/chat";
|
||||
|
||||
export type StreamChatCallbacks = {
|
||||
/** Update the messages array (e.g. pass to setState). */
|
||||
updateMessages: (updater: (prev: ChatMessage[]) => ChatMessage[]) => void;
|
||||
/** Streamed delta of the assistant's final answer text. */
|
||||
onContentDelta: (delta: string) => void;
|
||||
/** Streamed delta of the assistant's reasoning trace. */
|
||||
onReasoningDelta: (delta: string) => void;
|
||||
/** The full conversation chain so far (system message, history, this turn's
|
||||
* tool-call turns, tool results, and — on the final emission — the final
|
||||
* assistant message). */
|
||||
onChain: (chain: ChatMessage[]) => void;
|
||||
/** Token/timing stats for the turn. */
|
||||
onStats: (stats: ChatStats) => void;
|
||||
/** Called when the stream sends an error or fetch fails. */
|
||||
onError: (message: string) => void;
|
||||
/** Called when the stream finishes (success or error). */
|
||||
onDone: () => void;
|
||||
/** Called when the stream emits token/timing stats. The stats are also
|
||||
* attached to the last assistant message in updateMessages, so consumers
|
||||
* can usually rely on the message itself rather than wiring this up. */
|
||||
onStats?: (stats: ChatStats) => void;
|
||||
/** Message used when fetch throws and no server error is available. */
|
||||
defaultErrorMessage?: string;
|
||||
};
|
||||
@ -25,7 +29,7 @@ type StatsChunk = {
|
||||
|
||||
type StreamChunk =
|
||||
| { type: "error"; error: string }
|
||||
| { type: "tool_calls"; tool_calls: ToolCall[] }
|
||||
| { type: "messages"; messages: ChatMessage[] }
|
||||
| { type: "content"; delta: string }
|
||||
| { type: "reasoning"; delta: string }
|
||||
| StatsChunk;
|
||||
@ -41,16 +45,18 @@ export type StreamChatOptions = {
|
||||
export async function streamChatCompletion(
|
||||
url: string,
|
||||
headers: Record<string, string>,
|
||||
apiMessages: { role: string; content: string }[],
|
||||
apiMessages: ChatMessage[],
|
||||
callbacks: StreamChatCallbacks,
|
||||
signal?: AbortSignal,
|
||||
options: StreamChatOptions = {},
|
||||
): Promise<void> {
|
||||
const {
|
||||
updateMessages,
|
||||
onContentDelta,
|
||||
onReasoningDelta,
|
||||
onChain,
|
||||
onStats,
|
||||
onError,
|
||||
onDone,
|
||||
onStats,
|
||||
defaultErrorMessage = "Something went wrong. Please try again.",
|
||||
} = callbacks;
|
||||
|
||||
@ -91,65 +97,27 @@ export async function streamChatCompletion(
|
||||
const applyChunk = (data: StreamChunk) => {
|
||||
if (data.type === "error") {
|
||||
onError(data.error);
|
||||
updateMessages((prev) =>
|
||||
prev.filter((m) => !(m.role === "assistant" && m.content === "")),
|
||||
);
|
||||
return "break";
|
||||
}
|
||||
if (data.type === "tool_calls" && data.tool_calls?.length) {
|
||||
updateMessages((prev) => {
|
||||
const next = [...prev];
|
||||
const lastMsg = next[next.length - 1];
|
||||
if (lastMsg?.role === "assistant")
|
||||
next[next.length - 1] = {
|
||||
...lastMsg,
|
||||
toolCalls: data.tool_calls,
|
||||
};
|
||||
return next;
|
||||
});
|
||||
if (data.type === "messages") {
|
||||
onChain(data.messages ?? []);
|
||||
return "continue";
|
||||
}
|
||||
if (data.type === "content" && data.delta !== undefined) {
|
||||
updateMessages((prev) => {
|
||||
const next = [...prev];
|
||||
const lastMsg = next[next.length - 1];
|
||||
if (lastMsg?.role === "assistant")
|
||||
next[next.length - 1] = {
|
||||
...lastMsg,
|
||||
content: lastMsg.content + data.delta,
|
||||
};
|
||||
return next;
|
||||
});
|
||||
onContentDelta(data.delta);
|
||||
return "continue";
|
||||
}
|
||||
if (data.type === "reasoning" && data.delta !== undefined) {
|
||||
updateMessages((prev) => {
|
||||
const next = [...prev];
|
||||
const lastMsg = next[next.length - 1];
|
||||
if (lastMsg?.role === "assistant")
|
||||
next[next.length - 1] = {
|
||||
...lastMsg,
|
||||
reasoning: (lastMsg.reasoning ?? "") + data.delta,
|
||||
};
|
||||
return next;
|
||||
});
|
||||
onReasoningDelta(data.delta);
|
||||
return "continue";
|
||||
}
|
||||
if (data.type === "stats") {
|
||||
const stats: ChatStats = {
|
||||
onStats({
|
||||
promptTokens: data.prompt_tokens,
|
||||
completionTokens: data.completion_tokens,
|
||||
completionDurationMs: data.completion_duration_ms,
|
||||
tokensPerSecond: data.tokens_per_second,
|
||||
};
|
||||
updateMessages((prev) => {
|
||||
const next = [...prev];
|
||||
const lastMsg = next[next.length - 1];
|
||||
if (lastMsg?.role === "assistant")
|
||||
next[next.length - 1] = { ...lastMsg, stats };
|
||||
return next;
|
||||
});
|
||||
onStats?.(stats);
|
||||
return "continue";
|
||||
}
|
||||
return "continue";
|
||||
@ -165,9 +133,8 @@ export async function streamChatCompletion(
|
||||
const trimmed = line.trim();
|
||||
if (!trimmed) continue;
|
||||
try {
|
||||
const data = JSON.parse(trimmed) as StreamChunk & { type: string };
|
||||
const result = applyChunk(data as StreamChunk);
|
||||
if (result === "break") {
|
||||
const data = JSON.parse(trimmed) as StreamChunk;
|
||||
if (applyChunk(data) === "break") {
|
||||
hadStreamError = true;
|
||||
break;
|
||||
}
|
||||
@ -181,50 +148,63 @@ export async function streamChatCompletion(
|
||||
// Flush remaining buffer
|
||||
if (!hadStreamError && buffer.trim()) {
|
||||
try {
|
||||
const data = JSON.parse(buffer.trim()) as StreamChunk & {
|
||||
type: string;
|
||||
delta?: string;
|
||||
};
|
||||
if (data.type === "content" && data.delta !== undefined) {
|
||||
updateMessages((prev) => {
|
||||
const next = [...prev];
|
||||
const lastMsg = next[next.length - 1];
|
||||
if (lastMsg?.role === "assistant")
|
||||
next[next.length - 1] = {
|
||||
...lastMsg,
|
||||
content: lastMsg.content + data.delta!,
|
||||
};
|
||||
return next;
|
||||
});
|
||||
}
|
||||
const data = JSON.parse(buffer.trim()) as StreamChunk;
|
||||
applyChunk(data);
|
||||
} catch {
|
||||
// ignore final malformed chunk
|
||||
}
|
||||
}
|
||||
|
||||
if (!hadStreamError) {
|
||||
updateMessages((prev) => {
|
||||
const next = [...prev];
|
||||
const lastMsg = next[next.length - 1];
|
||||
if (lastMsg?.role === "assistant" && lastMsg.content === "")
|
||||
next[next.length - 1] = { ...lastMsg, content: " " };
|
||||
return next;
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
if (err instanceof DOMException && err.name === "AbortError") {
|
||||
// User stopped generation — not an error
|
||||
} else {
|
||||
onError(defaultErrorMessage);
|
||||
updateMessages((prev) =>
|
||||
prev.filter((m) => !(m.role === "assistant" && m.content === "")),
|
||||
);
|
||||
}
|
||||
} finally {
|
||||
onDone();
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Index tool-result messages by the `tool_call_id` they answer.
 *
 * Only messages with role "tool" and a string `tool_call_id` contribute.
 * String content is stored as-is; any other content is stored as its JSON
 * serialization. A later message with the same id overwrites an earlier one.
 */
export function toolResponsesById(
  messages: ChatMessage[],
): Map<string, string> {
  return messages.reduce(
    (byId, { role, tool_call_id: callId, content }) => {
      if (role !== "tool" || typeof callId !== "string") {
        return byId;
      }
      const text =
        typeof content === "string" ? content : JSON.stringify(content);
      byId.set(callId, text);
      return byId;
    },
    new Map<string, string>(),
  );
}
|
||||
|
||||
/**
 * Decode a tool call's raw `arguments` string.
 *
 * Returns the parsed value only when it is a plain JSON object; malformed
 * JSON, `null`, arrays and scalars all yield `undefined`, so callers never
 * receive a value that merely pretends to be an argument record.
 */
function parseToolCallArguments(
  raw: unknown,
): Record<string, unknown> | undefined {
  if (typeof raw !== "string") {
    return undefined;
  }
  try {
    const parsed: unknown = JSON.parse(raw);
    if (
      parsed !== null &&
      typeof parsed === "object" &&
      !Array.isArray(parsed)
    ) {
      return parsed as Record<string, unknown>;
    }
    return undefined;
  } catch {
    // unparseable arguments are displayed as "no arguments"
    return undefined;
  }
}

/**
 * Derive the display tool calls for one assistant message.
 *
 * @param message message whose `tool_calls` are converted; a message without
 *   tool calls yields an empty array.
 * @param responses tool results keyed by tool call id (see
 *   toolResponsesById); used to attach each call's response, if any.
 * @returns one entry per tool call with its name ("" when missing), decoded
 *   arguments (undefined unless they decode to a JSON object) and response.
 */
export function toolCallsForMessage(
  message: ChatMessage,
  responses: Map<string, string>,
): ToolCall[] {
  if (!message.tool_calls?.length) return [];
  return message.tool_calls.map((tc) => ({
    name: tc.function?.name ?? "",
    arguments: parseToolCallArguments(tc.function?.arguments),
    response: responses.get(tc.id),
  }));
}
|
||||
|
||||
/**
|
||||
* Parse search_objects tool call response(s) into event ids for thumbnails.
|
||||
*/
|
||||
|
||||
Loading…
Reference in New Issue
Block a user