From 20b11124227d3057acea0a7875e4cc4ede53bda5 Mon Sep 17 00:00:00 2001 From: George Tsiamasiotis Date: Wed, 2 Oct 2024 22:01:41 +0300 Subject: [PATCH] Fast process shutdown --- frigate/app.py | 64 +++++++++++++++++++---------------------- frigate/util/process.py | 36 +++++++++++++++++++++++ 2 files changed, 65 insertions(+), 35 deletions(-) diff --git a/frigate/app.py b/frigate/app.py index 27484e43b..0aebd13ec 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -5,6 +5,7 @@ import os import secrets import shutil import threading +from itertools import chain from typing import Any, Optional import psutil @@ -66,6 +67,7 @@ from frigate.stats.util import stats_init from frigate.storage import StorageMaintainer from frigate.timeline import TimelineProcessor from frigate.util.builtin import empty_and_close_queue +from frigate.util.process import ProcessStopper from frigate.version import VERSION from frigate.watchdog import FrigateWatchdog @@ -550,52 +552,44 @@ class FrigateApp: ReviewSegment.end_time == None ).execute() - # stop the audio process - if self.audio_process: - self.audio_process.terminate() - self.audio_process.join() + # The list of processes we need to stop + processes: list[mp.Process] = [] - # ensure the capture processes are done + # Stop the audio process + processes.append(self.audio_process) + + # Stop the camera processors + processes.extend( + chain.from_iterable( + (camera.capture_process, camera.process) + for camera in self.cameras.values() + ) + ) + + # Stop all remaining processes + processes.extend(self.detectors.values()) + processes.append(self.output_processor) + processes.append(self.recording_process) + processes.append(self.review_segment_process) + + ProcessStopper(processes) + + # Close pending frame queues for name, camera in self.cameras.items(): - if camera.capture_process is not None: - logger.info(f"Waiting for capture process for {name} to stop") - camera.capture_process.terminate() - camera.capture_process.join() - - # ensure the camera processors are done - for name, camera in self.cameras.items(): - if camera.process is not None: - logger.info(f"Waiting for process for {name} to stop") - camera.process.terminate() - camera.process.join() - logger.info(f"Closing frame queue for {name}") - empty_and_close_queue(camera.camera_metrics.frame_queue) - - # ensure the detectors are done - for detector in self.detectors.values(): - detector.terminate() - detector.join() + logger.info(f"Closing frame queue for {name}") + empty_and_close_queue(camera.camera_metrics.frame_queue) + logger.info("Closing detection queue") empty_and_close_queue(self.detection_queue) - logger.info("Detection queue closed") self.detected_frames_processor.join() + logger.info("Closing detected frames queue") empty_and_close_queue(self.detected_frames_queue) - logger.info("Detected frames queue closed") self.timeline_processor.join() self.event_processor.join() empty_and_close_queue(self.timeline_queue) - logger.info("Timeline queue closed") - - self.output_processor.terminate() - self.output_processor.join() - - self.recording_process.terminate() - self.recording_process.join() - - self.review_segment_process.terminate() - self.review_segment_process.join() + logger.info("Closing timeline queue") self.external_event_processor.stop() self.dispatcher.stop() diff --git a/frigate/util/process.py b/frigate/util/process.py index 886b3d2fb..c4a1906fd 100644 --- a/frigate/util/process.py +++ b/frigate/util/process.py @@ -6,10 +6,13 @@ import sys import threading from functools import wraps from logging.handlers import QueueHandler +from multiprocessing.connection import wait as mp_wait from typing import Any import frigate.log +logger = logging.getLogger(__name__) + class BaseProcess(mp.Process): def __init__(self, **kwargs): @@ -81,3 +84,36 @@ class Process(BaseProcess): signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGINT, receiveSignal) + + +class ProcessStopper(threading.Thread): + def __init__(self, processes: list[mp.Process]): + super().__init__() + self.processes = processes + self.start() + + def run(self): + # Stop all processes registered for stopping + sentinels: dict[int, mp.Process] = {} + for process in self.processes: + if process is None: + continue + process.terminate() + sentinels[process.sentinel] = process + + # Wait for all the processes to shutdown + logger.info(f"Waiting for {len(sentinels)} processes to stop") + while sentinels: + ready = mp_wait(sentinels.keys(), timeout=10) + if ready: + for sentinel in ready: + name = sentinels[sentinel].name + del sentinels[sentinel] + logger.info(f"Process {name} has stopped") + else: + proc: mp.Process = next(iter(sentinels.values())) + logger.warning( + f"{len(sentinels)} processes are still running. " + f"Killing {proc.name or 'one of them'}" + ) + proc.kill()