Improve process watchdog (#22557)
Some checks failed
CI / AMD64 Build (push) Has been cancelled
CI / ARM Build (push) Has been cancelled
CI / Jetson Jetpack 6 (push) Has been cancelled
CI / AMD64 Extra Build (push) Has been cancelled
CI / ARM Extra Build (push) Has been cancelled
CI / Synaptics Build (push) Has been cancelled
CI / Assemble and push default build (push) Has been cancelled

* monitor subprocesses and auto-restart with watchdog

* fix typing

* formatting
This commit is contained in:
Josh Hawkins 2026-03-20 19:02:47 -05:00 committed by GitHub
parent a8da4c4521
commit 6d2b84e202
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 139 additions and 2 deletions

View File

@ -8,7 +8,7 @@ from multiprocessing import Queue
from multiprocessing.managers import DictProxy, SyncManager from multiprocessing.managers import DictProxy, SyncManager
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Callable, Optional
import psutil import psutil
import uvicorn import uvicorn
@ -81,6 +81,7 @@ from frigate.timeline import TimelineProcessor
from frigate.track.object_processing import TrackedObjectProcessor from frigate.track.object_processing import TrackedObjectProcessor
from frigate.util.builtin import empty_and_close_queue from frigate.util.builtin import empty_and_close_queue
from frigate.util.image import UntrackedSharedMemory from frigate.util.image import UntrackedSharedMemory
from frigate.util.process import FrigateProcess
from frigate.util.services import set_file_limit from frigate.util.services import set_file_limit
from frigate.version import VERSION from frigate.version import VERSION
from frigate.watchdog import FrigateWatchdog from frigate.watchdog import FrigateWatchdog
@ -497,6 +498,47 @@ class FrigateApp:
def start_watchdog(self) -> None:
    """Create the watchdog, register restartable subprocesses, and start it.

    Each candidate subprocess is described by the attribute on ``self`` that
    holds it, the key it uses in ``self.processes``, and a zero-argument
    factory that builds a fresh instance. Candidates whose attribute does
    not exist on ``self`` are skipped.
    """
    watchdog = FrigateWatchdog(self.detectors, self.stop_event)
    self.frigate_watchdog = watchdog

    # (attribute on self, key in self.processes, factory)
    restartable: list[tuple[str, str, Callable[[], FrigateProcess]]] = [
        (
            "embedding_process",
            "embeddings",
            lambda: EmbeddingProcess(
                self.config, self.embeddings_metrics, self.stop_event
            ),
        ),
        (
            "recording_process",
            "recording",
            lambda: RecordProcess(self.config, self.stop_event),
        ),
        (
            "review_segment_process",
            "review_segment",
            lambda: ReviewProcess(self.config, self.stop_event),
        ),
        (
            "output_processor",
            "output",
            lambda: OutputProcess(self.config, self.stop_event),
        ),
    ]

    def make_restart_hook(
        attr_name: str, process_key: str
    ) -> Callable[[FrigateProcess], None]:
        # A dedicated factory gives every hook its own attr/key pair, so the
        # hooks do not all end up seeing the values of the last iteration.
        def hook(proc: FrigateProcess) -> None:
            # Point the app at the replacement and refresh the tracked PID.
            setattr(self, attr_name, proc)
            self.processes[process_key] = proc.pid or 0

        return hook

    for attr_name, process_key, build in restartable:
        if hasattr(self, attr_name):
            watchdog.register(
                process_key,
                getattr(self, attr_name),
                build,
                make_restart_hook(attr_name, process_key),
            )

    watchdog.start()
def init_auth(self) -> None: def init_auth(self) -> None:

View File

@ -2,19 +2,111 @@ import datetime
import logging import logging
import threading import threading
import time import time
from collections import deque
from dataclasses import dataclass, field
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from typing import Callable
from frigate.object_detection.base import ObjectDetectProcess from frigate.object_detection.base import ObjectDetectProcess
from frigate.util.process import FrigateProcess
from frigate.util.services import restart_frigate from frigate.util.services import restart_frigate
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Number of restarts of a single monitored process within RESTART_WINDOW_S
# at which the watchdog backs off instead of restarting it again.
MAX_RESTARTS = 5
# Length, in seconds, of the sliding window over which restarts are counted.
RESTART_WINDOW_S = 60
@dataclass
class MonitoredProcess:
    """Bookkeeping record for one process the watchdog may restart.

    Holds the current process object, a factory that builds a replacement,
    an optional callback invoked with the replacement, and the timestamps
    of recent restarts used for rate limiting.
    """

    # Label used in log messages.
    name: str
    # Process object currently being watched.
    process: FrigateProcess
    # Zero-argument callable returning a new, not yet started, process.
    factory: Callable[[], FrigateProcess]
    # Optional callback that receives the replacement process.
    on_restart: Callable[[FrigateProcess], None] | None = None
    # Timestamps of recent restarts; bounded to MAX_RESTARTS entries.
    restart_timestamps: deque[float] = field(
        default_factory=lambda: deque(maxlen=MAX_RESTARTS)
    )

    def is_restarting_too_fast(self, now: float) -> bool:
        """Report whether the restart budget for the current window is spent.

        Timestamps older than ``RESTART_WINDOW_S`` relative to ``now`` are
        discarded first; the result is ``True`` when at least
        ``MAX_RESTARTS`` timestamps remain.
        """
        recent = self.restart_timestamps
        while recent:
            age = now - recent[0]
            if age > RESTART_WINDOW_S:
                # Oldest entry has left the window; drop it and look again.
                recent.popleft()
            else:
                break
        return len(recent) >= MAX_RESTARTS
class FrigateWatchdog(threading.Thread): class FrigateWatchdog(threading.Thread):
def __init__(
    self,
    detectors: dict[str, ObjectDetectProcess],
    stop_event: MpEvent,
):
    """Set up the watchdog thread with no monitored processes registered.

    Args:
        detectors: Detector processes keyed by name, stored on the instance.
        stop_event: Event stored on the instance as ``stop_event``.
    """
    super().__init__(name="frigate_watchdog")
    # Entries are added later through register().
    self._monitored: list[MonitoredProcess] = []
    self.stop_event = stop_event
    self.detectors = detectors
def register(
    self,
    name: str,
    process: FrigateProcess,
    factory: Callable[[], FrigateProcess],
    on_restart: Callable[[FrigateProcess], None] | None = None,
) -> None:
    """Add a process to the set the watchdog checks and restarts.

    Args:
        name: Label used for this process in log messages.
        process: The process object to watch.
        factory: Zero-argument callable that builds a replacement process.
        on_restart: Optional callback invoked with the replacement process
            after a restart.
    """
    monitored = MonitoredProcess(
        name=name,
        process=process,
        factory=factory,
        on_restart=on_restart,
    )
    self._monitored.append(monitored)
def _check_process(self, entry: MonitoredProcess) -> None:
    """Restart a monitored process that has died with a non-zero exit code.

    Does nothing while the process is alive. A process that exited with
    code 0 is logged and left alone. Otherwise a replacement is built with
    ``entry.factory``, started, stored on ``entry`` and passed to
    ``entry.on_restart``; restart attempts are throttled through
    ``entry.is_restarting_too_fast``.

    Args:
        entry: The monitored-process record to inspect and, if needed,
            restart.
    """
    old_process = entry.process
    if old_process.is_alive():
        return

    exitcode = old_process.exitcode
    if exitcode == 0:
        logger.info("Process %s exited cleanly, not restarting", entry.name)
        return

    logger.warning(
        "Process %s (PID %s) exited with code %s",
        entry.name,
        old_process.pid,
        exitcode,
    )

    now = datetime.datetime.now().timestamp()
    if entry.is_restarting_too_fast(now):
        logger.error(
            "Process %s restarting too frequently (%d times in %ds), backing off",
            entry.name,
            MAX_RESTARTS,
            RESTART_WINDOW_S,
        )
        return

    # Count the attempt before trying it, so a factory or start() that keeps
    # failing is throttled too instead of being retried on every pass.
    entry.restart_timestamps.append(now)

    # Build and start the replacement BEFORE touching the old process. If
    # the old object were closed first and the restart then failed,
    # entry.process would stay a closed object. Assuming FrigateProcess
    # follows multiprocessing.Process semantics (TODO confirm), the next
    # is_alive() call above would then raise ValueError outside any try
    # block and escape into the watchdog loop.
    try:
        new_process = entry.factory()
        new_process.start()
    except Exception:
        logger.exception("Failed to restart %s", entry.name)
        return

    entry.process = new_process

    # Releasing the dead process is best-effort; a failure here must not
    # undo a restart that has already succeeded.
    try:
        old_process.close()
    except Exception:
        logger.exception("Failed to close old %s process", entry.name)

    if entry.on_restart:
        try:
            entry.on_restart(new_process)
        except Exception:
            # The process itself is running; only the notification failed.
            logger.exception("Restart callback for %s failed", entry.name)

    logger.info("Restarted %s (PID %s)", entry.name, new_process.pid)
def run(self) -> None: def run(self) -> None:
time.sleep(10) time.sleep(10)
@ -38,4 +130,7 @@ class FrigateWatchdog(threading.Thread):
logger.info("Detection appears to have stopped. Exiting Frigate...") logger.info("Detection appears to have stopped. Exiting Frigate...")
restart_frigate() restart_frigate()
for entry in self._monitored:
self._check_process(entry)
logger.info("Exiting watchdog...") logger.info("Exiting watchdog...")