refactor websockets to remove react-tracked

react 19 removed useReducer eager bailout, which broke react-tracked.

react-tracked works by wrapping state in a JavaScript Proxy. When a component reads state.someField, the proxy records that access. On the next state update, it compares only the fields each component actually touched and skips re-renders if those fields are unchanged. Under the hood, this relies on useReducer — and in React 18, useReducer had an "eager bail-out" that short-circuited rendering when the new state was === to the old state. React 19 removed that optimization, so every dispatch now schedules a render regardless, and the proxy comparison runs too late to prevent it.

useSyncExternalStore is a React primitive (added in 18, stable in 19) designed for exactly this pattern: subscribing to an external store:

useSyncExternalStore(
  subscribe,   // (listener) => unsubscribe — called when the store changes
  getSnapshot  // () => value — returns the current value for this subscriber
)

React calls getSnapshot during render and compares the result with Object.is. If the value is the same reference, the component bails out — no re-render. The key difference from react-tracked is that this bail-out is built into React's reconciler, not bolted on via proxy tricks and useReducer.

The per-topic subscription model makes this efficient. Instead of one global store where every subscriber has to check if their fields changed, each useWs("some/topic", ...) call subscribes only to that topic's listener set. When a message arrives for front_door/detect/state, only components subscribed to that exact topic get their listener fired → React calls their getSnapshot → Object.is compares the value → bail-out if unchanged. Components watching back_yard/detect/state are never even notified.
This commit is contained in:
Josh Hawkins 2026-03-09 08:26:31 -05:00
parent b2c7840c29
commit dc6973dcc2
4 changed files with 426 additions and 182 deletions

View File

@ -0,0 +1,81 @@
import { baseUrl } from "./baseUrl";
import { ReactNode, useCallback, useEffect, useRef } from "react";
import { WsSendContext } from "./wsContext";
import type { Update } from "./wsContext";
import { bufferRawMessage, resetWsStore } from "./ws";
export function WsProvider({ children }: { children: ReactNode }) {
const wsUrl = `${baseUrl.replace(/^http/, "ws")}ws`;
const wsRef = useRef<WebSocket | null>(null);
const reconnectTimer = useRef<ReturnType<typeof setTimeout> | null>(null);
const reconnectAttempt = useRef(0);
const unmounted = useRef(false);
const sendJsonMessage = useCallback((msg: unknown) => {
if (wsRef.current?.readyState === WebSocket.OPEN) {
wsRef.current.send(JSON.stringify(msg));
}
}, []);
useEffect(() => {
unmounted.current = false;
function connect() {
if (unmounted.current) return;
const ws = new WebSocket(wsUrl);
wsRef.current = ws;
ws.onopen = () => {
reconnectAttempt.current = 0;
ws.send(
JSON.stringify({ topic: "onConnect", message: "", retain: false }),
);
};
// Near-zero-cost handler: just buffer the raw string.
// All JSON.parse + state updates happen in a single rAF in ws.ts,
// giving React uninterrupted CPU time between frames.
ws.onmessage = (event: MessageEvent) => {
bufferRawMessage(event.data as string);
};
ws.onclose = () => {
if (unmounted.current) return;
const delay = Math.min(1000 * 2 ** reconnectAttempt.current, 30000);
reconnectAttempt.current++;
reconnectTimer.current = setTimeout(connect, delay);
};
ws.onerror = () => {
ws.close();
};
}
connect();
return () => {
unmounted.current = true;
if (reconnectTimer.current) {
clearTimeout(reconnectTimer.current);
}
wsRef.current?.close();
resetWsStore();
};
}, [wsUrl]);
const send = useCallback(
(message: Update) => {
sendJsonMessage({
topic: message.topic,
payload: message.payload,
retain: message.retain,
});
},
[sendJsonMessage],
);
return (
<WsSendContext.Provider value={send}>{children}</WsSendContext.Provider>
);
}

