more carefully manage shutdown to avoid threadlocks

This commit is contained in:
Blake Blackshear 2024-06-05 06:45:17 -05:00
parent 458e069d79
commit bb31e83ea1
9 changed files with 66 additions and 53 deletions

View File

@ -68,7 +68,7 @@ from frigate.stats.util import stats_init
from frigate.storage import StorageMaintainer from frigate.storage import StorageMaintainer
from frigate.timeline import TimelineProcessor from frigate.timeline import TimelineProcessor
from frigate.types import CameraMetricsTypes, PTZMetricsTypes from frigate.types import CameraMetricsTypes, PTZMetricsTypes
from frigate.util.builtin import save_default_config from frigate.util.builtin import empty_and_close_queue, save_default_config
from frigate.util.config import migrate_frigate_config from frigate.util.config import migrate_frigate_config
from frigate.util.object import get_camera_regions_grid from frigate.util.object import get_camera_regions_grid
from frigate.version import VERSION from frigate.version import VERSION
@ -521,8 +521,9 @@ class FrigateApp:
logger.info(f"Capture process started for {name}: {capture_process.pid}") logger.info(f"Capture process started for {name}: {capture_process.pid}")
def start_audio_processors(self) -> None: def start_audio_processors(self) -> None:
self.audio_process = None
if len([c for c in self.config.cameras.values() if c.audio.enabled]) > 0: if len([c for c in self.config.cameras.values() if c.audio.enabled]) > 0:
audio_process = mp.Process( self.audio_process = mp.Process(
target=listen_to_audio, target=listen_to_audio,
name="audio_capture", name="audio_capture",
args=( args=(
@ -530,10 +531,10 @@ class FrigateApp:
self.camera_metrics, self.camera_metrics,
), ),
) )
audio_process.daemon = True self.audio_process.daemon = True
audio_process.start() self.audio_process.start()
self.processes["audio_detector"] = audio_process.pid or 0 self.processes["audio_detector"] = self.audio_process.pid or 0
logger.info(f"Audio process started: {audio_process.pid}") logger.info(f"Audio process started: {self.audio_process.pid}")
def start_timeline_processor(self) -> None: def start_timeline_processor(self) -> None:
self.timeline_processor = TimelineProcessor( self.timeline_processor = TimelineProcessor(
@ -706,9 +707,9 @@ class FrigateApp:
self.check_shm() self.check_shm()
self.init_auth() self.init_auth()
# Flask only listens for SIGINT, so we need to catch SIGTERM and send SIGINT
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
self.stop() os.kill(os.getpid(), signal.SIGINT)
sys.exit()
signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGTERM, receiveSignal)
@ -717,10 +718,13 @@ class FrigateApp:
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
logger.info("Flask has exited...")
self.stop() self.stop()
def stop(self) -> None: def stop(self) -> None:
logger.info("Stopping...") logger.info("Stopping...")
self.stop_event.set() self.stop_event.set()
# set an end_time on entries without an end_time before exiting # set an end_time on entries without an end_time before exiting
@ -731,51 +735,57 @@ class FrigateApp:
ReviewSegment.end_time == None ReviewSegment.end_time == None
).execute() ).execute()
# stop the audio process
if self.audio_process is not None:
self.audio_process.terminate()
self.audio_process.join()
# ensure the capture processes are done # ensure the capture processes are done
for camera in self.camera_metrics.keys(): for camera in self.camera_metrics.keys():
capture_process = self.camera_metrics[camera]["capture_process"] capture_process = self.camera_metrics[camera]["capture_process"]
logger.info(f"Waiting for capture process for {camera} to stop") logger.info(f"Waiting for capture process for {camera} to stop")
capture_process.terminate()
capture_process.join() capture_process.join()
# ensure the detectors are done
for detector in self.detectors.values():
detector.stop()
# Empty the detection queue and set the events for all requests
while not self.detection_queue.empty():
connection_id = self.detection_queue.get(False)
self.detection_out_events[connection_id].set()
# ensure the camera processors are done # ensure the camera processors are done
for camera in self.camera_metrics.keys(): for camera in self.camera_metrics.keys():
camera_process = self.camera_metrics[camera]["process"] camera_process = self.camera_metrics[camera]["process"]
logger.info(f"Waiting for process for {camera} to stop") logger.info(f"Waiting for process for {camera} to stop")
camera_process.terminate()
camera_process.join() camera_process.join()
logger.info(f"Closing frame queue for {camera}") logger.info(f"Closing frame queue for {camera}")
frame_queue = self.camera_metrics[camera]["frame_queue"] frame_queue = self.camera_metrics[camera]["frame_queue"]
frame_queue.close() empty_and_close_queue(frame_queue)
frame_queue.join_thread()
# Empty the detection queue # ensure the detectors are done
while not self.detection_queue.empty(): for detector in self.detectors.values():
connection_id = self.detection_queue.get(False) detector.stop()
self.detection_out_events[connection_id].set()
# Empty the detected frames queue empty_and_close_queue(self.detection_queue)
while not self.detected_frames_queue.empty(): logger.info("Detection queue closed")
self.detected_frames_queue.get(False)
self.detected_frames_processor.join()
empty_and_close_queue(self.detected_frames_queue)
logger.info("Detected frames queue closed")
self.timeline_processor.join()
self.event_processor.join()
empty_and_close_queue(self.timeline_queue)
logger.info("Timeline queue closed")
self.output_processor.terminate()
self.output_processor.join()
self.recording_process.terminate()
self.recording_process.join()
self.review_segment_process.terminate()
self.review_segment_process.join()
self.external_event_processor.stop() self.external_event_processor.stop()
self.dispatcher.stop() self.dispatcher.stop()
self.detected_frames_processor.join()
self.ptz_autotracker_thread.join() self.ptz_autotracker_thread.join()
self.event_processor.join()
# Empty the timeline queue
while not self.timeline_queue.empty():
self.timeline_queue.get(False)
self.timeline_processor.join()
self.event_cleanup.join() self.event_cleanup.join()
self.record_cleanup.join() self.record_cleanup.join()
self.stats_emitter.join() self.stats_emitter.join()
@ -792,21 +802,7 @@ class FrigateApp:
shm.close() shm.close()
shm.unlink() shm.unlink()
# Close queues self.log_process.terminate()
for queue in [ self.log_process.join()
self.detection_queue,
self.detected_frames_queue,
self.timeline_queue,
]:
if queue is not None:
while not queue.empty():
queue.get_nowait()
queue.close()
queue.join_thread()
# Empty log queue os._exit(os.EX_OK)
if self.log_queue is not None:
while not self.log_queue.empty():
self.log_queue.get_nowait()
self.log_queue.close()
self.log_queue.join_thread()

