frigate/frigate/watchdog.py
Josh Hawkins 95b5b89ed9
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
Miscellaneous fixes (#23032)
* ensure embeddings process restarts after maintainer thread crash

* add docs link to media sync settings

* fix color

Co-authored-by: Copilot <copilot@github.com>

* match link color with other sections

* ensure recording staleness threshold scales with segment_time

* docs tweak

* Fix llama.cpp media marker

* Fix gemini tools call

---------

Co-authored-by: Copilot <copilot@github.com>
Co-authored-by: Nicolas Mowen <nickmowen213@gmail.com>
2026-04-29 16:20:19 -06:00

140 lines
4.4 KiB
Python

import datetime
import logging
import threading
import time
from collections import deque
from dataclasses import dataclass, field
from multiprocessing.synchronize import Event as MpEvent
from typing import Callable
from frigate.object_detection.base import ObjectDetectProcess
from frigate.util.process import FrigateProcess
from frigate.util.services import restart_frigate
logger = logging.getLogger(__name__)
MAX_RESTARTS = 5
RESTART_WINDOW_S = 60
@dataclass
class MonitoredProcess:
"""A process monitored by the watchdog for automatic restart."""
name: str
process: FrigateProcess
factory: Callable[[], FrigateProcess]
on_restart: Callable[[FrigateProcess], None] | None = None
restart_timestamps: deque[float] = field(
default_factory=lambda: deque(maxlen=MAX_RESTARTS)
)
clean_exit_logged: bool = False
def is_restarting_too_fast(self, now: float) -> bool:
while (
self.restart_timestamps
and now - self.restart_timestamps[0] > RESTART_WINDOW_S
):
self.restart_timestamps.popleft()
return len(self.restart_timestamps) >= MAX_RESTARTS
class FrigateWatchdog(threading.Thread):
def __init__(
self,
detectors: dict[str, ObjectDetectProcess],
stop_event: MpEvent,
):
super().__init__(name="frigate_watchdog")
self.detectors = detectors
self.stop_event = stop_event
self._monitored: list[MonitoredProcess] = []
def register(
self,
name: str,
process: FrigateProcess,
factory: Callable[[], FrigateProcess],
on_restart: Callable[[FrigateProcess], None] | None = None,
) -> None:
"""Register a FrigateProcess for monitoring and automatic restart."""
self._monitored.append(
MonitoredProcess(
name=name,
process=process,
factory=factory,
on_restart=on_restart,
)
)
def _check_process(self, entry: MonitoredProcess) -> None:
if entry.process.is_alive():
return
exitcode = entry.process.exitcode
if exitcode == 0:
if not entry.clean_exit_logged:
logger.info("Process %s exited cleanly, not restarting", entry.name)
entry.clean_exit_logged = True
return
logger.warning(
"Process %s (PID %s) exited with code %s",
entry.name,
entry.process.pid,
exitcode,
)
now = datetime.datetime.now().timestamp()
if entry.is_restarting_too_fast(now):
logger.error(
"Process %s restarting too frequently (%d times in %ds), backing off",
entry.name,
MAX_RESTARTS,
RESTART_WINDOW_S,
)
return
try:
entry.process.close()
new_process = entry.factory()
new_process.start()
entry.process = new_process
entry.restart_timestamps.append(now)
if entry.on_restart:
entry.on_restart(new_process)
logger.info("Restarted %s (PID %s)", entry.name, new_process.pid)
except Exception:
logger.exception("Failed to restart %s", entry.name)
def run(self) -> None:
time.sleep(10)
while not self.stop_event.wait(10):
now = datetime.datetime.now().timestamp()
# check the detection processes
for detector in self.detectors.values():
detection_start = detector.detection_start.value # type: ignore[attr-defined]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
if detection_start > 0.0 and now - detection_start > 10:
logger.info(
"Detection appears to be stuck. Restarting detection process..."
)
detector.start_or_restart()
elif (
detector.detect_process is not None
and not detector.detect_process.is_alive()
):
logger.info("Detection appears to have stopped. Exiting Frigate...")
restart_frigate()
for entry in self._monitored:
self._check_process(entry)
logger.info("Exiting watchdog...")