From bf234445af1ddcfbed03efc54e320949777ffd69 Mon Sep 17 00:00:00 2001 From: Sergey Krashevich Date: Thu, 6 Jul 2023 17:09:56 +0300 Subject: [PATCH] Add thread-safety to LimitedQueue by implementing a lock for put and get methods --- frigate/util.py | 36 ++++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/frigate/util.py b/frigate/util.py index cc9bb03c9..3a8746100 100755 --- a/frigate/util.py +++ b/frigate/util.py @@ -1239,29 +1239,33 @@ class LimitedQueue(FFQueue): self.size = multiprocessing.RawValue( ctypes.c_int, 0 ) # Add a counter for the number of items in the queue + self.lock = multiprocessing.Lock() # Add a lock for thread-safety def put(self, x, block=True, timeout=DEFAULT_TIMEOUT): - if self.maxsize > 0 and self.size.value >= self.maxsize: - if block: - start_time = time.time() - while self.size.value >= self.maxsize: - remaining = timeout - (time.time() - start_time) - if remaining <= 0.0: - raise Full - time.sleep(min(remaining, 0.1)) - else: - raise Full - self.size.value += 1 + with self.lock: # Ensure thread-safety + if self.maxsize > 0 and self.size.value >= self.maxsize: + if block: + start_time = time.time() + while self.size.value >= self.maxsize: + remaining = timeout - (time.time() - start_time) + if remaining <= 0.0: + raise Full + time.sleep(min(remaining, 0.1)) + else: + raise Full + self.size.value += 1 return super().put(x, block=block, timeout=timeout) def get(self, block=True, timeout=DEFAULT_TIMEOUT): - if self.size.value <= 0 and not block: - raise Empty - self.size.value -= 1 - return super().get(block=block, timeout=timeout) + item = super().get(block=block, timeout=timeout) + with self.lock: # Ensure thread-safety + if self.size.value <= 0 and not block: + raise Empty + self.size.value -= 1 + return item def qsize(self): - return self.size + return self.size.value # Return the value, not the RawValue object def empty(self): return self.qsize() == 0