intentionally handle queues during shutdown and carefully manage shutdown order

This commit is contained in:
Blake Blackshear 2024-06-04 06:50:20 -05:00
parent 5b42c91a91
commit 458e069d79
2 changed files with 53 additions and 8 deletions

View File

@ -731,43 +731,82 @@ class FrigateApp:
ReviewSegment.end_time == None ReviewSegment.end_time == None
).execute() ).execute()
# Stop Communicators # ensure the capture processes are done
self.inter_process_communicator.stop() for camera in self.camera_metrics.keys():
self.inter_config_updater.stop() capture_process = self.camera_metrics[camera]["capture_process"]
self.inter_detection_proxy.stop() logger.info(f"Waiting for capture process for {camera} to stop")
capture_process.join()
# ensure the detectors are done
for detector in self.detectors.values(): for detector in self.detectors.values():
detector.stop() detector.stop()
# Empty the detection queue and set the events for all requests # Empty the detection queue and set the events for all requests
while not self.detection_queue.empty(): while not self.detection_queue.empty():
connection_id = self.detection_queue.get(timeout=1) connection_id = self.detection_queue.get(False)
self.detection_out_events[connection_id].set() self.detection_out_events[connection_id].set()
self.detection_queue.close()
self.detection_queue.join_thread() # ensure the camera processors are done
for camera in self.camera_metrics.keys():
camera_process = self.camera_metrics[camera]["process"]
logger.info(f"Waiting for process for {camera} to stop")
camera_process.join()
logger.info(f"Closing frame queue for {camera}")
frame_queue = self.camera_metrics[camera]["frame_queue"]
frame_queue.close()
frame_queue.join_thread()
# Empty the detection queue
while not self.detection_queue.empty():
connection_id = self.detection_queue.get(False)
self.detection_out_events[connection_id].set()
# Empty the detected frames queue
while not self.detected_frames_queue.empty():
self.detected_frames_queue.get(False)
self.external_event_processor.stop() self.external_event_processor.stop()
self.dispatcher.stop() self.dispatcher.stop()
self.detected_frames_processor.join() self.detected_frames_processor.join()
self.ptz_autotracker_thread.join() self.ptz_autotracker_thread.join()
self.event_processor.join() self.event_processor.join()
# Empty the timeline queue
while not self.timeline_queue.empty():
self.timeline_queue.get(False)
self.timeline_processor.join()
self.event_cleanup.join() self.event_cleanup.join()
self.record_cleanup.join() self.record_cleanup.join()
self.stats_emitter.join() self.stats_emitter.join()
self.frigate_watchdog.join() self.frigate_watchdog.join()
self.db.stop() self.db.stop()
# Stop Communicators
self.inter_process_communicator.stop()
self.inter_config_updater.stop()
self.inter_detection_proxy.stop()
while len(self.detection_shms) > 0: while len(self.detection_shms) > 0:
shm = self.detection_shms.pop() shm = self.detection_shms.pop()
shm.close() shm.close()
shm.unlink() shm.unlink()
# Close queues
for queue in [ for queue in [
self.detection_queue,
self.detected_frames_queue, self.detected_frames_queue,
self.log_queue, self.timeline_queue,
]: ]:
if queue is not None: if queue is not None:
while not queue.empty(): while not queue.empty():
queue.get_nowait() queue.get_nowait()
queue.close() queue.close()
queue.join_thread() queue.join_thread()
# Empty log queue
if self.log_queue is not None:
while not self.log_queue.empty():
self.log_queue.get_nowait()
self.log_queue.close()
self.log_queue.join_thread()

View File

@ -446,6 +446,12 @@ def track_camera(
region_grid, region_grid,
) )
# empty the frame queue
logger.info(f"{name}: emptying frame queue")
while not frame_queue.empty():
frame_time = frame_queue.get(False)
frame_manager.delete(f"{name}{frame_time}")
logger.info(f"{name}: exiting subprocess") logger.info(f"{name}: exiting subprocess")