bug: fix locks/accounting in LimitedQueue

This commit is contained in:
Michael Wei 2023-07-07 13:34:24 -07:00
parent 00b9a490bb
commit 3631dd4bba
2 changed files with 43 additions and 47 deletions

View File

@ -1,7 +1,6 @@
"""Utilities for builtin types manipulation.""" """Utilities for builtin types manipulation."""
import copy import copy
import ctypes
import datetime import datetime
import logging import logging
import multiprocessing import multiprocessing
@ -11,7 +10,7 @@ import time
import urllib.parse import urllib.parse
from collections import Counter from collections import Counter
from collections.abc import Mapping from collections.abc import Mapping
from queue import Empty, Full from queue import Full
from typing import Any, Tuple from typing import Any, Tuple
import numpy as np import numpy as np
@ -75,39 +74,40 @@ class LimitedQueue(FFQueue):
): ):
super().__init__(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps) super().__init__(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps)
self.maxsize = maxsize self.maxsize = maxsize
self.size = multiprocessing.RawValue(
ctypes.c_int, 0
) # Add a counter for the number of items in the queue
self.lock = multiprocessing.Lock() # Add a lock for thread-safety self.lock = multiprocessing.Lock() # Add a lock for thread-safety
def put(self, x, block=True, timeout=DEFAULT_TIMEOUT): def put(self, x, block=True, timeout=None):
with self.lock: # Ensure thread-safety # ensure only one writer.
if self.maxsize > 0 and self.size.value >= self.maxsize: with self.lock:
# block/full due to num elems
if self.maxsize > 0 and self.qsize() >= self.maxsize:
if block: if block:
start_time = time.time() if timeout is None:
while self.size.value >= self.maxsize: while self.qsize() >= self.maxsize:
remaining = timeout - (time.time() - start_time) time.sleep(
if remaining <= 0.0: 0.1
raise Full ) # 0.1s, might want to replace this with a signal.
time.sleep(min(remaining, 0.1)) else:
start_time = time.time()
while self.qsize() >= self.maxsize:
remaining = timeout - (time.time() - start_time)
if remaining <= 0.0:
raise Full
time.sleep(min(remaining, 0.1))
else: else:
raise Full raise Full
self.size.value += 1 # block/full due to underlying circular buffer being full
return super().put(x, block=block, timeout=timeout) if block and timeout is None:
# workaround for https://github.com/alex-petrenko/faster-fifo/issues/42
def get(self, block=True, timeout=DEFAULT_TIMEOUT): while True:
item = super().get(block=block, timeout=timeout) try:
with self.lock: # Ensure thread-safety return super().put(x, block=block, timeout=DEFAULT_TIMEOUT)
if self.size.value <= 0 and not block: except Full:
raise Empty logger.warn("Queue was full, retrying in 1s")
self.size.value -= 1 time.sleep(1)
return item return super().put(
x, block=block, timeout=DEFAULT_TIMEOUT if timeout is None else timeout
def qsize(self): )
return self.size.value
def empty(self):
return self.qsize() == 0
def full(self): def full(self):
return self.qsize() == self.maxsize return self.qsize() == self.maxsize

View File

@ -1025,21 +1025,17 @@ def process_frames(
f"debug/frames/{camera_name}-{'{:.6f}'.format(frame_time)}.jpg", f"debug/frames/{camera_name}-{'{:.6f}'.format(frame_time)}.jpg",
bgr_frame, bgr_frame,
) )
# add to the queue if not full # add to the queue, blocking in case the queue is full.
if detected_objects_queue.full(): fps_tracker.update()
frame_manager.delete(f"{camera_name}{frame_time}") fps.value = fps_tracker.eps()
continue detected_objects_queue.put(
else: (
fps_tracker.update() camera_name,
fps.value = fps_tracker.eps() frame_time,
detected_objects_queue.put( detections,
( motion_boxes,
camera_name, regions,
frame_time,
detections,
motion_boxes,
regions,
)
) )
detection_fps.value = object_detector.fps.eps() )
frame_manager.close(f"{camera_name}{frame_time}") detection_fps.value = object_detector.fps.eps()
frame_manager.close(f"{camera_name}{frame_time}")