View File

@ -83,6 +83,7 @@ def listen_to_audio(
logger.info("Exiting audio detector...") logger.info("Exiting audio detector...")
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
logger.info(f"Audio process received signal {signalNumber}")
stop_event.set() stop_event.set()
exit_process() exit_process()

View File

@ -57,8 +57,8 @@ def log_process(log_queue: Queue) -> None:
while True: while True:
try: try:
record = log_queue.get(timeout=1) record = log_queue.get(block=True, timeout=1.0)
except (queue.Empty, KeyboardInterrupt): except queue.Empty:
if stop_event.is_set(): if stop_event.is_set():
break break
continue continue

View File

@ -38,6 +38,7 @@ def output_frames(
stop_event = mp.Event() stop_event = mp.Event()
def receiveSignal(signalNumber, frame): def receiveSignal(signalNumber, frame):
logger.info(f"Output frames process received signal {signalNumber}")
stop_event.set() stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGTERM, receiveSignal)

View File

@ -22,6 +22,7 @@ def manage_recordings(config: FrigateConfig) -> None:
stop_event = mp.Event() stop_event = mp.Event()
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
logger.info(f"Recording manager process received signal {signalNumber}")
stop_event.set() stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGTERM, receiveSignal)

View File

@ -20,6 +20,7 @@ def manage_review_segments(config: FrigateConfig) -> None:
stop_event = mp.Event() stop_event = mp.Event()
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
logger.info(f"Manage review segments process received signal {signalNumber}")
stop_event.set() stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGTERM, receiveSignal)

View File

@ -3,6 +3,8 @@
import copy import copy
import datetime import datetime
import logging import logging
import multiprocessing as mp
import queue
import re import re
import shlex import shlex
import urllib.parse import urllib.parse
@ -337,3 +339,13 @@ def clear_and_unlink(file: Path, missing_ok: bool = True) -> None:
pass pass
file.unlink(missing_ok=missing_ok) file.unlink(missing_ok=missing_ok)
def empty_and_close_queue(q: mp.Queue):
while True:
try:
q.get(block=True, timeout=0.5)
except queue.Empty:
q.close()
q.join_thread()
return

View File

@ -33,7 +33,7 @@ def restart_frigate():
proc.terminate() proc.terminate()
# otherwise, just try and exit frigate # otherwise, just try and exit frigate
else: else:
os.kill(os.getpid(), signal.SIGTERM) os.kill(os.getpid(), signal.SIGINT)
def print_stack(sig, frame): def print_stack(sig, frame):

View File

@ -360,6 +360,7 @@ def capture_camera(name, config: CameraConfig, process_info):
stop_event = mp.Event() stop_event = mp.Event()
def receiveSignal(signalNumber, frame): def receiveSignal(signalNumber, frame):
logger.info(f"Capture camera received signal {signalNumber}")
stop_event.set() stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGTERM, receiveSignal)