diff --git a/web/src/api/WsProvider.tsx b/web/src/api/WsProvider.tsx new file mode 100644 index 000000000..0bab68b0c --- /dev/null +++ b/web/src/api/WsProvider.tsx @@ -0,0 +1,81 @@ +import { baseUrl } from "./baseUrl"; +import { ReactNode, useCallback, useEffect, useRef } from "react"; +import { WsSendContext } from "./wsContext"; +import type { Update } from "./wsContext"; +import { bufferRawMessage, resetWsStore } from "./ws"; + +export function WsProvider({ children }: { children: ReactNode }) { + const wsUrl = `${baseUrl.replace(/^http/, "ws")}ws`; + const wsRef = useRef(null); + const reconnectTimer = useRef | null>(null); + const reconnectAttempt = useRef(0); + const unmounted = useRef(false); + + const sendJsonMessage = useCallback((msg: unknown) => { + if (wsRef.current?.readyState === WebSocket.OPEN) { + wsRef.current.send(JSON.stringify(msg)); + } + }, []); + + useEffect(() => { + unmounted.current = false; + + function connect() { + if (unmounted.current) return; + + const ws = new WebSocket(wsUrl); + wsRef.current = ws; + + ws.onopen = () => { + reconnectAttempt.current = 0; + ws.send( + JSON.stringify({ topic: "onConnect", message: "", retain: false }), + ); + }; + + // Near-zero-cost handler: just buffer the raw string. + // All JSON.parse + state updates happen in a single rAF in ws.ts, + // giving React uninterrupted CPU time between frames. + ws.onmessage = (event: MessageEvent) => { + bufferRawMessage(event.data as string); + }; + + ws.onclose = () => { + if (unmounted.current) return; + const delay = Math.min(1000 * 2 ** reconnectAttempt.current, 30000); + reconnectAttempt.current++; + reconnectTimer.current = setTimeout(connect, delay); + }; + + ws.onerror = () => { + ws.close(); + }; + } + + connect(); + + return () => { + unmounted.current = true; + if (reconnectTimer.current) { + clearTimeout(reconnectTimer.current); + } + wsRef.current?.close(); + resetWsStore(); + }; + }, [wsUrl]); + + const send = useCallback( + (message: Update) => { + sendJsonMessage({ + topic: message.topic, + payload: message.payload, + retain: message.retain, + }); + }, + [sendJsonMessage], + ); + + return ( + {children} + ); +} diff --git a/web/src/api/index.tsx b/web/src/api/index.tsx index e5c5617ab..41cb7d24c 100644 --- a/web/src/api/index.tsx +++ b/web/src/api/index.tsx @@ -1,6 +1,6 @@ import { baseUrl } from "./baseUrl"; import { SWRConfig } from "swr"; -import { WsProvider } from "./ws"; +import { WsProvider } from "./WsProvider"; import axios from "axios"; import { ReactNode } from "react"; import { isRedirectingToLogin, setRedirectingToLogin } from "./auth-redirect"; diff --git a/web/src/api/ws.tsx b/web/src/api/ws.ts similarity index 57% rename from web/src/api/ws.tsx rename to web/src/api/ws.ts index 07d44d67a..d198eda4c 100644 --- a/web/src/api/ws.tsx +++ b/web/src/api/ws.ts @@ -1,6 +1,11 @@ -import { baseUrl } from "./baseUrl"; -import { useCallback, useEffect, useRef, useState } from "react"; -import useWebSocket, { ReadyState } from "react-use-websocket"; +import { + useCallback, + useContext, + useEffect, + useMemo, + useRef, + useSyncExternalStore, +} from "react"; import { EmbeddingsReindexProgressType, FrigateCameraState, @@ -14,8 +19,11 @@ import { Job, } from "@/types/ws"; import { FrigateStats } from "@/types/stats"; -import { createContainer } from "react-tracked"; -import useDeepMemo from "@/hooks/use-deep-memo"; +import { isEqual } from "lodash"; +import { WsSendContext } from "./wsContext"; +import type { Update, WsSend } from "./wsContext"; + +export type { Update }; export type WsFeedMessage = { topic: string; @@ -24,170 +32,277 @@ export type WsFeedMessage = { id: string; }; -type Update = { - topic: string; - payload: unknown; - retain: boolean; -}; - type WsState = { [topic: string]: unknown; }; -type useValueReturn = [WsState, (update: Update) => void]; +// --------------------------------------------------------------------------- +// External store for WebSocket state using useSyncExternalStore +// +// Per-topic subscriptions ensure only subscribers of a changed topic are +// notified, avoiding O(messages × total_subscribers) snapshot checks. +// +// Updates are batched via requestAnimationFrame so a burst of WS messages +// (e.g. on initial connect) results in a single React render pass. +// --------------------------------------------------------------------------- + +type Listener = () => void; + +const wsState: WsState = {}; +const wsTopicListeners = new Map>(); + +// --- rAF drain + flush --- +// +// The onmessage handler in WsProvider pushes raw event.data strings into +// rawMessageBuffer (sub-microsecond cost). A single requestAnimationFrame +// callback drains the buffer, JSON-parses each message, applies state updates, +// and notifies React subscribers — all in one burst per frame. This gives +// React uninterrupted CPU time between frames. + +const rawMessageBuffer: string[] = []; +let pendingUpdates: Record | null = null; +let pendingFeedMessages: WsFeedMessage[] = []; +let flushScheduled = false; + +/** + * Reset all module-level state. Called on WsProvider unmount to prevent + * stale data from leaking across mount/unmount cycles (e.g. HMR, logout). + */ +export function resetWsStore() { + for (const key of Object.keys(wsState)) { + delete wsState[key]; + } + wsTopicListeners.clear(); + rawMessageBuffer.length = 0; + pendingUpdates = null; + pendingFeedMessages = []; + flushScheduled = false; + lastCameraActivityPayload = null; + wsMessageSubscribers.clear(); + wsMessageIdCounter = 0; +} + +export function bufferRawMessage(data: string) { + rawMessageBuffer.push(data); + if (!flushScheduled) { + flushScheduled = true; + requestAnimationFrame(drainAndFlush); + } +} + +function drainAndFlush() { + flushScheduled = false; + + // 1. Drain raw buffer → ingest messages + if (rawMessageBuffer.length > 0) { + const batch = rawMessageBuffer.splice(0); + for (const raw of batch) { + const data: Update = JSON.parse(raw); + if (data) { + ingestMessage(data); + } + } + } + + // 2. Flush state updates to React + if (pendingUpdates) { + const updates = pendingUpdates; + pendingUpdates = null; + + for (const [topic, newVal] of Object.entries(updates)) { + const oldVal = wsState[topic]; + // Fast path: === for primitives ("ON"/"OFF", numbers). + // Fall back to isEqual for objects/arrays. + const unchanged = + oldVal === newVal || + (typeof newVal === "object" && + newVal !== null && + isEqual(oldVal, newVal)); + if (!unchanged) { + wsState[topic] = newVal; + // Snapshot the Set — a listener may trigger unmount that modifies it. + const listeners = wsTopicListeners.get(topic); + if (listeners) { + for (const l of Array.from(listeners)) l(); + } + } + } + } + + // 3. Deliver feed messages + if (pendingFeedMessages.length > 0 && wsMessageSubscribers.size > 0) { + const msgs = pendingFeedMessages; + pendingFeedMessages = []; + for (const msg of msgs) { + wsMessageSubscribers.forEach((cb) => cb(msg)); + } + } else { + pendingFeedMessages = []; + } +} + +function ingestMessage(data: Update) { + if (!pendingUpdates) pendingUpdates = {}; + pendingUpdates[data.topic] = data.payload; + + if (data.topic === "camera_activity") { + expandCameraActivity(data.payload as string, pendingUpdates); + } + + if (wsMessageSubscribers.size > 0) { + pendingFeedMessages.push({ + topic: data.topic, + payload: data.payload, + timestamp: Date.now(), + id: String(wsMessageIdCounter++), + }); + } +} + +// --- Subscriptions --- + +export function subscribeWsTopic( + topic: string, + listener: Listener, +): () => void { + let set = wsTopicListeners.get(topic); + if (!set) { + set = new Set(); + wsTopicListeners.set(topic, set); + } + set.add(listener); + return () => { + set!.delete(listener); + if (set!.size === 0) wsTopicListeners.delete(topic); + }; +} + +export function getWsTopicValue(topic: string): unknown { + return wsState[topic]; +} + +// --------------------------------------------------------------------------- +// Feed message subscribers +// --------------------------------------------------------------------------- const wsMessageSubscribers = new Set<(msg: WsFeedMessage) => void>(); let wsMessageIdCounter = 0; -function useValue(): useValueReturn { - const wsUrl = `${baseUrl.replace(/^http/, "ws")}ws`; +// --------------------------------------------------------------------------- +// Camera activity expansion +// --------------------------------------------------------------------------- - // main state +// Cache the last raw camera_activity JSON string so we can skip JSON.parse +// and the entire expansion when nothing has changed. This avoids creating +// fresh objects (which defeat Object.is and force expensive isEqual deep +// traversals) on every flush — critical with many cameras. +let lastCameraActivityPayload: string | null = null; - const [wsState, setWsState] = useState({}); +function expandCameraActivity( + payload: string, + updates: Record, +) { + // Fast path: if the raw JSON string is identical, nothing changed. + if (payload === lastCameraActivityPayload) { + // Remove camera_activity from pending updates so flushPendingUpdates + // doesn't run isEqual on a freshly-parsed object that's identical. + delete updates["camera_activity"]; + return; + } + lastCameraActivityPayload = payload; - useEffect(() => { - const activityValue: string = wsState["camera_activity"] as string; + let activity: { [key: string]: Partial }; - if (!activityValue) { - return; - } + try { + activity = JSON.parse(payload); + } catch { + return; + } - let cameraActivity: { [key: string]: Partial }; + // Remove the root topic — no component reads the monolithic object. + // Only per-camera subtopics and per-setting primitives are stored. + delete updates["camera_activity"]; - try { - cameraActivity = JSON.parse(activityValue); - } catch { - return; - } + if (Object.keys(activity).length === 0) return; - if (Object.keys(cameraActivity).length === 0) { - return; - } + for (const [name, state] of Object.entries(activity)) { + // Notify per-camera subtopic specifically + updates[`camera_activity/${name}`] = state; - const cameraStates: WsState = {}; + const cameraConfig = state?.config; + if (!cameraConfig) continue; - Object.entries(cameraActivity).forEach(([name, state]) => { - const cameraConfig = state?.config; + const { + record, + detect, + enabled, + snapshots, + audio, + audio_transcription, + notifications, + notifications_suspended, + autotracking, + alerts, + detections, + object_descriptions, + review_descriptions, + } = cameraConfig; - if (!cameraConfig) { - return; - } - - const { - record, - detect, - enabled, - snapshots, - audio, - audio_transcription, - notifications, - notifications_suspended, - autotracking, - alerts, - detections, - object_descriptions, - review_descriptions, - } = cameraConfig; - cameraStates[`${name}/recordings/state`] = record ? "ON" : "OFF"; - cameraStates[`${name}/enabled/state`] = enabled ? "ON" : "OFF"; - cameraStates[`${name}/detect/state`] = detect ? "ON" : "OFF"; - cameraStates[`${name}/snapshots/state`] = snapshots ? "ON" : "OFF"; - cameraStates[`${name}/audio/state`] = audio ? "ON" : "OFF"; - cameraStates[`${name}/audio_transcription/state`] = audio_transcription - ? "ON" - : "OFF"; - cameraStates[`${name}/notifications/state`] = notifications - ? "ON" - : "OFF"; - cameraStates[`${name}/notifications/suspended`] = - notifications_suspended || 0; - cameraStates[`${name}/ptz_autotracker/state`] = autotracking - ? "ON" - : "OFF"; - cameraStates[`${name}/review_alerts/state`] = alerts ? "ON" : "OFF"; - cameraStates[`${name}/review_detections/state`] = detections - ? "ON" - : "OFF"; - cameraStates[`${name}/object_descriptions/state`] = object_descriptions - ? "ON" - : "OFF"; - cameraStates[`${name}/review_descriptions/state`] = review_descriptions - ? "ON" - : "OFF"; - }); - - setWsState((prevState) => ({ - ...prevState, - ...cameraStates, - })); - - // we only want this to run initially when the config is loaded - // eslint-disable-next-line react-hooks/exhaustive-deps - }, [wsState["camera_activity"]]); - - // ws handler - const { sendJsonMessage, readyState } = useWebSocket(wsUrl, { - onMessage: (event) => { - const data: Update = JSON.parse(event.data); - - if (data) { - setWsState((prevState) => ({ - ...prevState, - [data.topic]: data.payload, - })); - - // Notify feed subscribers - if (wsMessageSubscribers.size > 0) { - const feedMsg: WsFeedMessage = { - topic: data.topic, - payload: data.payload, - timestamp: Date.now(), - id: String(wsMessageIdCounter++), - }; - wsMessageSubscribers.forEach((cb) => cb(feedMsg)); - } - } - }, - onOpen: () => { - sendJsonMessage({ - topic: "onConnect", - message: "", - retain: false, - }); - }, - onClose: () => {}, - shouldReconnect: () => true, - retryOnError: true, - }); - - const setState = useCallback( - (message: Update) => { - if (readyState === ReadyState.OPEN) { - sendJsonMessage({ - topic: message.topic, - payload: message.payload, - retain: message.retain, - }); - } - }, - [readyState, sendJsonMessage], - ); - - return [wsState, setState]; + updates[`${name}/recordings/state`] = record ? "ON" : "OFF"; + updates[`${name}/enabled/state`] = enabled ? "ON" : "OFF"; + updates[`${name}/detect/state`] = detect ? "ON" : "OFF"; + updates[`${name}/snapshots/state`] = snapshots ? "ON" : "OFF"; + updates[`${name}/audio/state`] = audio ? "ON" : "OFF"; + updates[`${name}/audio_transcription/state`] = audio_transcription + ? "ON" + : "OFF"; + updates[`${name}/notifications/state`] = notifications ? "ON" : "OFF"; + updates[`${name}/notifications/suspended`] = notifications_suspended || 0; + updates[`${name}/ptz_autotracker/state`] = autotracking ? "ON" : "OFF"; + updates[`${name}/review_alerts/state`] = alerts ? "ON" : "OFF"; + updates[`${name}/review_detections/state`] = detections ? "ON" : "OFF"; + updates[`${name}/object_descriptions/state`] = object_descriptions + ? "ON" + : "OFF"; + updates[`${name}/review_descriptions/state`] = review_descriptions + ? "ON" + : "OFF"; + } } -export const { - Provider: WsProvider, - useTrackedState: useWsState, - useUpdate: useWsUpdate, -} = createContainer(useValue, { defaultState: {}, concurrentMode: true }); +// --------------------------------------------------------------------------- +// Hooks +// --------------------------------------------------------------------------- +/** + * Get the send function for publishing WS messages. + */ +export function useWsUpdate(): WsSend { + const send = useContext(WsSendContext); + if (!send) { + throw new Error("useWsUpdate must be used within WsProvider"); + } + return send; +} + +/** + * Subscribe to a single WS topic with proper bail-out. + * Only re-renders when the topic's value changes (Object.is comparison). + * Uses useSyncExternalStore — zero useEffect, so no PassiveMask flags + * propagate through the fiber tree. + */ export function useWs(watchTopic: string, publishTopic: string) { - const state = useWsState(); + const payload = useSyncExternalStore( + useCallback( + (listener: Listener) => subscribeWsTopic(watchTopic, listener), + [watchTopic], + ), + useCallback(() => wsState[watchTopic], [watchTopic]), + ); + const sendJsonMessage = useWsUpdate(); - const value = { payload: state[watchTopic] || null }; + const value = { payload: payload ?? null }; const send = useCallback( (payload: unknown, retain = false) => { @@ -203,6 +318,10 @@ export function useWs(watchTopic: string, publishTopic: string) { return { value, send }; } +// --------------------------------------------------------------------------- +// Convenience hooks +// --------------------------------------------------------------------------- + export function useEnabledState(camera: string): { payload: ToggleableSetting; send: (payload: ToggleableSetting, retain?: boolean) => void; @@ -413,28 +532,42 @@ export function useFrigateEvents(): { payload: FrigateEvent } { const { value: { payload }, } = useWs("events", ""); - return { payload: JSON.parse(payload as string) }; + const parsed = useMemo( + () => (payload ? JSON.parse(payload as string) : undefined), + [payload], + ); + return { payload: parsed }; } export function useAudioDetections(): { payload: FrigateAudioDetections } { const { value: { payload }, } = useWs("audio_detections", ""); - return { payload: JSON.parse(payload as string) }; + const parsed = useMemo( + () => (payload ? JSON.parse(payload as string) : undefined), + [payload], + ); + return { payload: parsed }; } export function useFrigateReviews(): FrigateReview { const { value: { payload }, } = useWs("reviews", ""); - return useDeepMemo(JSON.parse(payload as string)); + return useMemo( + () => (payload ? JSON.parse(payload as string) : undefined), + [payload], + ); } export function useFrigateStats(): FrigateStats { const { value: { payload }, } = useWs("stats", ""); - return useDeepMemo(JSON.parse(payload as string)); + return useMemo( + () => (payload ? JSON.parse(payload as string) : undefined), + [payload], + ); } export function useInitialCameraState( @@ -446,32 +579,32 @@ export function useInitialCameraState( const { value: { payload }, send: sendCommand, - } = useWs("camera_activity", "onConnect"); + } = useWs(`camera_activity/${camera}`, "onConnect"); - const data = useDeepMemo(JSON.parse(payload as string)); + // camera_activity sub-topic payload is already parsed by expandCameraActivity + const data = payload as FrigateCameraState | undefined; + // onConnect is sent once in WsProvider.onopen — no need to re-request on + // every component mount. Components read cached wsState immediately via + // useSyncExternalStore. Only re-request when the user tabs back in. useEffect(() => { - let listener = undefined; - if (revalidateOnFocus) { - sendCommand("onConnect"); - listener = () => { - if (document.visibilityState == "visible") { - sendCommand("onConnect"); - } - }; - addEventListener("visibilitychange", listener); - } + sendCommand("onConnect"); + if (!revalidateOnFocus) return; - return () => { - if (listener) { - removeEventListener("visibilitychange", listener); + const listener = () => { + if (document.visibilityState === "visible") { + sendCommand("onConnect"); } }; - // only refresh when onRefresh value changes + addEventListener("visibilitychange", listener); + + return () => { + removeEventListener("visibilitychange", listener); + }; // eslint-disable-next-line react-hooks/exhaustive-deps }, [revalidateOnFocus]); - return { payload: data ? data[camera] : undefined }; + return { payload: data as FrigateCameraState }; } export function useModelState( @@ -483,7 +616,10 @@ export function useModelState( send: sendCommand, } = useWs("model_state", "modelState"); - const data = useDeepMemo(JSON.parse(payload as string)); + const data = useMemo( + () => (payload ? JSON.parse(payload as string) : undefined), + [payload], + ); useEffect(() => { let listener = undefined; @@ -519,7 +655,10 @@ export function useEmbeddingsReindexProgress( send: sendCommand, } = useWs("embeddings_reindex_progress", "embeddingsReindexProgress"); - const data = useDeepMemo(JSON.parse(payload as string)); + const data = useMemo( + () => (payload ? JSON.parse(payload as string) : undefined), + [payload], + ); useEffect(() => { let listener = undefined; @@ -553,8 +692,9 @@ export function useAudioTranscriptionProcessState( send: sendCommand, } = useWs("audio_transcription_state", "audioTranscriptionState"); - const data = useDeepMemo( - payload ? (JSON.parse(payload as string) as string) : "idle", + const data = useMemo( + () => (payload ? (JSON.parse(payload as string) as string) : "idle"), + [payload], ); useEffect(() => { @@ -587,7 +727,10 @@ export function useBirdseyeLayout(revalidateOnFocus: boolean = true): { send: sendCommand, } = useWs("birdseye_layout", "birdseyeLayout"); - const data = useDeepMemo(JSON.parse(payload as string)); + const data = useMemo( + () => (payload ? JSON.parse(payload as string) : undefined), + [payload], + ); useEffect(() => { let listener = undefined; @@ -684,10 +827,14 @@ export function useTrackedObjectUpdate(): { const { value: { payload }, } = useWs("tracked_object_update", ""); - const parsed = payload - ? JSON.parse(payload as string) - : { type: "", id: "", camera: "" }; - return { payload: useDeepMemo(parsed) }; + const parsed = useMemo( + () => + payload + ? JSON.parse(payload as string) + : { type: "", id: "", camera: "" }, + [payload], + ); + return { payload: parsed }; } export function useNotifications(camera: string): { @@ -730,10 +877,14 @@ export function useTriggers(): { payload: TriggerStatus } { const { value: { payload }, } = useWs("triggers", ""); - const parsed = payload - ? JSON.parse(payload as string) - : { name: "", camera: "", event_id: "", type: "", score: 0 }; - return { payload: useDeepMemo(parsed) }; + const parsed = useMemo( + () => + payload + ? JSON.parse(payload as string) + : { name: "", camera: "", event_id: "", type: "", score: 0 }, + [payload], + ); + return { payload: parsed }; } export function useJobStatus( @@ -745,8 +896,9 @@ export function useJobStatus( send: sendCommand, } = useWs("job_state", "jobState"); - const jobData = useDeepMemo( - payload && typeof payload === "string" ? JSON.parse(payload) : {}, + const jobData = useMemo( + () => (payload && typeof payload === "string" ? JSON.parse(payload) : {}), + [payload], ); const currentJob = jobData[jobType] || null; diff --git a/web/src/api/wsContext.ts b/web/src/api/wsContext.ts new file mode 100644 index 000000000..15fee8ffa --- /dev/null +++ b/web/src/api/wsContext.ts @@ -0,0 +1,11 @@ +import { createContext } from "react"; + +export type Update = { + topic: string; + payload: unknown; + retain: boolean; +}; + +export type WsSend = (update: Update) => void; + +export const WsSendContext = createContext(null);