diff --git a/web/src/api/WsProvider.tsx b/web/src/api/WsProvider.tsx index 0bab68b0c..4e4f72490 100644 --- a/web/src/api/WsProvider.tsx +++ b/web/src/api/WsProvider.tsx @@ -2,7 +2,7 @@ import { baseUrl } from "./baseUrl"; import { ReactNode, useCallback, useEffect, useRef } from "react"; import { WsSendContext } from "./wsContext"; import type { Update } from "./wsContext"; -import { bufferRawMessage, resetWsStore } from "./ws"; +import { processWsMessage, resetWsStore } from "./ws"; export function WsProvider({ children }: { children: ReactNode }) { const wsUrl = `${baseUrl.replace(/^http/, "ws")}ws`; @@ -33,11 +33,8 @@ export function WsProvider({ children }: { children: ReactNode }) { ); }; - // Near-zero-cost handler: just buffer the raw string. - // All JSON.parse + state updates happen in a single rAF in ws.ts, - // giving React uninterrupted CPU time between frames. ws.onmessage = (event: MessageEvent) => { - bufferRawMessage(event.data as string); + processWsMessage(event.data as string); }; ws.onclose = () => { diff --git a/web/src/api/ws.ts b/web/src/api/ws.ts index d198eda4c..1e4806eba 100644 --- a/web/src/api/ws.ts +++ b/web/src/api/ws.ts @@ -36,34 +36,12 @@ type WsState = { [topic: string]: unknown; }; -// --------------------------------------------------------------------------- // External store for WebSocket state using useSyncExternalStore -// -// Per-topic subscriptions ensure only subscribers of a changed topic are -// notified, avoiding O(messages × total_subscribers) snapshot checks. -// -// Updates are batched via requestAnimationFrame so a burst of WS messages -// (e.g. on initial connect) results in a single React render pass. -// --------------------------------------------------------------------------- - type Listener = () => void; const wsState: WsState = {}; const wsTopicListeners = new Map>(); -// --- rAF drain + flush --- -// -// The onmessage handler in WsProvider pushes raw event.data strings into -// rawMessageBuffer (sub-microsecond cost). A single requestAnimationFrame -// callback drains the buffer, JSON-parses each message, applies state updates, -// and notifies React subscribers — all in one burst per frame. This gives -// React uninterrupted CPU time between frames. - -const rawMessageBuffer: string[] = []; -let pendingUpdates: Record | null = null; -let pendingFeedMessages: WsFeedMessage[] = []; -let flushScheduled = false; - /** * Reset all module-level state. Called on WsProvider unmount to prevent * stale data from leaking across mount/unmount cycles (e.g. HMR, logout). @@ -73,89 +51,53 @@ export function resetWsStore() { delete wsState[key]; } wsTopicListeners.clear(); - rawMessageBuffer.length = 0; - pendingUpdates = null; - pendingFeedMessages = []; - flushScheduled = false; lastCameraActivityPayload = null; wsMessageSubscribers.clear(); wsMessageIdCounter = 0; } -export function bufferRawMessage(data: string) { - rawMessageBuffer.push(data); - if (!flushScheduled) { - flushScheduled = true; - requestAnimationFrame(drainAndFlush); - } -} +/** + * Parse and apply a raw WS message synchronously. + * Called directly from WsProvider's onmessage handler. + */ +export function processWsMessage(raw: string) { + const data: Update = JSON.parse(raw); + if (!data) return; -function drainAndFlush() { - flushScheduled = false; + const { topic, payload } = data; - // 1. Drain raw buffer → ingest messages - if (rawMessageBuffer.length > 0) { - const batch = rawMessageBuffer.splice(0); - for (const raw of batch) { - const data: Update = JSON.parse(raw); - if (data) { - ingestMessage(data); - } - } - } - - // 2. Flush state updates to React - if (pendingUpdates) { - const updates = pendingUpdates; - pendingUpdates = null; - - for (const [topic, newVal] of Object.entries(updates)) { - const oldVal = wsState[topic]; - // Fast path: === for primitives ("ON"/"OFF", numbers). - // Fall back to isEqual for objects/arrays. - const unchanged = - oldVal === newVal || - (typeof newVal === "object" && - newVal !== null && - isEqual(oldVal, newVal)); - if (!unchanged) { - wsState[topic] = newVal; - // Snapshot the Set — a listener may trigger unmount that modifies it. - const listeners = wsTopicListeners.get(topic); - if (listeners) { - for (const l of Array.from(listeners)) l(); - } - } - } - } - - // 3. Deliver feed messages - if (pendingFeedMessages.length > 0 && wsMessageSubscribers.size > 0) { - const msgs = pendingFeedMessages; - pendingFeedMessages = []; - for (const msg of msgs) { - wsMessageSubscribers.forEach((cb) => cb(msg)); - } + if (topic === "camera_activity") { + applyCameraActivity(payload as string); } else { - pendingFeedMessages = []; - } -} - -function ingestMessage(data: Update) { - if (!pendingUpdates) pendingUpdates = {}; - pendingUpdates[data.topic] = data.payload; - - if (data.topic === "camera_activity") { - expandCameraActivity(data.payload as string, pendingUpdates); + applyTopicUpdate(topic, payload); } if (wsMessageSubscribers.size > 0) { - pendingFeedMessages.push({ - topic: data.topic, - payload: data.payload, - timestamp: Date.now(), - id: String(wsMessageIdCounter++), - }); + wsMessageSubscribers.forEach((cb) => + cb({ + topic, + payload, + timestamp: Date.now(), + id: String(wsMessageIdCounter++), + }), + ); + } +} + +function applyTopicUpdate(topic: string, newVal: unknown) { + const oldVal = wsState[topic]; + // Fast path: === for primitives ("ON"/"OFF", numbers). + // Fall back to isEqual for objects/arrays. + const unchanged = + oldVal === newVal || + (typeof newVal === "object" && newVal !== null && isEqual(oldVal, newVal)); + if (unchanged) return; + + wsState[topic] = newVal; + // Snapshot the Set — a listener may trigger unmount that modifies it. + const listeners = wsTopicListeners.get(topic); + if (listeners) { + for (const l of Array.from(listeners)) l(); } } @@ -198,17 +140,9 @@ let wsMessageIdCounter = 0; // traversals) on every flush — critical with many cameras. let lastCameraActivityPayload: string | null = null; -function expandCameraActivity( - payload: string, - updates: Record, -) { +function applyCameraActivity(payload: string) { // Fast path: if the raw JSON string is identical, nothing changed. - if (payload === lastCameraActivityPayload) { - // Remove camera_activity from pending updates so flushPendingUpdates - // doesn't run isEqual on a freshly-parsed object that's identical. - delete updates["camera_activity"]; - return; - } + if (payload === lastCameraActivityPayload) return; lastCameraActivityPayload = payload; let activity: { [key: string]: Partial }; @@ -219,15 +153,10 @@ function expandCameraActivity( return; } - // Remove the root topic — no component reads the monolithic object. - // Only per-camera subtopics and per-setting primitives are stored. - delete updates["camera_activity"]; - if (Object.keys(activity).length === 0) return; for (const [name, state] of Object.entries(activity)) { - // Notify per-camera subtopic specifically - updates[`camera_activity/${name}`] = state; + applyTopicUpdate(`camera_activity/${name}`, state); const cameraConfig = state?.config; if (!cameraConfig) continue; @@ -248,25 +177,40 @@ function expandCameraActivity( review_descriptions, } = cameraConfig; - updates[`${name}/recordings/state`] = record ? "ON" : "OFF"; - updates[`${name}/enabled/state`] = enabled ? "ON" : "OFF"; - updates[`${name}/detect/state`] = detect ? "ON" : "OFF"; - updates[`${name}/snapshots/state`] = snapshots ? "ON" : "OFF"; - updates[`${name}/audio/state`] = audio ? "ON" : "OFF"; - updates[`${name}/audio_transcription/state`] = audio_transcription - ? "ON" - : "OFF"; - updates[`${name}/notifications/state`] = notifications ? "ON" : "OFF"; - updates[`${name}/notifications/suspended`] = notifications_suspended || 0; - updates[`${name}/ptz_autotracker/state`] = autotracking ? "ON" : "OFF"; - updates[`${name}/review_alerts/state`] = alerts ? "ON" : "OFF"; - updates[`${name}/review_detections/state`] = detections ? "ON" : "OFF"; - updates[`${name}/object_descriptions/state`] = object_descriptions - ? "ON" - : "OFF"; - updates[`${name}/review_descriptions/state`] = review_descriptions - ? "ON" - : "OFF"; + applyTopicUpdate(`${name}/recordings/state`, record ? "ON" : "OFF"); + applyTopicUpdate(`${name}/enabled/state`, enabled ? "ON" : "OFF"); + applyTopicUpdate(`${name}/detect/state`, detect ? "ON" : "OFF"); + applyTopicUpdate(`${name}/snapshots/state`, snapshots ? "ON" : "OFF"); + applyTopicUpdate(`${name}/audio/state`, audio ? "ON" : "OFF"); + applyTopicUpdate( + `${name}/audio_transcription/state`, + audio_transcription ? "ON" : "OFF", + ); + applyTopicUpdate( + `${name}/notifications/state`, + notifications ? "ON" : "OFF", + ); + applyTopicUpdate( + `${name}/notifications/suspended`, + notifications_suspended || 0, + ); + applyTopicUpdate( + `${name}/ptz_autotracker/state`, + autotracking ? "ON" : "OFF", + ); + applyTopicUpdate(`${name}/review_alerts/state`, alerts ? "ON" : "OFF"); + applyTopicUpdate( + `${name}/review_detections/state`, + detections ? "ON" : "OFF", + ); + applyTopicUpdate( + `${name}/object_descriptions/state`, + object_descriptions ? "ON" : "OFF", + ); + applyTopicUpdate( + `${name}/review_descriptions/state`, + review_descriptions ? "ON" : "OFF", + ); } }