From 6d2b84e20233ad784ce524b08f1a702ec6cc1554 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Fri, 20 Mar 2026 19:02:47 -0500 Subject: [PATCH] Improve process watchdog (#22557) * monitor subprocesses and auto-restart with watchdog * fix typing * formatting --- frigate/app.py | 44 +++++++++++++++++++- frigate/watchdog.py | 97 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 139 insertions(+), 2 deletions(-) diff --git a/frigate/app.py b/frigate/app.py index fef37813a..750f1ad23 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -8,7 +8,7 @@ from multiprocessing import Queue from multiprocessing.managers import DictProxy, SyncManager from multiprocessing.synchronize import Event as MpEvent from pathlib import Path -from typing import Optional +from typing import Callable, Optional import psutil import uvicorn @@ -81,6 +81,7 @@ from frigate.timeline import TimelineProcessor from frigate.track.object_processing import TrackedObjectProcessor from frigate.util.builtin import empty_and_close_queue from frigate.util.image import UntrackedSharedMemory +from frigate.util.process import FrigateProcess from frigate.util.services import set_file_limit from frigate.version import VERSION from frigate.watchdog import FrigateWatchdog @@ -497,6 +498,47 @@ class FrigateApp: def start_watchdog(self) -> None: self.frigate_watchdog = FrigateWatchdog(self.detectors, self.stop_event) + + # (attribute on self, key in self.processes, factory) + specs: list[tuple[str, str, Callable[[], FrigateProcess]]] = [ + ( + "embedding_process", + "embeddings", + lambda: EmbeddingProcess( + self.config, self.embeddings_metrics, self.stop_event + ), + ), + ( + "recording_process", + "recording", + lambda: RecordProcess(self.config, self.stop_event), + ), + ( + "review_segment_process", + "review_segment", + lambda: ReviewProcess(self.config, self.stop_event), + ), + ( + "output_processor", + "output", + lambda: OutputProcess(self.config, self.stop_event), + ), + ] + + for attr, key, factory in specs: + if not hasattr(self, attr): + continue + + def on_restart( + proc: FrigateProcess, _attr: str = attr, _key: str = key + ) -> None: + setattr(self, _attr, proc) + self.processes[_key] = proc.pid or 0 + + self.frigate_watchdog.register( + key, getattr(self, attr), factory, on_restart + ) + self.frigate_watchdog.start() def init_auth(self) -> None: diff --git a/frigate/watchdog.py b/frigate/watchdog.py index 4c49de1a0..63fd16629 100644 --- a/frigate/watchdog.py +++ b/frigate/watchdog.py @@ -2,19 +2,111 @@ import datetime import logging import threading import time +from collections import deque +from dataclasses import dataclass, field from multiprocessing.synchronize import Event as MpEvent +from typing import Callable from frigate.object_detection.base import ObjectDetectProcess +from frigate.util.process import FrigateProcess from frigate.util.services import restart_frigate logger = logging.getLogger(__name__) +MAX_RESTARTS = 5 +RESTART_WINDOW_S = 60 + + +@dataclass +class MonitoredProcess: + """A process monitored by the watchdog for automatic restart.""" + + name: str + process: FrigateProcess + factory: Callable[[], FrigateProcess] + on_restart: Callable[[FrigateProcess], None] | None = None + restart_timestamps: deque[float] = field( + default_factory=lambda: deque(maxlen=MAX_RESTARTS) + ) + + def is_restarting_too_fast(self, now: float) -> bool: + while ( + self.restart_timestamps + and now - self.restart_timestamps[0] > RESTART_WINDOW_S + ): + self.restart_timestamps.popleft() + return len(self.restart_timestamps) >= MAX_RESTARTS + class FrigateWatchdog(threading.Thread): - def __init__(self, detectors: dict[str, ObjectDetectProcess], stop_event: MpEvent): + def __init__( + self, + detectors: dict[str, ObjectDetectProcess], + stop_event: MpEvent, + ): super().__init__(name="frigate_watchdog") self.detectors = detectors self.stop_event = stop_event + self._monitored: list[MonitoredProcess] = [] + + def register( + self, + name: str, + process: FrigateProcess, + factory: Callable[[], FrigateProcess], + on_restart: Callable[[FrigateProcess], None] | None = None, + ) -> None: + """Register a FrigateProcess for monitoring and automatic restart.""" + self._monitored.append( + MonitoredProcess( + name=name, + process=process, + factory=factory, + on_restart=on_restart, + ) + ) + + def _check_process(self, entry: MonitoredProcess) -> None: + if entry.process.is_alive(): + return + + exitcode = entry.process.exitcode + if exitcode == 0: + logger.info("Process %s exited cleanly, not restarting", entry.name) + return + + logger.warning( + "Process %s (PID %s) exited with code %s", + entry.name, + entry.process.pid, + exitcode, + ) + + now = datetime.datetime.now().timestamp() + + if entry.is_restarting_too_fast(now): + logger.error( + "Process %s restarting too frequently (%d times in %ds), backing off", + entry.name, + MAX_RESTARTS, + RESTART_WINDOW_S, + ) + return + + try: + entry.process.close() + new_process = entry.factory() + new_process.start() + + entry.process = new_process + entry.restart_timestamps.append(now) + + if entry.on_restart: + entry.on_restart(new_process) + + logger.info("Restarted %s (PID %s)", entry.name, new_process.pid) + except Exception: + logger.exception("Failed to restart %s", entry.name) def run(self) -> None: time.sleep(10) @@ -38,4 +130,7 @@ class FrigateWatchdog(threading.Thread): logger.info("Detection appears to have stopped. Exiting Frigate...") restart_frigate() + for entry in self._monitored: + self._check_process(entry) + logger.info("Exiting watchdog...")