Fast process shutdown

This commit is contained in:
George Tsiamasiotis 2024-10-02 22:01:41 +03:00
parent f90172c2db
commit 20b1112422
2 changed files with 65 additions and 35 deletions

View File

@ -5,6 +5,7 @@ import os
import secrets import secrets
import shutil import shutil
import threading import threading
from itertools import chain
from typing import Any, Optional from typing import Any, Optional
import psutil import psutil
@ -66,6 +67,7 @@ from frigate.stats.util import stats_init
from frigate.storage import StorageMaintainer from frigate.storage import StorageMaintainer
from frigate.timeline import TimelineProcessor from frigate.timeline import TimelineProcessor
from frigate.util.builtin import empty_and_close_queue from frigate.util.builtin import empty_and_close_queue
from frigate.util.process import ProcessStopper
from frigate.version import VERSION from frigate.version import VERSION
from frigate.watchdog import FrigateWatchdog from frigate.watchdog import FrigateWatchdog
@ -550,52 +552,44 @@ class FrigateApp:
ReviewSegment.end_time == None ReviewSegment.end_time == None
).execute() ).execute()
# stop the audio process # The list of processes we need to stop
if self.audio_process: processes: list[mp.Process] = []
self.audio_process.terminate()
self.audio_process.join()
# ensure the capture processes are done # Stop the audio process
for name, camera in self.cameras.items(): processes.append(self.audio_process)
if camera.capture_process is not None:
logger.info(f"Waiting for capture process for {name} to stop")
camera.capture_process.terminate()
camera.capture_process.join()
# ensure the camera processors are done # Stop the camera processors
processes.extend(
chain.from_iterable(
(camera.capture_process, camera.process)
for camera in self.cameras.values()
)
)
# Stop all remaining processes
processes.extend(self.detectors.values())
processes.append(self.output_processor)
processes.append(self.recording_process)
processes.append(self.review_segment_process)
ProcessStopper(processes)
# Close pending frame queues
for name, camera in self.cameras.items(): for name, camera in self.cameras.items():
if camera.process is not None:
logger.info(f"Waiting for process for {name} to stop")
camera.process.terminate()
camera.process.join()
logger.info(f"Closing frame queue for {name}") logger.info(f"Closing frame queue for {name}")
empty_and_close_queue(camera.camera_metrics.frame_queue) empty_and_close_queue(camera.camera_metrics.frame_queue)
# ensure the detectors are done logger.info("Closing detection queue")
for detector in self.detectors.values():
detector.terminate()
detector.join()
empty_and_close_queue(self.detection_queue) empty_and_close_queue(self.detection_queue)
logger.info("Detection queue closed")
self.detected_frames_processor.join() self.detected_frames_processor.join()
logger.info("Closing detected frames queue")
empty_and_close_queue(self.detected_frames_queue) empty_and_close_queue(self.detected_frames_queue)
logger.info("Detected frames queue closed")
self.timeline_processor.join() self.timeline_processor.join()
self.event_processor.join() self.event_processor.join()
empty_and_close_queue(self.timeline_queue) empty_and_close_queue(self.timeline_queue)
logger.info("Timeline queue closed") logger.info("Closing timeline queue")
self.output_processor.terminate()
self.output_processor.join()
self.recording_process.terminate()
self.recording_process.join()
self.review_segment_process.terminate()
self.review_segment_process.join()
self.external_event_processor.stop() self.external_event_processor.stop()
self.dispatcher.stop() self.dispatcher.stop()

View File

@ -6,10 +6,13 @@ import sys
import threading import threading
from functools import wraps from functools import wraps
from logging.handlers import QueueHandler from logging.handlers import QueueHandler
from multiprocessing.connection import wait as mp_wait
from typing import Any from typing import Any
import frigate.log import frigate.log
logger = logging.getLogger(__name__)
class BaseProcess(mp.Process): class BaseProcess(mp.Process):
def __init__(self, **kwargs): def __init__(self, **kwargs):
@ -81,3 +84,36 @@ class Process(BaseProcess):
signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal) signal.signal(signal.SIGINT, receiveSignal)
class ProcessStopper(threading.Thread):
def __init__(self, processes: list[mp.Process]):
super().__init__()
self.processes = processes
self.start()
def run(self):
# Stop all processes registered for stopping
sentinels: dict[int, mp.Process] = {}
for process in self.processes:
if process is None:
continue
process.terminate()
sentinels[process.sentinel] = process
# Wait for all the processes to shutdown
logger.info(f"Waiting for {len(sentinels)} processes to stop")
while sentinels:
ready = mp_wait(sentinels.keys(), timeout=10)
if ready:
for sentinel in ready:
name = sentinels[sentinel].name
del sentinels[sentinel]
logger.info(f"Process {name} has stopped")
else:
proc: mp.Process = next(iter(sentinels.values()))
logger.warning(
f"{len(sentinels)} processes are still running. "
f"Killing {proc.name or 'one of them'}"
)
proc.kill()