Add thread-safety to LimitedQueue by implementing a lock for put and get methods

This commit is contained in:
Sergey Krashevich 2023-07-06 17:09:56 +03:00
parent 2fae9dcb93
commit bf234445af
No known key found for this signature in database
GPG Key ID: 625171324E7D3856

View File

@ -1239,29 +1239,33 @@ class LimitedQueue(FFQueue):
self.size = multiprocessing.RawValue( self.size = multiprocessing.RawValue(
ctypes.c_int, 0 ctypes.c_int, 0
) # Add a counter for the number of items in the queue ) # Add a counter for the number of items in the queue
self.lock = multiprocessing.Lock() # Add a lock for thread-safety
def put(self, x, block=True, timeout=DEFAULT_TIMEOUT): def put(self, x, block=True, timeout=DEFAULT_TIMEOUT):
if self.maxsize > 0 and self.size.value >= self.maxsize: with self.lock: # Ensure thread-safety
if block: if self.maxsize > 0 and self.size.value >= self.maxsize:
start_time = time.time() if block:
while self.size.value >= self.maxsize: start_time = time.time()
remaining = timeout - (time.time() - start_time) while self.size.value >= self.maxsize:
if remaining <= 0.0: remaining = timeout - (time.time() - start_time)
raise Full if remaining <= 0.0:
time.sleep(min(remaining, 0.1)) raise Full
else: time.sleep(min(remaining, 0.1))
raise Full else:
self.size.value += 1 raise Full
self.size.value += 1
return super().put(x, block=block, timeout=timeout) return super().put(x, block=block, timeout=timeout)
def get(self, block=True, timeout=DEFAULT_TIMEOUT): def get(self, block=True, timeout=DEFAULT_TIMEOUT):
if self.size.value <= 0 and not block: item = super().get(block=block, timeout=timeout)
raise Empty with self.lock: # Ensure thread-safety
self.size.value -= 1 if self.size.value <= 0 and not block:
return super().get(block=block, timeout=timeout) raise Empty
self.size.value -= 1
return item
def qsize(self): def qsize(self):
return self.size return self.size.value # Return the value, not the RawValue object
def empty(self): def empty(self):
return self.qsize() == 0 return self.qsize() == 0