Compare commits

..

1 Commits

Author SHA1 Message Date
dependabot[bot]
14ea1dc74b
Merge 42453dad30 into efe585a920 2026-06-11 14:43:32 +02:00
20 changed files with 320 additions and 1072 deletions

View File

@ -121,12 +121,6 @@ If segments are only ~1 second instead of ~10 seconds, the camera is sending cor
- **Changing codec, bitrate, or resolution mid-stream** — Any encoding changes during an active stream can cause unpredictable segment splitting.
- **Camera firmware bugs** — Check for firmware updates from your camera manufacturer.
:::tip
You don't have to run `ffprobe` by hand to catch this. Open a camera's **Camera Probe Info** dialog (the info icon on the System → Metrics → Cameras page) and check the **Keyframe analysis** section. It probes the record stream and flags sparse or variable keyframes, which is what smart/"+" codecs (H.264+/H.265+) and long keyframe intervals produce.
:::
### Step 4: Check for a stuck detector
If the detect stream is not processing frames, segments will accumulate. Common causes:

View File

@ -400,35 +400,6 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/HTTPValidationError"
/keyframe_analysis:
get:
tags:
- Camera
summary: Keyframe Analysis
description: >-
Probe a camera's record stream and classify its keyframe spacing.
Detects smart/+ codecs and long/variable GOPs that degrade recording.
operationId: keyframe_analysis_keyframe_analysis_get
parameters:
- name: camera
in: query
required: false
schema:
type: string
default: ""
title: Camera
responses:
"200":
description: Successful Response
content:
application/json:
schema: {}
"422":
description: Validation Error
content:
application/json:
schema:
$ref: "#/components/schemas/HTTPValidationError"
/ffprobe/snapshot:
get:
tags:

View File

@ -34,15 +34,11 @@ from frigate.config.camera.updater import (
)
from frigate.config.env import substitute_frigate_vars
from frigate.models import User
from frigate.util.builtin import clean_camera_user_pass, get_record_segment_time
from frigate.util.builtin import clean_camera_user_pass
from frigate.util.camera_cleanup import cleanup_camera_db, cleanup_camera_files
from frigate.util.config import find_config_file
from frigate.util.image import run_ffmpeg_snapshot
from frigate.util.services import (
analyze_record_keyframes,
ffprobe_stream,
is_restricted_go2rtc_source,
)
from frigate.util.services import ffprobe_stream, is_restricted_go2rtc_source
logger = logging.getLogger(__name__)
@ -366,48 +362,6 @@ def ffprobe(request: Request, paths: str = "", detailed: bool = False):
return JSONResponse(content=output)
@router.get("/keyframe_analysis", dependencies=[Depends(require_role(["admin"]))])
async def keyframe_analysis(request: Request, camera: str = ""):
"""Probe a camera's record stream and classify its keyframe spacing.
Detects smart/+ codecs and long/variable GOPs that degrade recording.
"""
config: FrigateConfig = request.app.frigate_config
if camera not in config.cameras:
return JSONResponse(
content={"success": False, "message": f"{camera} is not a valid camera."},
status_code=404,
)
camera_config = config.cameras[camera]
if not camera_config.enabled:
return JSONResponse(
content={"success": False, "message": f"{camera} is not enabled."},
status_code=404,
)
# keyframe spacing only matters when this camera is recording
if not camera_config.record.enabled:
return JSONResponse(content={"severity": "record_disabled"})
# recording guarantees an input carries the record role; its index matches
# the "Stream N" numbering the ffprobe endpoint surfaces (same input order)
record_index, record_input = next(
(idx, i)
for idx, i in enumerate(camera_config.ffmpeg.inputs)
if "record" in i.roles
)
segment_time = get_record_segment_time(camera_config)
result = await analyze_record_keyframes(
config.ffmpeg, record_input.path, segment_time
)
result["stream_index"] = record_index
return JSONResponse(content=result)
@router.get("/ffprobe/snapshot", dependencies=[Depends(require_role(["admin"]))])
def ffprobe_snapshot(request: Request, url: str = "", timeout: int = 10):
"""Get a snapshot from a stream URL using ffmpeg."""

View File