View File

@ -1,6 +1,6 @@
import { baseUrl } from "./baseUrl";
import { SWRConfig } from "swr";
import { WsProvider } from "./ws";
import { WsProvider } from "./WsProvider";
import axios from "axios";
import { ReactNode } from "react";
import { isRedirectingToLogin, setRedirectingToLogin } from "./auth-redirect";

View File

@ -1,6 +1,11 @@
import { baseUrl } from "./baseUrl";
import { useCallback, useEffect, useRef, useState } from "react";
import useWebSocket, { ReadyState } from "react-use-websocket";
import {
useCallback,
useContext,
useEffect,
useMemo,
useRef,
useSyncExternalStore,
} from "react";
import {
EmbeddingsReindexProgressType,
FrigateCameraState,
@ -14,8 +19,11 @@ import {
Job,
} from "@/types/ws";
import { FrigateStats } from "@/types/stats";
import { createContainer } from "react-tracked";
import useDeepMemo from "@/hooks/use-deep-memo";
import { isEqual } from "lodash";
import { WsSendContext } from "./wsContext";
import type { Update, WsSend } from "./wsContext";
export type { Update };
export type WsFeedMessage = {
topic: string;
@ -24,170 +32,277 @@ export type WsFeedMessage = {
id: string;
};
type Update = {
topic: string;
payload: unknown;
retain: boolean;
};
type WsState = {
[topic: string]: unknown;
};
type useValueReturn = [WsState, (update: Update) => void];
// ---------------------------------------------------------------------------
// External store for WebSocket state using useSyncExternalStore
//
// Per-topic subscriptions ensure only subscribers of a changed topic are
// notified, avoiding O(messages × total_subscribers) snapshot checks.
//
// Updates are batched via requestAnimationFrame so a burst of WS messages
// (e.g. on initial connect) results in a single React render pass.
// ---------------------------------------------------------------------------
type Listener = () => void;
const wsState: WsState = {};
const wsTopicListeners = new Map<string, Set<Listener>>();
// --- rAF drain + flush ---
//
// The onmessage handler in WsProvider pushes raw event.data strings into
// rawMessageBuffer (sub-microsecond cost). A single requestAnimationFrame
// callback drains the buffer, JSON-parses each message, applies state updates,
// and notifies React subscribers — all in one burst per frame. This gives
// React uninterrupted CPU time between frames.
const rawMessageBuffer: string[] = [];
let pendingUpdates: Record<string, unknown> | null = null;
let pendingFeedMessages: WsFeedMessage[] = [];
let flushScheduled = false;
/**
* Reset all module-level state. Called on WsProvider unmount to prevent
* stale data from leaking across mount/unmount cycles (e.g. HMR, logout).
*/
export function resetWsStore() {
for (const key of Object.keys(wsState)) {
delete wsState[key];
}
wsTopicListeners.clear();
rawMessageBuffer.length = 0;
pendingUpdates = null;
pendingFeedMessages = [];
flushScheduled = false;
lastCameraActivityPayload = null;
wsMessageSubscribers.clear();
wsMessageIdCounter = 0;
}
export function bufferRawMessage(data: string) {
rawMessageBuffer.push(data);
if (!flushScheduled) {
flushScheduled = true;
requestAnimationFrame(drainAndFlush);
}
}
function drainAndFlush() {
flushScheduled = false;
// 1. Drain raw buffer → ingest messages
if (rawMessageBuffer.length > 0) {
const batch = rawMessageBuffer.splice(0);
for (const raw of batch) {
const data: Update = JSON.parse(raw);
if (data) {
ingestMessage(data);
}
}
}
// 2. Flush state updates to React
if (pendingUpdates) {
const updates = pendingUpdates;
pendingUpdates = null;
for (const [topic, newVal] of Object.entries(updates)) {
const oldVal = wsState[topic];
// Fast path: === for primitives ("ON"/"OFF", numbers).
// Fall back to isEqual for objects/arrays.
const unchanged =
oldVal === newVal ||
(typeof newVal === "object" &&
newVal !== null &&
isEqual(oldVal, newVal));
if (!unchanged) {
wsState[topic] = newVal;
// Snapshot the Set — a listener may trigger unmount that modifies it.
const listeners = wsTopicListeners.get(topic);
if (listeners) {
for (const l of Array.from(listeners)) l();
}
}
}
}
// 3. Deliver feed messages
if (pendingFeedMessages.length > 0 && wsMessageSubscribers.size > 0) {
const msgs = pendingFeedMessages;
pendingFeedMessages = [];
for (const msg of msgs) {
wsMessageSubscribers.forEach((cb) => cb(msg));
}
} else {
pendingFeedMessages = [];
}
}
function ingestMessage(data: Update) {
if (!pendingUpdates) pendingUpdates = {};
pendingUpdates[data.topic] = data.payload;
if (data.topic === "camera_activity") {
expandCameraActivity(data.payload as string, pendingUpdates);
}
if (wsMessageSubscribers.size > 0) {
pendingFeedMessages.push({
topic: data.topic,
payload: data.payload,
timestamp: Date.now(),
id: String(wsMessageIdCounter++),
});
}
}
// --- Subscriptions ---
export function subscribeWsTopic(
topic: string,
listener: Listener,
): () => void {
let set = wsTopicListeners.get(topic);
if (!set) {
set = new Set();
wsTopicListeners.set(topic, set);
}
set.add(listener);
return () => {
set!.delete(listener);
if (set!.size === 0) wsTopicListeners.delete(topic);
};
}
export function getWsTopicValue(topic: string): unknown {
return wsState[topic];
}
// ---------------------------------------------------------------------------
// Feed message subscribers
// ---------------------------------------------------------------------------
const wsMessageSubscribers = new Set<(msg: WsFeedMessage) => void>();
let wsMessageIdCounter = 0;
function useValue(): useValueReturn {
const wsUrl = `${baseUrl.replace(/^http/, "ws")}ws`;
// ---------------------------------------------------------------------------
// Camera activity expansion
// ---------------------------------------------------------------------------
// main state
// Cache the last raw camera_activity JSON string so we can skip JSON.parse
// and the entire expansion when nothing has changed. This avoids creating
// fresh objects (which defeat Object.is and force expensive isEqual deep
// traversals) on every flush — critical with many cameras.
let lastCameraActivityPayload: string | null = null;
const [wsState, setWsState] = useState<WsState>({});
function expandCameraActivity(
payload: string,
updates: Record<string, unknown>,
) {
// Fast path: if the raw JSON string is identical, nothing changed.
if (payload === lastCameraActivityPayload) {
// Remove camera_activity from pending updates so flushPendingUpdates
// doesn't run isEqual on a freshly-parsed object that's identical.
delete updates["camera_activity"];
return;
}
lastCameraActivityPayload = payload;
useEffect(() => {
const activityValue: string = wsState["camera_activity"] as string;
let activity: { [key: string]: Partial<FrigateCameraState> };
if (!activityValue) {
return;
}
try {
activity = JSON.parse(payload);
} catch {
return;
}
let cameraActivity: { [key: string]: Partial<FrigateCameraState> };
// Remove the root topic — no component reads the monolithic object.
// Only per-camera subtopics and per-setting primitives are stored.
delete updates["camera_activity"];
try {
cameraActivity = JSON.parse(activityValue);
} catch {
return;
}
if (Object.keys(activity).length === 0) return;
if (Object.keys(cameraActivity).length === 0) {
return;
}
for (const [name, state] of Object.entries(activity)) {
// Notify per-camera subtopic specifically
updates[`camera_activity/${name}`] = state;
const cameraStates: WsState = {};
const cameraConfig = state?.config;
if (!cameraConfig) continue;
Object.entries(cameraActivity).forEach(([name, state]) => {
const cameraConfig = state?.config;
const {
record,
detect,
enabled,
snapshots,
audio,
audio_transcription,
notifications,
notifications_suspended,
autotracking,
alerts,
detections,
object_descriptions,
review_descriptions,
} = cameraConfig;
if (!cameraConfig) {
return;
}
const {
record,
detect,
enabled,
snapshots,
audio,
audio_transcription,
notifications,
notifications_suspended,
autotracking,
alerts,
detections,
object_descriptions,
review_descriptions,
} = cameraConfig;
cameraStates[`${name}/recordings/state`] = record ? "ON" : "OFF";
cameraStates[`${name}/enabled/state`] = enabled ? "ON" : "OFF";
cameraStates[`${name}/detect/state`] = detect ? "ON" : "OFF";
cameraStates[`${name}/snapshots/state`] = snapshots ? "ON" : "OFF";
cameraStates[`${name}/audio/state`] = audio ? "ON" : "OFF";
cameraStates[`${name}/audio_transcription/state`] = audio_transcription
? "ON"
: "OFF";
cameraStates[`${name}/notifications/state`] = notifications
? "ON"
: "OFF";
cameraStates[`${name}/notifications/suspended`] =
notifications_suspended || 0;
cameraStates[`${name}/ptz_autotracker/state`] = autotracking
? "ON"
: "OFF";
cameraStates[`${name}/review_alerts/state`] = alerts ? "ON" : "OFF";
cameraStates[`${name}/review_detections/state`] = detections
? "ON"
: "OFF";
cameraStates[`${name}/object_descriptions/state`] = object_descriptions
? "ON"
: "OFF";
cameraStates[`${name}/review_descriptions/state`] = review_descriptions
? "ON"
: "OFF";
});
setWsState((prevState) => ({
...prevState,
...cameraStates,
}));
// we only want this to run initially when the config is loaded
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [wsState["camera_activity"]]);
// ws handler
const { sendJsonMessage, readyState } = useWebSocket(wsUrl, {
onMessage: (event) => {
const data: Update = JSON.parse(event.data);
if (data) {
setWsState((prevState) => ({
...prevState,
[data.topic]: data.payload,
}));
// Notify feed subscribers
if (wsMessageSubscribers.size > 0) {
const feedMsg: WsFeedMessage = {
topic: data.topic,
payload: data.payload,
timestamp: Date.now(),
id: String(wsMessageIdCounter++),
};
wsMessageSubscribers.forEach((cb) => cb(feedMsg));
}
}
},
onOpen: () => {
sendJsonMessage({
topic: "onConnect",
message: "",
retain: false,
});
},
onClose: () => {},
shouldReconnect: () => true,
retryOnError: true,
});
const setState = useCallback(
(message: Update) => {
if (readyState === ReadyState.OPEN) {
sendJsonMessage({
topic: message.topic,
payload: message.payload,
retain: message.retain,
});
}
},
[readyState, sendJsonMessage],
);
return [wsState, setState];
updates[`${name}/recordings/state`] = record ? "ON" : "OFF";
updates[`${name}/enabled/state`] = enabled ? "ON" : "OFF";
updates[`${name}/detect/state`] = detect ? "ON" : "OFF";
updates[`${name}/snapshots/state`] = snapshots ? "ON" : "OFF";
updates[`${name}/audio/state`] = audio ? "ON" : "OFF";
updates[`${name}/audio_transcription/state`] = audio_transcription
? "ON"
: "OFF";
updates[`${name}/notifications/state`] = notifications ? "ON" : "OFF";
updates[`${name}/notifications/suspended`] = notifications_suspended || 0;
updates[`${name}/ptz_autotracker/state`] = autotracking ? "ON" : "OFF";
updates[`${name}/review_alerts/state`] = alerts ? "ON" : "OFF";
updates[`${name}/review_detections/state`] = detections ? "ON" : "OFF";
updates[`${name}/object_descriptions/state`] = object_descriptions
? "ON"
: "OFF";
updates[`${name}/review_descriptions/state`] = review_descriptions
? "ON"
: "OFF";
}
}
export const {
Provider: WsProvider,
useTrackedState: useWsState,
useUpdate: useWsUpdate,
} = createContainer(useValue, { defaultState: {}, concurrentMode: true });
// ---------------------------------------------------------------------------
// Hooks
// ---------------------------------------------------------------------------
/**
* Get the send function for publishing WS messages.
*/
export function useWsUpdate(): WsSend {
const send = useContext(WsSendContext);
if (!send) {
throw new Error("useWsUpdate must be used within WsProvider");
}
return send;
}
/**
* Subscribe to a single WS topic with proper bail-out.
* Only re-renders when the topic's value changes (Object.is comparison).
* Uses useSyncExternalStore zero useEffect, so no PassiveMask flags
* propagate through the fiber tree.
*/
export function useWs(watchTopic: string, publishTopic: string) {
const state = useWsState();
const payload = useSyncExternalStore(
useCallback(
(listener: Listener) => subscribeWsTopic(watchTopic, listener),
[watchTopic],
),
useCallback(() => wsState[watchTopic], [watchTopic]),
);
const sendJsonMessage = useWsUpdate();
const value = { payload: state[watchTopic] || null };
const value = { payload: payload ?? null };
const send = useCallback(
(payload: unknown, retain = false) => {
@ -203,6 +318,10 @@ export function useWs(watchTopic: string, publishTopic: string) {
return { value, send };
}
// ---------------------------------------------------------------------------
// Convenience hooks
// ---------------------------------------------------------------------------
export function useEnabledState(camera: string): {
payload: ToggleableSetting;
send: (payload: ToggleableSetting, retain?: boolean) => void;
@ -413,28 +532,42 @@ export function useFrigateEvents(): { payload: FrigateEvent } {
const {
value: { payload },
} = useWs("events", "");
return { payload: JSON.parse(payload as string) };
const parsed = useMemo(
() => (payload ? JSON.parse(payload as string) : undefined),
[payload],
);
return { payload: parsed };
}
export function useAudioDetections(): { payload: FrigateAudioDetections } {
const {
value: { payload },
} = useWs("audio_detections", "");
return { payload: JSON.parse(payload as string) };
const parsed = useMemo(
() => (payload ? JSON.parse(payload as string) : undefined),
[payload],
);
return { payload: parsed };
}
export function useFrigateReviews(): FrigateReview {
const {
value: { payload },
} = useWs("reviews", "");
return useDeepMemo(JSON.parse(payload as string));
return useMemo(
() => (payload ? JSON.parse(payload as string) : undefined),
[payload],
);
}
export function useFrigateStats(): FrigateStats {
const {
value: { payload },
} = useWs("stats", "");
return useDeepMemo(JSON.parse(payload as string));
return useMemo(
() => (payload ? JSON.parse(payload as string) : undefined),
[payload],
);
}
export function useInitialCameraState(
@ -446,32 +579,32 @@ export function useInitialCameraState(
const {
value: { payload },
send: sendCommand,
} = useWs("camera_activity", "onConnect");
} = useWs(`camera_activity/${camera}`, "onConnect");
const data = useDeepMemo(JSON.parse(payload as string));
// camera_activity sub-topic payload is already parsed by expandCameraActivity
const data = payload as FrigateCameraState | undefined;
// onConnect is sent once in WsProvider.onopen — no need to re-request on
// every component mount. Components read cached wsState immediately via
// useSyncExternalStore. Only re-request when the user tabs back in.
useEffect(() => {
let listener = undefined;
if (revalidateOnFocus) {
sendCommand("onConnect");
listener = () => {
if (document.visibilityState == "visible") {
sendCommand("onConnect");
}
};
addEventListener("visibilitychange", listener);
}
sendCommand("onConnect");
if (!revalidateOnFocus) return;
return () => {
if (listener) {
removeEventListener("visibilitychange", listener);
const listener = () => {
if (document.visibilityState === "visible") {
sendCommand("onConnect");
}
};
// only refresh when onRefresh value changes
addEventListener("visibilitychange", listener);
return () => {
removeEventListener("visibilitychange", listener);
};
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [revalidateOnFocus]);
return { payload: data ? data[camera] : undefined };
return { payload: data as FrigateCameraState };
}
export function useModelState(
@ -483,7 +616,10 @@ export function useModelState(
send: sendCommand,
} = useWs("model_state", "modelState");
const data = useDeepMemo(JSON.parse(payload as string));
const data = useMemo(
() => (payload ? JSON.parse(payload as string) : undefined),
[payload],
);
useEffect(() => {
let listener = undefined;
@ -519,7 +655,10 @@ export function useEmbeddingsReindexProgress(
send: sendCommand,
} = useWs("embeddings_reindex_progress", "embeddingsReindexProgress");
const data = useDeepMemo(JSON.parse(payload as string));
const data = useMemo(
() => (payload ? JSON.parse(payload as string) : undefined),
[payload],
);
useEffect(() => {
let listener = undefined;
@ -553,8 +692,9 @@ export function useAudioTranscriptionProcessState(
send: sendCommand,
} = useWs("audio_transcription_state", "audioTranscriptionState");
const data = useDeepMemo(
payload ? (JSON.parse(payload as string) as string) : "idle",
const data = useMemo(
() => (payload ? (JSON.parse(payload as string) as string) : "idle"),
[payload],
);
useEffect(() => {
@ -587,7 +727,10 @@ export function useBirdseyeLayout(revalidateOnFocus: boolean = true): {
send: sendCommand,
} = useWs("birdseye_layout", "birdseyeLayout");
const data = useDeepMemo(JSON.parse(payload as string));
const data = useMemo(
() => (payload ? JSON.parse(payload as string) : undefined),
[payload],
);
useEffect(() => {
let listener = undefined;
@ -684,10 +827,14 @@ export function useTrackedObjectUpdate(): {
const {
value: { payload },
} = useWs("tracked_object_update", "");
const parsed = payload
? JSON.parse(payload as string)
: { type: "", id: "", camera: "" };
return { payload: useDeepMemo(parsed) };
const parsed = useMemo(
() =>
payload
? JSON.parse(payload as string)
: { type: "", id: "", camera: "" },
[payload],
);
return { payload: parsed };
}
export function useNotifications(camera: string): {
@ -730,10 +877,14 @@ export function useTriggers(): { payload: TriggerStatus } {
const {
value: { payload },
} = useWs("triggers", "");
const parsed = payload
? JSON.parse(payload as string)
: { name: "", camera: "", event_id: "", type: "", score: 0 };
return { payload: useDeepMemo(parsed) };
const parsed = useMemo(
() =>
payload
? JSON.parse(payload as string)
: { name: "", camera: "", event_id: "", type: "", score: 0 },
[payload],
);
return { payload: parsed };
}
export function useJobStatus(
@ -745,8 +896,9 @@ export function useJobStatus(
send: sendCommand,
} = useWs("job_state", "jobState");
const jobData = useDeepMemo(
payload && typeof payload === "string" ? JSON.parse(payload) : {},
const jobData = useMemo(
() => (payload && typeof payload === "string" ? JSON.parse(payload) : {}),
[payload],
);
const currentJob = jobData[jobType] || null;

11
web/src/api/wsContext.ts Normal file
View File

@ -0,0 +1,11 @@
import { createContext } from "react";
export type Update = {
topic: string;
payload: unknown;
retain: boolean;
};
export type WsSend = (update: Update) => void;
export const WsSendContext = createContext<WsSend | null>(null);