frigate/frigate/watchdog.py
Josh Hawkins 6d2b84e202
Some checks failed
CI / AMD64 Build (push) Has been cancelled
CI / ARM Build (push) Has been cancelled
CI / Jetson Jetpack 6 (push) Has been cancelled
CI / AMD64 Extra Build (push) Has been cancelled
CI / ARM Extra Build (push) Has been cancelled
CI / Synaptics Build (push) Has been cancelled
CI / Assemble and push default build (push) Has been cancelled
Improve process watchdog (#22557)
* monitor subprocesses and auto-restart with watchdog

* fix typing

* formatting
2026-03-20 19:02:47 -05:00

137 lines
4.3 KiB
Python

import datetime
import logging
import threading
import time
from collections import deque
from dataclasses import dataclass, field
from multiprocessing.synchronize import Event as MpEvent
from typing import Callable
from frigate.object_detection.base import ObjectDetectProcess
from frigate.util.process import FrigateProcess
from frigate.util.services import restart_frigate
logger = logging.getLogger(__name__)
MAX_RESTARTS = 5
RESTART_WINDOW_S = 60
@dataclass
class MonitoredProcess:
"""A process monitored by the watchdog for automatic restart."""
name: str
process: FrigateProcess
factory: Callable[[], FrigateProcess]
on_restart: Callable[[FrigateProcess], None] | None = None
restart_timestamps: deque[float] = field(
default_factory=lambda: deque(maxlen=MAX_RESTARTS)
)
def is_restarting_too_fast(self, now: float) -> bool:
while (
self.restart_timestamps
and now - self.restart_timestamps[0] > RESTART_WINDOW_S
):
self.restart_timestamps.popleft()
return len(self.restart_timestamps) >= MAX_RESTARTS
class FrigateWatchdog(threading.Thread):
def __init__(
self,
detectors: dict[str, ObjectDetectProcess],
stop_event: MpEvent,
):
super().__init__(name="frigate_watchdog")
self.detectors = detectors
self.stop_event = stop_event
self._monitored: list[MonitoredProcess] = []
def register(
self,
name: str,
process: FrigateProcess,
factory: Callable[[], FrigateProcess],
on_restart: Callable[[FrigateProcess], None] | None = None,
) -> None:
"""Register a FrigateProcess for monitoring and automatic restart."""
self._monitored.append(
MonitoredProcess(
name=name,
process=process,
factory=factory,
on_restart=on_restart,
)
)
def _check_process(self, entry: MonitoredProcess) -> None:
if entry.process.is_alive():
return
exitcode = entry.process.exitcode
if exitcode == 0:
logger.info("Process %s exited cleanly, not restarting", entry.name)
return
logger.warning(
"Process %s (PID %s) exited with code %s",
entry.name,
entry.process.pid,
exitcode,
)
now = datetime.datetime.now().timestamp()
if entry.is_restarting_too_fast(now):
logger.error(
"Process %s restarting too frequently (%d times in %ds), backing off",
entry.name,
MAX_RESTARTS,
RESTART_WINDOW_S,
)
return
try:
entry.process.close()
new_process = entry.factory()
new_process.start()
entry.process = new_process
entry.restart_timestamps.append(now)
if entry.on_restart:
entry.on_restart(new_process)
logger.info("Restarted %s (PID %s)", entry.name, new_process.pid)
except Exception:
logger.exception("Failed to restart %s", entry.name)
def run(self) -> None:
time.sleep(10)
while not self.stop_event.wait(10):
now = datetime.datetime.now().timestamp()
# check the detection processes
for detector in self.detectors.values():
detection_start = detector.detection_start.value # type: ignore[attr-defined]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
if detection_start > 0.0 and now - detection_start > 10:
logger.info(
"Detection appears to be stuck. Restarting detection process..."
)
detector.start_or_restart()
elif (
detector.detect_process is not None
and not detector.detect_process.is_alive()
):
logger.info("Detection appears to have stopped. Exiting Frigate...")
restart_frigate()
for entry in self._monitored:
self._check_process(entry)
logger.info("Exiting watchdog...")