@ -7,7 +7,7 @@ import operator
import time
from datetime import datetime
from functools import reduce
from typing import Any, Optional
from typing import Any, Dict, List, Optional
import cv2
from fastapi import APIRouter, Body, Depends, HTTPException, Request
@ -59,7 +59,7 @@ class ToolExecuteRequest(BaseModel):
"""Request model for tool execution."""
tool_name: str
arguments: dict[str, Any]
arguments: Dict[str, Any]
class VLMMonitorRequest(BaseModel):
@ -68,8 +68,8 @@ class VLMMonitorRequest(BaseModel):
camera: str
condition: str
max_duration_minutes: int = 60
labels: list[str] = []
zones: list[str] = []
labels: List[str] = []
zones: List[str] = []
@router.get(
@ -91,10 +91,10 @@ def get_tools(request: Request) -> JSONResponse:
def _resolve_zones(
zones: list[str],
zones: List[str],
config: FrigateConfig,
target_cameras: list[str],
) -> list[str]:
target_cameras: List[str],
) -> List[str]:
"""Map zone names to their canonical config keys, case-insensitively.
LLMs frequently echo a user's casing ("Front Yard") instead of the
@ -107,7 +107,7 @@ def _resolve_zones(
if not zones:
return zones
lookup: dict[str, str] = {}
lookup: Dict[str, str] = {}
for camera_id in target_cameras:
camera_config = config.cameras.get(camera_id)
if camera_config is None:
@ -120,8 +120,8 @@ def _resolve_zones(
async def _execute_search_objects(
request: Request,
arguments: dict[str, Any],
allowed_cameras: list[str],
arguments: Dict[str, Any],
allowed_cameras: List[str],
) -> JSONResponse:
"""
Execute the search_objects tool.
@ -213,8 +213,8 @@ async def _execute_search_objects(
async def _execute_search_objects_semantic(
request: Request,
arguments: dict[str, Any],
allowed_cameras: list[str],
arguments: Dict[str, Any],
allowed_cameras: List[str],
semantic_query: str,
) -> JSONResponse:
"""Search objects via fused thumbnail + description embeddings.
@ -263,8 +263,8 @@ async def _execute_search_objects_semantic(
limit = int(arguments.get("limit", 25))
limit = max(1, min(limit, 100))
visual_distances: dict[str, float] = {}
description_distances: dict[str, float] = {}
visual_distances: Dict[str, float] = {}
description_distances: Dict[str, float] = {}
try:
rows = context.search_thumbnail(semantic_query)
visual_distances = {row[0]: row[1] for row in rows}
@ -305,7 +305,7 @@ async def _execute_search_objects_semantic(
eligible = {e.id: e for e in Event.select().where(reduce(operator.and_, clauses))}
scored: list[tuple[str, float]] = []
scored: List[tuple[str, float]] = []
for eid in eligible:
v_score = (
distance_to_score(visual_distances[eid], context.thumb_stats)
@ -331,9 +331,9 @@ async def _execute_search_objects_semantic(
async def _execute_find_similar_objects(
request: Request,
arguments: dict[str, Any],
allowed_cameras: list[str],
) -> dict[str, Any]:
arguments: Dict[str, Any],
allowed_cameras: List[str],
) -> Dict[str, Any]:
"""Execute the find_similar_objects tool.
Returns a plain dict (not JSONResponse) so the chat loop can embed it
@ -403,8 +403,8 @@ async def _execute_find_similar_objects(
# version (see frigate/embeddings/__init__.py). Mirror the pattern used by
# frigate/api/event.py events_search: fetch top-k globally, then intersect
# with the structured filters via Peewee.
visual_distances: dict[str, float] = {}
description_distances: dict[str, float] = {}
visual_distances: Dict[str, float] = {}
description_distances: Dict[str, float] = {}
try:
if similarity_mode in ("visual", "fused"):
@ -462,7 +462,7 @@ async def _execute_find_similar_objects(
eligible = {e.id: e for e in Event.select().where(reduce(operator.and_, clauses))}
# 6. Fuse and rank.
scored: list[tuple[str, float]] = []
scored: List[tuple[str, float]] = []
for eid in eligible:
v_score = (
distance_to_score(visual_distances[eid], context.thumb_stats)
@ -503,7 +503,7 @@ async def _execute_find_similar_objects(
async def execute_tool(
request: Request,
body: ToolExecuteRequest = Body(...),
allowed_cameras: list[str] = Depends(get_allowed_cameras_for_filter),
allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter),
) -> JSONResponse:
"""
Execute a tool function call.
@ -545,8 +545,8 @@ async def execute_tool(
async def _execute_get_live_context(
request: Request,
camera: str,
allowed_cameras: list[str],
) -> dict[str, Any]:
allowed_cameras: List[str],
) -> Dict[str, Any]:
# Reject wildcards explicitly so models retry with a real camera name
# instead of silently fanning out across every camera.
if camera in ("*", "all"):
@ -593,7 +593,7 @@ async def _execute_get_live_context(
"stationary": obj_dict.get("stationary", False),
}
result: dict[str, Any] = {
result: Dict[str, Any] = {
"camera": camera,
"timestamp": frame_time,
"detections": list(tracked_objects_dict.values()),
@ -620,7 +620,7 @@ async def _execute_get_live_context(
async def _get_live_frame_image_url(
request: Request,
camera: str,
allowed_cameras: list[str],
allowed_cameras: List[str],
) -> Optional[str]:
"""
Fetch the current live frame for a camera as a base64 data URL.
@ -659,8 +659,8 @@ async def _get_live_frame_image_url(
async def _execute_set_camera_state(
request: Request,
arguments: dict[str, Any],
) -> dict[str, Any]:
arguments: Dict[str, Any],
) -> Dict[str, Any]:
role = request.headers.get("remote-role", "")
if "admin" not in [r.strip() for r in role.split(",")]:
return {"error": "Admin privileges required to change camera settings."}
@ -699,10 +699,10 @@ async def _execute_set_camera_state(
async def _execute_tool_internal(
tool_name: str,
arguments: dict[str, Any],
arguments: Dict[str, Any],
request: Request,
allowed_cameras: list[str],
) -> dict[str, Any]:
allowed_cameras: List[str],
) -> Dict[str, Any]:
"""
Internal helper to execute a tool and return the result as a dict.
@ -763,8 +763,8 @@ async def _execute_tool_internal(
async def _execute_start_camera_watch(
request: Request,
arguments: dict[str, Any],
) -> dict[str, Any]:
arguments: Dict[str, Any],
) -> Dict[str, Any]:
camera = arguments.get("camera", "").strip()
condition = arguments.get("condition", "").strip()
max_duration_minutes = int(arguments.get("max_duration_minutes", 60))
@ -814,14 +814,14 @@ async def _execute_start_camera_watch(
}
def _execute_stop_camera_watch() -> dict[str, Any]:
def _execute_stop_camera_watch() -> Dict[str, Any]:
cancelled = stop_vlm_watch_job()
if cancelled:
return {"success": True, "message": "Watch job cancelled."}
return {"success": False, "message": "No active watch job to cancel."}
def _execute_get_profile_status(request: Request) -> dict[str, Any]:
def _execute_get_profile_status(request: Request) -> Dict[str, Any]:
"""Return profile status including active profile and activation timestamps."""
profile_manager = getattr(request.app, "profile_manager", None)
if profile_manager is None:
@ -846,9 +846,9 @@ def _execute_get_profile_status(request: Request) -> dict[str, Any]:
def _execute_get_recap(
arguments: dict[str, Any],
allowed_cameras: list[str],
) -> dict[str, Any]:
arguments: Dict[str, Any],
allowed_cameras: List[str],
) -> Dict[str, Any]:
"""Fetch review segments with GenAI metadata for a time period."""
from functools import reduce
@ -909,7 +909,7 @@ def _execute_get_recap(
.iterator()
)
events: list[dict[str, Any]] = []
events: List[Dict[str, Any]] = []
for row in rows:
data = row.get("data") or {}
@ -920,7 +920,7 @@ def _execute_get_recap(
data = {}
camera = row["camera"]
event: dict[str, Any] = {
event: Dict[str, Any] = {
"camera": camera.replace("_", " ").title(),
"severity": row.get("severity", "detection"),
}
@ -984,10 +984,10 @@ def _execute_get_recap(
async def _execute_pending_tools(
pending_tool_calls: list[dict[str, Any]],
pending_tool_calls: List[Dict[str, Any]],
request: Request,
allowed_cameras: list[str],
) -> tuple[list[ToolCall], list[dict[str, Any]], list[dict[str, Any]]]:
allowed_cameras: List[str],
) -> tuple[List[ToolCall], List[Dict[str, Any]], List[Dict[str, Any]]]:
"""
Execute a list of tool calls.
@ -996,9 +996,9 @@ async def _execute_pending_tools(
tool result dicts for conversation,
extra messages to inject after tool results e.g. user messages with images)
"""
tool_calls_out: list[ToolCall] = []
tool_results: list[dict[str, Any]] = []
extra_messages: list[dict[str, Any]] = []
tool_calls_out: List[ToolCall] = []
tool_results: List[Dict[str, Any]] = []
extra_messages: List[Dict[str, Any]] = []
for tool_call in pending_tool_calls:
tool_name = tool_call["name"]
tool_args = tool_call.get("arguments") or {}
@ -1106,7 +1106,7 @@ async def _execute_pending_tools(
async def chat_completion(
request: Request,
body: ChatCompletionRequest = Body(...),
allowed_cameras: list[str] = Depends(get_allowed_cameras_for_filter),
allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter),
):
"""
Chat completion endpoint with tool calling support.
@ -1138,23 +1138,19 @@ async def chat_completion(
)
conversation = []
# Build the system message only when the client hasn't already pinned one.
# The first turn has no system message; we generate it (with the current
# timestamp) and return the whole chain so the client persists it. Later
# turns send it back verbatim, freezing the timestamp so the prompt prefix
# stays byte-identical and the model server's prompt cache keeps hitting.
if not body.messages or body.messages[0].role != "system":
conversation.append(
{
"role": "system",
"content": build_chat_system_prompt(
config=config,
allowed_cameras=allowed_cameras,
semantic_search_enabled=semantic_search_enabled,
attribute_classifications=attribute_classifications,
),
}
)
system_prompt = build_chat_system_prompt(
config=config,
allowed_cameras=allowed_cameras,
semantic_search_enabled=semantic_search_enabled,
attribute_classifications=attribute_classifications,
)
conversation.append(
{
"role": "system",
"content": system_prompt,
}
)
for msg in body.messages:
msg_dict = {
@ -1165,13 +1161,11 @@ async def chat_completion(
msg_dict["tool_call_id"] = msg.tool_call_id
if msg.name:
msg_dict["name"] = msg.name
if msg.tool_calls is not None:
msg_dict["tool_calls"] = msg.tool_calls
conversation.append(msg_dict)
tool_iterations = 0
tool_calls: list[ToolCall] = []
tool_calls: List[ToolCall] = []
max_iterations = body.max_tool_iterations
logger.debug(
@ -1181,20 +1175,11 @@ async def chat_completion(
# True LLM streaming when client supports it and stream requested
if body.stream and hasattr(genai_client, "chat_with_tools_stream"):
stream_tool_calls: List[ToolCall] = []
stream_iterations = 0
async def stream_body_llm():
nonlocal conversation, stream_iterations
def _emit_chain(extra: Optional[list[dict[str, Any]]] = None):
# Return the full conversation (including the system message) so
# the client persists and replays it verbatim next turn.
chain = conversation + (extra or [])
return (
json.dumps({"type": "messages", "messages": chain}).encode("utf-8")
+ b"\n"
)
nonlocal conversation, stream_tool_calls, stream_iterations
while stream_iterations < max_iterations:
if await request.is_disconnected():
logger.debug("Client disconnected, stopping chat stream")
@ -1259,33 +1244,31 @@ async def chat_completion(
)
return
(
_executed_calls,
executed_calls,
tool_results,
extra_msgs,
) = await _execute_pending_tools(
pending, request, allowed_cameras
)
stream_tool_calls.extend(executed_calls)
conversation.extend(tool_results)
conversation.extend(extra_msgs)
# Emit the running chain so the client can render tool
# calls live and replay them verbatim next turn.
yield _emit_chain()
yield (
json.dumps(
{
"type": "tool_calls",
"tool_calls": [
tc.model_dump() for tc in stream_tool_calls
],
}
).encode("utf-8")
+ b"\n"
)
break
else:
# Streaming never appends the final assistant message
# to the conversation, so add it to the chain.
yield _emit_chain(
extra=[
{
"role": "assistant",
"content": msg.get("content"),
}
]
)
yield (json.dumps({"type": "done"}).encode("utf-8") + b"\n")
return
else:
yield _emit_chain()
yield json.dumps({"type": "done"}).encode("utf-8") + b"\n"
return StreamingResponse(
@ -1332,15 +1315,19 @@ async def chat_completion(
if body.stream:
final_reasoning = response.get("reasoning")
chain = list(conversation)
async def stream_body() -> Any:
yield (
json.dumps({"type": "messages", "messages": chain}).encode(
"utf-8"
if tool_calls:
yield (
json.dumps(
{
"type": "tool_calls",
"tool_calls": [
tc.model_dump() for tc in tool_calls
],
}
).encode("utf-8")
+ b"\n"
)
+ b"\n"
)
# Emit the full reasoning trace up front when the
# underlying client did not stream it
if final_reasoning:
@ -1376,7 +1363,6 @@ async def chat_completion(
finish_reason=response.get("finish_reason", "stop"),
tool_iterations=tool_iterations,
tool_calls=tool_calls,
messages=list(conversation),
).model_dump(),
)
@ -1409,7 +1395,6 @@ async def chat_completion(
finish_reason="length",
tool_iterations=tool_iterations,
tool_calls=tool_calls,
messages=list(conversation),
).model_dump(),
)

View File

@ -1,6 +1,6 @@
"""Chat API request models."""
from typing import Any, Optional
from typing import Optional
from pydantic import BaseModel, Field
@ -11,29 +11,13 @@ class ChatMessage(BaseModel):
role: str = Field(
description="Message role: 'user', 'assistant', 'system', or 'tool'"
)
content: Optional[Any] = Field(
default=None,
description=(
"Message content. Usually a string, but may be a multimodal content "
"list (e.g. text + image_url) or null for assistant turns that only "
"request tool calls."
),
)
content: str = Field(description="Message content")
tool_call_id: Optional[str] = Field(
default=None, description="For tool messages, the ID of the tool call"
)
name: Optional[str] = Field(
default=None, description="For tool messages, the tool name"
)
tool_calls: Optional[list[dict[str, Any]]] = Field(
default=None,
description=(
"For assistant messages replayed from prior turns, the OpenAI-format "
"tool calls the model previously requested. Replaying these verbatim "
"keeps the conversation prefix byte-for-byte identical so the model "
"server's prompt cache hits on follow-up turns."
),
)
class ChatCompletionRequest(BaseModel):

View File

@ -56,12 +56,3 @@ class ChatCompletionResponse(BaseModel):
default_factory=list,
description="List of tool calls that were executed during this completion",
)
messages: list[dict[str, Any]] = Field(
default_factory=list,
description=(
"The full conversation chain, including the system message. Persist "
"and replay this verbatim on the next request so the prompt prefix "
"stays byte-identical and the model server's prompt cache keeps "
"hitting."
),
)

View File

@ -1,58 +0,0 @@
from unittest.mock import AsyncMock, patch
from frigate.models import Event, Recordings, ReviewSegment
from frigate.test.http_api.base_http_test import AuthTestClient, BaseTestHttp
class TestHttpKeyframeAnalysis(BaseTestHttp):
def setUp(self):
super().setUp([Event, Recordings, ReviewSegment])
def test_invalid_camera_returns_404(self):
app = super().create_app()
with AuthTestClient(app) as client:
response = client.get("/keyframe_analysis?camera=does_not_exist")
assert response.status_code == 404
def test_record_disabled_returns_neutral(self):
# default minimal_config has recording disabled
app = super().create_app()
with AuthTestClient(app) as client:
response = client.get("/keyframe_analysis?camera=front_door")
assert response.status_code == 200
assert response.json()["severity"] == "record_disabled"
def test_probes_record_input_and_returns_severity(self):
self.minimal_config["cameras"]["front_door"]["ffmpeg"]["inputs"] = [
{
"path": "rtsp://10.0.0.1:554/record",
"roles": ["detect", "record"],
}
]
self.minimal_config["cameras"]["front_door"]["record"] = {"enabled": True}
app = super().create_app()
canned = {
"severity": "ok",
"keyframe_count": 5,
"max_gap": 1.0,
"mean_gap": 1.0,
"min_gap": 1.0,
"segment_time": 10,
"duration_observed": 4.0,
"thresholds": {"warning": 4.0, "error": 10},
}
with patch(
"frigate.api.camera.analyze_record_keyframes",
AsyncMock(return_value=canned),
) as mock_probe:
with AuthTestClient(app) as client:
response = client.get("/keyframe_analysis?camera=front_door")
assert response.status_code == 200
assert response.json()["severity"] == "ok"
# index matches the input carrying the record role ("Stream 1")
assert response.json()["stream_index"] == 0
# the record-role input path was probed
assert mock_probe.await_args.args[1] == "rtsp://10.0.0.1:554/record"

View File

@ -1,111 +0,0 @@
"""Tests for keyframe-spacing analysis used to detect smart/+ codecs."""
import asyncio
import unittest
from unittest.mock import AsyncMock, MagicMock, patch
from frigate.util.services import (
analyze_record_keyframes,
classify_keyframe_gaps,
parse_keyframe_packets,
)
class TestClassifyKeyframeGaps(unittest.TestCase):
def test_ok_when_gaps_small(self):
# keyframes every ~1s
pts = [0.0, 1.0, 2.0, 3.0, 4.0]
result = classify_keyframe_gaps(pts, segment_time=10)
self.assertEqual(result["severity"], "ok")
self.assertEqual(result["max_gap"], 1.0)
self.assertEqual(result["keyframe_count"], 5)
self.assertEqual(result["thresholds"], {"warning": 4.0, "error": 10})
def test_warning_when_gap_exceeds_four_seconds(self):
pts = [0.0, 1.0, 6.5] # 5.5s gap
result = classify_keyframe_gaps(pts, segment_time=10)
self.assertEqual(result["severity"], "warning")
self.assertEqual(result["max_gap"], 5.5)
def test_error_when_gap_exceeds_segment_time(self):
pts = [0.0, 12.0] # 12s gap > 10s segment
result = classify_keyframe_gaps(pts, segment_time=10)
self.assertEqual(result["severity"], "error")
def test_error_threshold_tracks_segment_time(self):
pts = [0.0, 6.0] # 6s gap, segment_time=5 -> error
result = classify_keyframe_gaps(pts, segment_time=5)
self.assertEqual(result["severity"], "error")
def test_unknown_with_single_keyframe(self):
result = classify_keyframe_gaps([1.0], segment_time=10)
self.assertEqual(result["severity"], "unknown")
self.assertIsNone(result["max_gap"])
self.assertEqual(result["keyframe_count"], 1)
def test_unknown_with_no_keyframes(self):
result = classify_keyframe_gaps([], segment_time=10)
self.assertEqual(result["severity"], "unknown")
self.assertEqual(result["keyframe_count"], 0)
class TestParseKeyframePackets(unittest.TestCase):
def test_extracts_keyframe_pts_and_max(self):
output = "0.000000,K__\n0.033333,___\n1.000000,K__\n1.500000,___\n"
keyframe_pts, max_pts = parse_keyframe_packets(output)
self.assertEqual(keyframe_pts, [0.0, 1.0])
self.assertEqual(max_pts, 1.5)
def test_skips_unparseable_and_empty_lines(self):
output = "N/A,K__\n\n2.0,K__\nbad line\n"
keyframe_pts, max_pts = parse_keyframe_packets(output)
self.assertEqual(keyframe_pts, [2.0])
self.assertEqual(max_pts, 2.0)
def test_empty_output(self):
keyframe_pts, max_pts = parse_keyframe_packets("")
self.assertEqual(keyframe_pts, [])
self.assertIsNone(max_pts)
class TestAnalyzeRecordKeyframes(unittest.IsolatedAsyncioTestCase):
async def test_merges_duration_and_classification(self):
csv = b"0.0,K__\n1.0,___\n6.0,K__\n7.0,___\n"
proc = MagicMock()
proc.communicate = AsyncMock(return_value=(csv, b""))
ffmpeg = MagicMock()
ffmpeg.ffprobe_path = "/usr/bin/ffprobe"
with patch(
"frigate.util.services.asyncio.create_subprocess_exec",
AsyncMock(return_value=proc),
):
result = await analyze_record_keyframes(
ffmpeg, "rtsp://cam/stream", segment_time=10
)
self.assertEqual(result["severity"], "warning") # 6s gap > 4s
self.assertEqual(result["max_gap"], 6.0)
self.assertEqual(result["duration_observed"], 7.0)
async def test_timeout_returns_unknown(self):
proc = MagicMock()
proc.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
proc.kill = MagicMock()
ffmpeg = MagicMock()
ffmpeg.ffprobe_path = "/usr/bin/ffprobe"
with patch(
"frigate.util.services.asyncio.create_subprocess_exec",
AsyncMock(return_value=proc),
):
result = await analyze_record_keyframes(
ffmpeg, "rtsp://cam/stream", segment_time=10
)
self.assertEqual(result["severity"], "unknown")
proc.kill.assert_called_once()
if __name__ == "__main__":
unittest.main()

View File

@ -14,16 +14,13 @@ import urllib.parse
from collections.abc import Mapping
from multiprocessing.managers import ValueProxy
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union
from typing import Any, Dict, Optional, Tuple, Union
import numpy as np
from ruamel.yaml import YAML
from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS
if TYPE_CHECKING:
from frigate.config import CameraConfig
logger = logging.getLogger(__name__)
@ -135,24 +132,6 @@ def get_ffmpeg_arg_list(arg: Any) -> list:
return arg if isinstance(arg, list) else shlex.split(arg)
# all built-in record presets use this segment_time
DEFAULT_RECORD_SEGMENT_TIME = 10
def get_record_segment_time(config: "CameraConfig") -> int:
"""Extract -segment_time from the camera's record output args."""
record_args = get_ffmpeg_arg_list(config.ffmpeg.output_args.record)
if record_args and record_args[0].startswith("preset"):
return DEFAULT_RECORD_SEGMENT_TIME
try:
idx = record_args.index("-segment_time")
return int(record_args[idx + 1])
except (ValueError, IndexError):
return DEFAULT_RECORD_SEGMENT_TIME
def load_labels(
path: Optional[str], encoding="utf-8", prefill=91, indexed: bool | None = None
):

View File

@ -879,131 +879,6 @@ def ffprobe_stream(ffmpeg, path: str, detailed: bool = False) -> sp.CompletedPro
return result
KEYFRAME_PROBE_WINDOW_SECONDS = 20
KEYFRAME_GAP_WARNING_SECONDS = 4.0
def parse_keyframe_packets(output: str) -> Tuple[List[float], Optional[float]]:
"""Parse ffprobe CSV `pts_time,flags` output.
Returns the presentation timestamps of keyframes (flags containing "K")
and the maximum timestamp observed across all packets.
"""
keyframe_pts: List[float] = []
max_pts: Optional[float] = None
for line in output.splitlines():
parts = line.split(",")
if len(parts) < 2:
continue
try:
pts = float(parts[0])
except ValueError:
continue
if max_pts is None or pts > max_pts:
max_pts = pts
if "K" in parts[1]:
keyframe_pts.append(pts)
return keyframe_pts, max_pts
def classify_keyframe_gaps(
keyframe_pts: List[float], segment_time: int
) -> dict[str, Any]:
"""Classify keyframe spacing for recording suitability.
A camera using a smart/+ codec or a long/variable GOP produces large or
irregular gaps between keyframes, which breaks time-based recording
segmentation. Severity:
- "unknown" when fewer than two keyframes were observed
- "error" when the longest gap exceeds the record segment length
- "warning" when the longest gap exceeds the warning threshold
- "ok" otherwise
"""
thresholds = {
"warning": KEYFRAME_GAP_WARNING_SECONDS,
"error": segment_time,
}
if len(keyframe_pts) < 2:
return {
"keyframe_count": len(keyframe_pts),
"max_gap": None,
"mean_gap": None,
"min_gap": None,
"segment_time": segment_time,
"severity": "unknown",
"thresholds": thresholds,
}
gaps = [b - a for a, b in zip(keyframe_pts, keyframe_pts[1:])]
max_gap = max(gaps)
if max_gap > segment_time:
severity = "error"
elif max_gap > KEYFRAME_GAP_WARNING_SECONDS:
severity = "warning"
else:
severity = "ok"
return {
"keyframe_count": len(keyframe_pts),
"max_gap": round(max_gap, 2),
"mean_gap": round(sum(gaps) / len(gaps), 2),
"min_gap": round(min(gaps), 2),
"segment_time": segment_time,
"severity": severity,
"thresholds": thresholds,
}
async def analyze_record_keyframes(
ffmpeg, url: str, segment_time: int, window: int = KEYFRAME_PROBE_WINDOW_SECONDS
) -> dict[str, Any]:
"""Probe a stream for ~`window` seconds and classify its keyframe spacing.
Reads video packet flags via ffprobe to find keyframes, then measures the
gaps between them. On timeout or failure returns an "unknown" result rather
than a false all-clear.
"""
clean_url = escape_special_characters(url)
cmd = [
ffmpeg.ffprobe_path,
"-v",
"error",
"-select_streams",
"v:0",
"-read_intervals",
f"%+{window}",
"-show_entries",
"packet=pts_time,flags",
"-of",
"csv=p=0",
clean_url,
]
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=window + 15)
except asyncio.TimeoutError:
logger.warning("Keyframe probe timed out for record stream")
proc.kill()
return classify_keyframe_gaps([], segment_time)
except OSError as err:
logger.error("Keyframe probe failed: %s", err)
return classify_keyframe_gaps([], segment_time)
keyframe_pts, max_pts = parse_keyframe_packets(stdout.decode("utf-8", "replace"))
result = classify_keyframe_gaps(keyframe_pts, segment_time)
result["duration_observed"] = round(max_pts, 2) if max_pts is not None else None
return result
def vainfo_hwaccel(device_name: Optional[str] = None) -> sp.CompletedProcess:
"""Run vainfo."""
if not device_name:

View File

@ -24,7 +24,7 @@ from frigate.config.camera.updater import (
)
from frigate.const import PROCESS_PRIORITY_HIGH
from frigate.log import LogPipe
from frigate.util.builtin import EventsPerSecond, get_record_segment_time
from frigate.util.builtin import EventsPerSecond, get_ffmpeg_arg_list
from frigate.util.ffmpeg import start_or_restart_ffmpeg, stop_ffmpeg
from frigate.util.image import (
FrameManager,
@ -34,6 +34,23 @@ from frigate.util.process import FrigateProcess
logger = logging.getLogger(__name__)
# all built-in record presets use this segment_time
DEFAULT_RECORD_SEGMENT_TIME = 10
def _get_record_segment_time(config: CameraConfig) -> int:
"""Extract -segment_time from the camera's record output args."""
record_args = get_ffmpeg_arg_list(config.ffmpeg.output_args.record)
if record_args and record_args[0].startswith("preset"):
return DEFAULT_RECORD_SEGMENT_TIME
try:
idx = record_args.index("-segment_time")
return int(record_args[idx + 1])
except (ValueError, IndexError):
return DEFAULT_RECORD_SEGMENT_TIME
def capture_frames(
ffmpeg_process: sp.Popen[Any],
@ -168,7 +185,7 @@ class CameraWatchdog(threading.Thread):
# `valid` segments are published with the segment's start time, so the
# gap between consecutive publishes can reach 2 * segment_time. Pad the
# staleness threshold so it's never tighter than that worst case.
segment_time = get_record_segment_time(self.config)
segment_time = _get_record_segment_time(self.config)
self.record_stale_threshold = max(120, 2 * segment_time + 30)
# Stall tracking (based on last processed frame)

4
web/.gitignore vendored
View File

@ -12,10 +12,6 @@ dist
dist-ssr
*.local
# Playwright
playwright-report
test-results
# Editor directories and files
.vscode/*
!.vscode/extensions.json

View File

@ -92,15 +92,6 @@ test.describe("Chat — streaming @medium", () => {
await installChatStreamOverride(frigateApp, [
{ type: "content", delta: "Hel" },
{ type: "content", delta: "lo" },
{
type: "messages",
messages: [
{ role: "system", content: "sys" },
{ role: "user", content: "hello chat" },
{ role: "assistant", content: "Hello" },
],
},
{ type: "done" },
]);
await frigateApp.goto("/chat");
const input = frigateApp.page.getByPlaceholder(/ask/i);
@ -146,15 +137,6 @@ test.describe("Chat — streaming @medium", () => {
{ type: "content", delta: "Hel" },
{ type: "content", delta: "lo, " },
{ type: "content", delta: "world!" },
{
type: "messages",
messages: [
{ role: "system", content: "sys" },
{ role: "user", content: "greet me" },
{ role: "assistant", content: "Hello, world!" },
],
},
{ type: "done" },
],
{ chunkDelayMs: 50 },
);
@ -169,39 +151,19 @@ test.describe("Chat — streaming @medium", () => {
});
});
test("tool calls in the chain render a ToolCallsGroup", async ({
frigateApp,
}) => {
const toolTurn = [
{ role: "system", content: "sys" },
{ role: "user", content: "find people" },
test("tool_calls chunks render a ToolCallsGroup", async ({ frigateApp }) => {
await installChatStreamOverride(frigateApp, [
{
role: "assistant",
content: null,
type: "tool_calls",
tool_calls: [
{
id: "call_1",
type: "function",
function: {
name: "search_objects",
arguments: '{"label":"person"}',
},
name: "search_objects",
arguments: { label: "person" },
},
],
},
{ role: "tool", tool_call_id: "call_1", content: "[]" },
];
await installChatStreamOverride(frigateApp, [
{ type: "messages", messages: toolTurn },
{ type: "content", delta: "Searching for people." },
{
type: "messages",
messages: [
...toolTurn,
{ role: "assistant", content: "Searching for people." },
],
},
{ type: "done" },
]);
await frigateApp.goto("/chat");
const input = frigateApp.page.getByPlaceholder(/ask/i);
@ -291,15 +253,6 @@ test.describe("Chat — attachment chip @medium", () => {
// We use the stream override so the first message completes quickly.
await installChatStreamOverride(frigateApp, [
{ type: "content", delta: "Done." },
{
type: "messages",
messages: [
{ role: "system", content: "sys" },
{ role: "user", content: "hello" },
{ role: "assistant", content: "Done." },
],
},
{ type: "done" },
]);
await frigateApp.goto("/chat");

View File

@ -174,21 +174,6 @@
"error": "Error: {{error}}",
"tips": {
"title": "Camera Probe Info"
},
"keyframes": {
"title": "Keyframe analysis",
"analyzing": "Analyzing keyframes... {{seconds}} seconds remaining",
"stillAnalyzing": "Still analyzing keyframes...",
"recordStream": "Record stream:",
"keyframeCount": "Keyframes observed:",
"observedDuration": "Observed duration:",
"gap": "Keyframe gap (min / avg / max):",
"segmentLength": "Recording segment length:",
"ok": "Keyframes every ~{{seconds}}s, good for recording and playback.",
"warning": "Sparse or variable keyframes (longest gap ~{{seconds}}s), likely a smart codec (H.264+/H.265+), this is not recommended.",
"error": "Keyframe gap (~{{seconds}}s) exceeds the recording segment length ({{segmentTime}}s). Some segments may have no keyframe, which breaks playback. Disable the smart/+ codec on the camera or shorten its keyframe interval.",
"unknown": "Couldn't determine keyframe spacing.",
"recordDisabled": "Recording is disabled for this camera."
}
},
"framesAndDetections": "Frames / Detections",

View File

@ -7,8 +7,7 @@ import {
DialogTitle,
} from "../ui/dialog";
import ActivityIndicator from "../indicators/activity-indicator";
import KeyframeAnalysisSection from "./KeyframeAnalysisSection";
import { Ffprobe, KeyframeAnalysis } from "@/types/stats";
import { Ffprobe } from "@/types/stats";
import { Button } from "../ui/button";
import copy from "copy-to-clipboard";
import { CameraConfig } from "@/types/frigateConfig";
@ -31,7 +30,6 @@ export default function CameraInfoDialog({
}: CameraInfoDialogProps) {
const { t } = useTranslation(["views/system"]);
const [ffprobeInfo, setFfprobeInfo] = useState<Ffprobe[]>();
const [keyframeInfo, setKeyframeInfo] = useState<KeyframeAnalysis>();
useEffect(() => {
axios
@ -69,12 +67,7 @@ export default function CameraInfoDialog({
}, []);
const onCopyFfprobe = async () => {
copy(
JSON.stringify({
ffprobe: ffprobeInfo,
keyframe_analysis: keyframeInfo,
}),
);
copy(JSON.stringify(ffprobeInfo));
toast.success(t("cameras.toast.success.copyToClipboard"));
};
@ -103,7 +96,7 @@ export default function CameraInfoDialog({
<Trans ns="views/system">cameras.info.streamDataFromFFPROBE</Trans>
</DialogDescription>
<div className="mb-2 p-4 text-sm">
<div className="mb-2 p-4">
{ffprobeInfo ? (
<div>
{ffprobeInfo.map((stream, idx) => (
@ -191,10 +184,6 @@ export default function CameraInfoDialog({
)}
</div>
))}
<KeyframeAnalysisSection
cameraName={camera.name}
onResult={setKeyframeInfo}
/>
</div>
) : (
<div className="flex flex-col items-center">

View File

@ -1,193 +0,0 @@
import { useEffect, useMemo, useState } from "react";
import { useTranslation } from "react-i18next";
import axios from "axios";
import { FaCircleCheck, FaTriangleExclamation } from "react-icons/fa6";
import { LuX } from "react-icons/lu";
import ActivityIndicator from "../indicators/activity-indicator";
import { KeyframeAnalysis } from "@/types/stats";
const PROBE_WINDOW_SECONDS = 20;
type KeyframeAnalysisSectionProps = {
cameraName: string;
onResult?: (analysis: KeyframeAnalysis) => void;
};
export default function KeyframeAnalysisSection({
cameraName,
onResult,
}: KeyframeAnalysisSectionProps) {
const { t } = useTranslation(["views/system"]);
const [analysis, setAnalysis] = useState<KeyframeAnalysis>();
const [failed, setFailed] = useState(false);
const [secondsRemaining, setSecondsRemaining] =
useState(PROBE_WINDOW_SECONDS);
// fire the probe once on mount
useEffect(() => {
let active = true;
axios
.get("keyframe_analysis", { params: { camera: cameraName } })
.then((res) => {
if (active) {
setAnalysis(res.data);
onResult?.(res.data);
}
})
.catch(() => {
if (active) {
setFailed(true);
}
});
return () => {
active = false;
};
// re-probing only depends on the camera; onResult is a stable setter
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [cameraName]);
// countdown while waiting for the probe to return
useEffect(() => {
if (analysis || failed) {
return;
}
const interval = setInterval(() => {
setSecondsRemaining((s) => (s > 0 ? s - 1 : 0));
}, 1000);
return () => clearInterval(interval);
}, [analysis, failed]);
const content = useMemo(() => {
if (failed) {
return <Row icon="unknown">{t("cameras.info.keyframes.unknown")}</Row>;
}
if (!analysis) {
return (
<div className="flex items-center gap-2 text-muted-foreground">
<ActivityIndicator className="size-4" />
<span>
{secondsRemaining > 0
? t("cameras.info.keyframes.analyzing", {
seconds: secondsRemaining,
})
: t("cameras.info.keyframes.stillAnalyzing")}
</span>
</div>
);
}
let summary;
switch (analysis.severity) {
case "ok":
summary = (
<Row icon="ok">
{t("cameras.info.keyframes.ok", { seconds: analysis.mean_gap })}
</Row>
);
break;
case "warning":
summary = (
<Row icon="warning">
{t("cameras.info.keyframes.warning", { seconds: analysis.max_gap })}
</Row>
);
break;
case "error":
summary = (
<Row icon="error">
{t("cameras.info.keyframes.error", {
seconds: analysis.max_gap,
segmentTime: analysis.segment_time,
})}
</Row>
);
break;
case "record_disabled":
summary = (
<Row icon="unknown">{t("cameras.info.keyframes.recordDisabled")}</Row>
);
break;
default:
summary = (
<Row icon="unknown">{t("cameras.info.keyframes.unknown")}</Row>
);
}
// gap statistics are only meaningful once at least two keyframes were seen
const hasStats = analysis.max_gap != null;
const hasDetails = hasStats || analysis.stream_index != null;
return (
<div className="text-muted-foreground">
{analysis.stream_index != null && (
<div>
{t("cameras.info.keyframes.recordStream")}{" "}
<span className="text-primary">
{t("cameras.info.stream", { idx: analysis.stream_index + 1 })}
</span>
</div>
)}
{hasStats && (
<div>
<div>
{t("cameras.info.keyframes.keyframeCount")}{" "}
<span className="text-primary">{analysis.keyframe_count}</span>
</div>
<div>
{t("cameras.info.keyframes.observedDuration")}{" "}
<span className="text-primary">
{analysis.duration_observed}s
</span>
</div>
<div>
{t("cameras.info.keyframes.gap")}{" "}
<span className="text-primary">
{analysis.min_gap}s / {analysis.mean_gap}s / {analysis.max_gap}s
</span>
</div>
<div>
{t("cameras.info.keyframes.segmentLength")}{" "}
<span className="text-primary">{analysis.segment_time}s</span>
</div>
</div>
)}
<div className={hasDetails ? "mt-3" : undefined}>{summary}</div>
</div>
);
}, [analysis, failed, secondsRemaining, t]);
return (
<div className="mb-5">
<div className="mb-1 rounded-md bg-secondary p-2 text-lg text-primary">
{t("cameras.info.keyframes.title")}
</div>
<div className="ml-2">{content}</div>
</div>
);
}
type RowProps = {
icon: "ok" | "warning" | "error" | "unknown";
children: React.ReactNode;
};
function Row({ icon, children }: RowProps) {
return (
<div className="flex items-start gap-2">
{icon === "ok" && (
<FaCircleCheck className="mt-0.5 size-4 flex-shrink-0 text-success" />
)}
{icon === "warning" && (
<FaTriangleExclamation className="mt-0.5 size-4 flex-shrink-0 text-yellow-500" />
)}
{icon === "error" && (
<LuX className="mt-0.5 size-4 flex-shrink-0 text-danger" />
)}
{icon === "unknown" && (
<FaTriangleExclamation className="mt-0.5 size-4 flex-shrink-0 text-muted-foreground" />
)}
<span className="text-primary">{children}</span>
</div>
);
}

View File

@ -13,7 +13,6 @@ import { ChatComposer } from "@/components/chat/ChatComposer";
import ChatSettings from "@/components/chat/ChatSettings";
import type {
ChatMessage,
ChatStats,
GenAIModelsResponse,
ShowStatsMode,
} from "@/types/chat";
@ -23,28 +22,12 @@ import {
getFindSimilarObjectsFromToolCalls,
prependAttachment,
streamChatCompletion,
toolCallsForMessage,
toolResponsesById,
} from "@/utils/chatUtil";
type StreamingTurn = {
content: string;
reasoning: string;
chain: ChatMessage[];
stats?: ChatStats;
};
const hasText = (content: unknown): content is string =>
typeof content === "string" && content.trim().length > 0;
const toWire = (messages: ChatMessage[]): ChatMessage[] =>
messages.map(({ reasoning: _r, stats: _s, ...rest }) => rest);
export default function ChatPage() {
const { t } = useTranslation(["views/chat"]);
const [input, setInput] = useState("");
const [messages, setMessages] = useState<ChatMessage[]>([]);
const [streaming, setStreaming] = useState<StreamingTurn | null>(null);
const [isLoading, setIsLoading] = useState(false);
const [error, setError] = useState<string | null>(null);
const [attachedEventId, setAttachedEventId] = useState<string | null>(null);
@ -89,19 +72,28 @@ export default function ChatPage() {
if (isNearBottom) {
el.scrollTo({ top: el.scrollHeight, behavior: "smooth" });
}
}, [messages, streaming, autoScroll]);
}, [messages, autoScroll]);
const submitConversation = useCallback(
async (messagesToSend: ChatMessage[]) => {
if (isLoading) return;
const last = messagesToSend[messagesToSend.length - 1];
if (!last || last.role !== "user" || !hasText(last.content)) return;
if (!last || last.role !== "user" || !last.content.trim()) return;
setError(null);
setMessages(messagesToSend);
setStreaming({ content: "", reasoning: "", chain: [] });
const assistantPlaceholder: ChatMessage = {
role: "assistant",
content: "",
toolCalls: undefined,
};
setMessages([...messagesToSend, assistantPlaceholder]);
setIsLoading(true);
const apiMessages = messagesToSend.map((m) => ({
role: m.role,
content: m.content,
}));
const baseURL = axios.defaults.baseURL ?? "";
const url = `${baseURL}chat/completion`;
const headers: Record<string, string> = {
@ -112,50 +104,16 @@ export default function ChatPage() {
const controller = new AbortController();
abortRef.current = controller;
let chain: ChatMessage[] = [];
let stats: ChatStats | undefined;
let reasoning = "";
let hadError = false;
await streamChatCompletion(
url,
headers,
toWire(messagesToSend),
apiMessages,
{
onContentDelta: (delta) =>
setStreaming((s) => (s ? { ...s, content: s.content + delta } : s)),
onReasoningDelta: (delta) => {
reasoning += delta;
setStreaming((s) =>
s ? { ...s, reasoning: s.reasoning + delta } : s,
);
},
onChain: (fullChain) => {
chain = fullChain;
setStreaming((s) => (s ? { ...s, chain: fullChain } : s));
},
onStats: (s) => {
stats = s;
setStreaming((cur) => (cur ? { ...cur, stats: s } : cur));
},
onError: (message) => {
hadError = true;
setError(message);
},
updateMessages: (updater) => setMessages(updater),
onError: (message) => setError(message),
onDone: () => {
abortRef.current = null;
setIsLoading(false);
setStreaming(null);
const lastMsg = chain[chain.length - 1];
if (!hadError && lastMsg?.role === "assistant") {
setMessages(
chain.map((m, i) =>
i === chain.length - 1
? { ...m, reasoning: reasoning || undefined, stats }
: m,
),
);
}
},
defaultErrorMessage: t("error"),
},
@ -167,14 +125,12 @@ export default function ChatPage() {
);
const recentEventIds = useMemo(() => {
const responses = toolResponsesById(messages);
for (let i = messages.length - 1; i >= 0; i--) {
const msg = messages[i];
if (msg.role !== "assistant" || !msg.tool_calls?.length) continue;
const calls = toolCallsForMessage(msg, responses);
const similar = getFindSimilarObjectsFromToolCalls(calls);
if (msg.role !== "assistant" || !msg.toolCalls) continue;
const similar = getFindSimilarObjectsFromToolCalls(msg.toolCalls);
if (similar) return similar.results.map((e) => e.id);
const events = getEventIdsFromSearchObjectsToolCalls(calls);
const events = getEventIdsFromSearchObjectsToolCalls(msg.toolCalls);
if (events.length > 0) return events.map((e) => e.id);
}
return [];
@ -198,14 +154,12 @@ export default function ChatPage() {
abortRef.current?.abort();
abortRef.current = null;
setIsLoading(false);
setStreaming(null);
}, []);
const startNewChat = useCallback(() => {
abortRef.current?.abort();
abortRef.current = null;
setIsLoading(false);
setStreaming(null);
setMessages([]);
setInput("");
setAttachedEventId(null);
@ -227,83 +181,7 @@ export default function ChatPage() {
setAttachedEventId(null);
}, []);
const hasStarted = messages.length > 0 || streaming != null;
// While streaming, the backend's in-flight chain is the source of truth;
// otherwise the committed conversation is.
const renderList =
streaming && streaming.chain.length ? streaming.chain : messages;
const responses = toolResponsesById(renderList);
const renderTail = renderList[renderList.length - 1];
const finalShown =
renderTail?.role === "assistant" && hasText(renderTail.content);
const renderMessage = (msg: ChatMessage, i: number) => {
if (msg.role === "system" || msg.role === "tool") return null;
if (msg.role === "user") {
if (!hasText(msg.content)) return null;
return (
<div key={i} className="flex flex-col gap-2">
<MessageBubble
role="user"
content={msg.content}
messageIndex={i}
onEditSubmit={handleEditSubmit}
isComplete
showStats={showStats}
/>
</div>
);
}
const calls = toolCallsForMessage(msg, responses);
const contentText = hasText(msg.content) ? msg.content : "";
const similar = getFindSimilarObjectsFromToolCalls(calls);
const events = similar ? [] : getEventIdsFromSearchObjectsToolCalls(calls);
return (
<div key={i} className="flex flex-col gap-2">
{calls.length > 0 && <ToolCallsGroup toolCalls={calls} />}
{hasText(msg.reasoning) && (
<ReasoningBubble
reasoning={msg.reasoning}
answerStarted={!!contentText}
/>
)}
{contentText && (
<MessageBubble
role="assistant"
content={contentText}
messageIndex={i}
isComplete
stats={msg.stats}
showStats={showStats}
/>
)}
{similar ? (
<ChatEventThumbnailsRow
events={similar.results}
anchor={similar.anchor}
onAttach={setAttachedEventId}
/>
) : (
<ChatEventThumbnailsRow
events={events}
onAttach={setAttachedEventId}
/>
)}
</div>
);
};
const processingDots = (
<div className="flex items-center gap-2 self-start rounded-2xl bg-muted px-5 py-4">
<span className="size-2.5 animate-bounce rounded-full bg-muted-foreground/60 [animation-delay:-0.32s]" />
<span className="size-2.5 animate-bounce rounded-full bg-muted-foreground/60 [animation-delay:-0.16s]" />
<span className="size-2.5 animate-bounce rounded-full bg-muted-foreground/60" />
</div>
);
const hasStarted = messages.length > 0;
return (
<div className="flex size-full flex-col">
@ -334,31 +212,102 @@ export default function ChatPage() {
<div className="flex w-full flex-col xl:w-[50%] 3xl:w-[35%]">
{hasStarted ? (
<div className="flex w-full flex-1 flex-col gap-3 pb-3">
{renderList.map((msg, i) => renderMessage(msg, i))}
{streaming &&
!finalShown &&
(streaming.content || streaming.reasoning ? (
<div className="flex flex-col gap-2">
{hasText(streaming.reasoning) && (
{messages.map((msg, i) => {
const isLastAssistant =
i === messages.length - 1 && msg.role === "assistant";
const isComplete =
msg.role === "user" || !isLoading || !isLastAssistant;
const hasToolCalls =
msg.toolCalls && msg.toolCalls.length > 0;
const hasContent = !!msg.content?.trim();
const hasReasoning = !!msg.reasoning?.trim();
const showProcessing =
isLastAssistant &&
isLoading &&
!hasContent &&
!hasReasoning;
// Hide empty placeholder only when there are no tool calls
// and no reasoning streaming yet
if (
isLastAssistant &&
isLoading &&
!hasContent &&
!hasToolCalls &&
!hasReasoning
)
return (
<div
key={i}
className="flex items-center gap-2 self-start rounded-2xl bg-muted px-5 py-4"
>
<span className="size-2.5 animate-bounce rounded-full bg-muted-foreground/60 [animation-delay:-0.32s]" />
<span className="size-2.5 animate-bounce rounded-full bg-muted-foreground/60 [animation-delay:-0.16s]" />
<span className="size-2.5 animate-bounce rounded-full bg-muted-foreground/60" />
</div>
);
return (
<div key={i} className="flex flex-col gap-2">
{msg.role === "assistant" && hasToolCalls && (
<ToolCallsGroup toolCalls={msg.toolCalls!} />
)}
{msg.role === "assistant" && hasReasoning && (
<ReasoningBubble
reasoning={streaming.reasoning}
answerStarted={!!streaming.content}
reasoning={msg.reasoning!}
answerStarted={hasContent}
/>
)}
{streaming.content && (
{showProcessing ? (
<div className="flex items-center gap-2 self-start rounded-2xl bg-muted px-5 py-4">
<span className="size-2 animate-bounce rounded-full bg-muted-foreground/60 [animation-delay:-0.3s]" />
<span className="size-2 animate-bounce rounded-full bg-muted-foreground/60 [animation-delay:-0.15s]" />
<span className="size-2 animate-bounce rounded-full bg-muted-foreground/60" />
</div>
) : msg.role === "assistant" &&
!hasContent &&
hasReasoning &&
!isComplete ? null : (
<MessageBubble
role="assistant"
content={streaming.content}
messageIndex={-1}
isComplete={false}
stats={streaming.stats}
role={msg.role}
content={msg.content}
messageIndex={i}
onEditSubmit={
msg.role === "user" ? handleEditSubmit : undefined
}
isComplete={isComplete}
stats={msg.stats}
showStats={showStats}
/>
)}
{msg.role === "assistant" &&
isComplete &&
(() => {
const similar = getFindSimilarObjectsFromToolCalls(
msg.toolCalls,
);
if (similar) {
return (
<ChatEventThumbnailsRow
events={similar.results}
anchor={similar.anchor}
onAttach={setAttachedEventId}
/>
);
}
const events = getEventIdsFromSearchObjectsToolCalls(
msg.toolCalls,
);
return (
<ChatEventThumbnailsRow
events={events}
onAttach={setAttachedEventId}
/>
);
})()}
</div>
) : (
processingDots
))}
);
})}
{error && (
<p
className="flex items-center gap-1.5 self-start text-sm text-destructive"

View File

@ -1,30 +1,17 @@
export type ToolCallFunction = {
name: string;
arguments: string;
};
export type WireToolCall = {
id: string;
type?: string;
function: ToolCallFunction;
};
export type ChatMessage = {
role: "system" | "user" | "assistant" | "tool";
content: unknown;
tool_call_id?: string;
name?: string;
tool_calls?: WireToolCall[];
reasoning?: string;
stats?: ChatStats;
};
export type ToolCall = {
name: string;
arguments?: Record<string, unknown>;
response?: string;
};
export type ChatMessage = {
role: "user" | "assistant";
content: string;
reasoning?: string;
toolCalls?: ToolCall[];
stats?: ChatStats;
};
export type StartingRequest = {
label: string;
prompt: string;

View File

@ -135,22 +135,3 @@ export type Ffprobe = {
}[];
};
};
export type KeyframeSeverity =
| "ok"
| "warning"
| "error"
| "unknown"
| "record_disabled";
export type KeyframeAnalysis = {
severity: KeyframeSeverity;
stream_index?: number;
keyframe_count?: number;
max_gap?: number | null;
mean_gap?: number | null;
min_gap?: number | null;
duration_observed?: number | null;
segment_time?: number;
thresholds?: { warning: number; error: number };
};

View File

@ -1,20 +1,16 @@
import type { ChatMessage, ChatStats, ToolCall } from "@/types/chat";
export type StreamChatCallbacks = {
/** Streamed delta of the assistant's final answer text. */
onContentDelta: (delta: string) => void;
/** Streamed delta of the assistant's reasoning trace. */
onReasoningDelta: (delta: string) => void;
/** The full conversation chain so far (system message, history, this turn's
* tool-call turns, tool results, and on the final emission the final
* assistant message). */
onChain: (chain: ChatMessage[]) => void;
/** Token/timing stats for the turn. */
onStats: (stats: ChatStats) => void;
/** Update the messages array (e.g. pass to setState). */
updateMessages: (updater: (prev: ChatMessage[]) => ChatMessage[]) => void;
/** Called when the stream sends an error or fetch fails. */
onError: (message: string) => void;
/** Called when the stream finishes (success or error). */
onDone: () => void;
/** Called when the stream emits token/timing stats. The stats are also
* attached to the last assistant message in updateMessages, so consumers
* can usually rely on the message itself rather than wiring this up. */
onStats?: (stats: ChatStats) => void;
/** Message used when fetch throws and no server error is available. */
defaultErrorMessage?: string;
};
@ -29,7 +25,7 @@ type StatsChunk = {
type StreamChunk =
| { type: "error"; error: string }
| { type: "messages"; messages: ChatMessage[] }
| { type: "tool_calls"; tool_calls: ToolCall[] }
| { type: "content"; delta: string }
| { type: "reasoning"; delta: string }
| StatsChunk;
@ -45,18 +41,16 @@ export type StreamChatOptions = {
export async function streamChatCompletion(
url: string,
headers: Record<string, string>,
apiMessages: ChatMessage[],
apiMessages: { role: string; content: string }[],
callbacks: StreamChatCallbacks,
signal?: AbortSignal,
options: StreamChatOptions = {},
): Promise<void> {
const {
onContentDelta,
onReasoningDelta,
onChain,
onStats,
updateMessages,
onError,
onDone,
onStats,
defaultErrorMessage = "Something went wrong. Please try again.",
} = callbacks;
@ -97,27 +91,65 @@ export async function streamChatCompletion(
const applyChunk = (data: StreamChunk) => {
if (data.type === "error") {
onError(data.error);
updateMessages((prev) =>
prev.filter((m) => !(m.role === "assistant" && m.content === "")),
);
return "break";
}
if (data.type === "messages") {
onChain(data.messages ?? []);
if (data.type === "tool_calls" && data.tool_calls?.length) {
updateMessages((prev) => {
const next = [...prev];
const lastMsg = next[next.length - 1];
if (lastMsg?.role === "assistant")
next[next.length - 1] = {
...lastMsg,
toolCalls: data.tool_calls,
};
return next;
});
return "continue";
}
if (data.type === "content" && data.delta !== undefined) {
onContentDelta(data.delta);
updateMessages((prev) => {
const next = [...prev];
const lastMsg = next[next.length - 1];
if (lastMsg?.role === "assistant")
next[next.length - 1] = {
...lastMsg,
content: lastMsg.content + data.delta,
};
return next;
});
return "continue";
}
if (data.type === "reasoning" && data.delta !== undefined) {
onReasoningDelta(data.delta);
updateMessages((prev) => {
const next = [...prev];
const lastMsg = next[next.length - 1];
if (lastMsg?.role === "assistant")
next[next.length - 1] = {
...lastMsg,
reasoning: (lastMsg.reasoning ?? "") + data.delta,
};
return next;
});
return "continue";
}
if (data.type === "stats") {
onStats({
const stats: ChatStats = {
promptTokens: data.prompt_tokens,
completionTokens: data.completion_tokens,
completionDurationMs: data.completion_duration_ms,
tokensPerSecond: data.tokens_per_second,
};
updateMessages((prev) => {
const next = [...prev];
const lastMsg = next[next.length - 1];
if (lastMsg?.role === "assistant")
next[next.length - 1] = { ...lastMsg, stats };
return next;
});
onStats?.(stats);
return "continue";
}
return "continue";
@ -133,8 +165,9 @@ export async function streamChatCompletion(
const trimmed = line.trim();
if (!trimmed) continue;
try {
const data = JSON.parse(trimmed) as StreamChunk;
if (applyChunk(data) === "break") {
const data = JSON.parse(trimmed) as StreamChunk & { type: string };
const result = applyChunk(data as StreamChunk);
if (result === "break") {
hadStreamError = true;
break;
}
@ -148,63 +181,50 @@ export async function streamChatCompletion(
// Flush remaining buffer
if (!hadStreamError && buffer.trim()) {
try {
const data = JSON.parse(buffer.trim()) as StreamChunk;
applyChunk(data);
const data = JSON.parse(buffer.trim()) as StreamChunk & {
type: string;
delta?: string;
};
if (data.type === "content" && data.delta !== undefined) {
updateMessages((prev) => {
const next = [...prev];
const lastMsg = next[next.length - 1];
if (lastMsg?.role === "assistant")
next[next.length - 1] = {
...lastMsg,
content: lastMsg.content + data.delta!,
};
return next;
});
}
} catch {
// ignore final malformed chunk
}
}
if (!hadStreamError) {
updateMessages((prev) => {
const next = [...prev];
const lastMsg = next[next.length - 1];
if (lastMsg?.role === "assistant" && lastMsg.content === "")
next[next.length - 1] = { ...lastMsg, content: " " };
return next;
});
}
} catch (err) {
if (err instanceof DOMException && err.name === "AbortError") {
// User stopped generation — not an error
} else {
onError(defaultErrorMessage);
updateMessages((prev) =>
prev.filter((m) => !(m.role === "assistant" && m.content === "")),
);
}
} finally {
onDone();
}
}
/** Map each tool result message to its tool_call_id for response lookup. */
export function toolResponsesById(
messages: ChatMessage[],
): Map<string, string> {
const map = new Map<string, string>();
for (const m of messages) {
if (m.role === "tool" && typeof m.tool_call_id === "string") {
map.set(
m.tool_call_id,
typeof m.content === "string" ? m.content : JSON.stringify(m.content),
);
}
}
return map;
}
/** Derive the display tool calls for one assistant message. */
export function toolCallsForMessage(
message: ChatMessage,
responses: Map<string, string>,
): ToolCall[] {
if (!message.tool_calls?.length) return [];
return message.tool_calls.map((tc) => {
let args: Record<string, unknown> | undefined;
const raw = tc.function?.arguments;
if (typeof raw === "string") {
try {
args = JSON.parse(raw) as Record<string, unknown>;
} catch {
args = undefined;
}
}
return {
name: tc.function?.name ?? "",
arguments: args,
response: responses.get(tc.id),
};
});
}
/**
* Parse search_objects tool call response(s) into event ids for thumbnails.
*/