Refactor put and get methods in LimitedQueue to handle queue size and blocking behavior more efficiently

This commit is contained in:
Sergey Krashevich 2023-07-02 03:12:13 +03:00
parent 4e2a529b0e
commit b682d835d0
No known key found for this signature in database
GPG Key ID: 625171324E7D3856

View File

@ -1300,28 +1300,25 @@ class LimitedQueue(FFQueue):
self.size = multiprocessing.RawValue( self.size = multiprocessing.RawValue(
ctypes.c_int, 0 ctypes.c_int, 0
) # Add a counter for the number of items in the queue ) # Add a counter for the number of items in the queue
self.lock = multiprocessing.Lock() # Add a lock
def put(self, x, block=True, timeout=DEFAULT_TIMEOUT): def put(self, x, block=True, timeout=DEFAULT_TIMEOUT):
with self.lock: # Acquire the lock if self.maxsize > 0 and self.size.value >= self.maxsize:
if self.maxsize > 0 and self.size.value >= self.maxsize: if block:
if block: start_time = time.time()
start_time = time.time() while self.size.value >= self.maxsize:
while self.size.value >= self.maxsize: remaining = timeout - (time.time() - start_time)
remaining = timeout - (time.time() - start_time) if remaining <= 0.0:
if remaining <= 0.0: raise Full
raise Full time.sleep(min(remaining, 0.1))
time.sleep(min(remaining, 0.1)) else:
else: raise Full
raise Full self.size.value += 1
self.size.value += 1
return super().put(x, block=block, timeout=timeout) return super().put(x, block=block, timeout=timeout)
def get(self, block=True, timeout=DEFAULT_TIMEOUT): def get(self, block=True, timeout=DEFAULT_TIMEOUT):
with self.lock: # Acquire the lock if self.size.value <= 0 and not block:
if self.size.value <= 0 and not block: raise Empty
raise Empty self.size.value -= 1
self.size.value -= 1
return super().get(block=block, timeout=timeout) return super().get(block=block, timeout=timeout)
def qsize(self): def qsize(self):