Refactored LimitedQueue to include a counter for the number of items in the queue and updated put and get methods to use the counter

This commit is contained in:
Sergey Krashevich 2023-07-02 02:40:00 +03:00
parent 22ce05be7e
commit c62a427c5c
No known key found for this signature in database
GPG Key ID: 625171324E7D3856

View File

@ -1,7 +1,9 @@
import copy import copy
import ctypes
import datetime import datetime
import json import json
import logging import logging
import multiprocessing
import os import os
import re import re
import shlex import shlex
@ -14,7 +16,7 @@ from collections import Counter
from collections.abc import Mapping from collections.abc import Mapping
from multiprocessing import shared_memory from multiprocessing import shared_memory
from typing import Any, AnyStr, Optional, Tuple from typing import Any, AnyStr, Optional, Tuple
from faster_fifo import Queue as FFQueue from faster_fifo import Queue as FFQueue, DEFAULT_CIRCULAR_BUFFER_SIZE, DEFAULT_TIMEOUT
from queue import Full, Empty from queue import Full, Empty
import time import time
import cv2 import cv2
@ -1222,39 +1224,105 @@ def get_video_properties(url, get_duration=False):
return result return result
class LimitedQueue: def update_yaml_from_url(file_path, url):
def __init__(self, maxsize=0, max_size_bytes=None, loads=None, dumps=None): parsed_url = urllib.parse.urlparse(url)
self.maxsize = maxsize query_string = urllib.parse.parse_qs(parsed_url.query, keep_blank_values=True)
self.queue = FFQueue(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps)
self.size = 0
def put(self, item, block=True, timeout=None): for key_path_str, new_value_list in query_string.items():
if self.maxsize > 0 and self.size >= self.maxsize: key_path = key_path_str.split(".")
if block: for i in range(len(key_path)):
start_time = time.time() try:
while self.size >= self.maxsize: index = int(key_path[i])
remaining = timeout - (time.time() - start_time) key_path[i] = (key_path[i - 1], index)
if remaining <= 0.0: key_path.pop(i - 1)
raise Full except ValueError:
time.sleep(min(remaining, 0.1)) pass
new_value = new_value_list[0]
update_yaml_file(file_path, key_path, new_value)
def update_yaml_file(file_path, key_path, new_value):
yaml = YAML()
with open(file_path, "r") as f:
data = yaml.load(f)
temp = data
for key in key_path[:-1]:
if isinstance(key, tuple):
if key[0] not in temp:
temp[key[0]] = [{}] * max(1, key[1] + 1)
elif len(temp[key[0]]) <= key[1]:
temp[key[0]] += [{}] * (key[1] - len(temp[key[0]]) + 1)
temp = temp[key[0]][key[1]]
else:
if key not in temp:
temp[key] = []
temp = temp[key]
print(new_value)
last_key = key_path[-1]
if new_value == "":
print(last_key)
if isinstance(last_key, tuple):
del temp[last_key[0]][last_key[1]]
else:
del temp[last_key]
else:
if isinstance(last_key, tuple):
if last_key[0] not in temp:
temp[last_key[0]] = [{}] * max(1, last_key[1] + 1)
elif len(temp[last_key[0]]) <= last_key[1]:
temp[last_key[0]] += [{}] * (last_key[1] - len(temp[last_key[0]]) + 1)
temp[last_key[0]][last_key[1]] = new_value
else:
if (
last_key in temp
and isinstance(temp[last_key], dict)
and isinstance(new_value, dict)
):
temp[last_key].update(new_value)
else: else:
raise Full temp[last_key] = new_value
self.queue.put(item)
self.size += 1
def get(self, block=True, timeout=None): with open(file_path, "w") as f:
if self.size <= 0: yaml.dump(data, f)
if not block:
class LimitedQueue(FFQueue):
def __init__(
self,
maxsize=0,
max_size_bytes=DEFAULT_CIRCULAR_BUFFER_SIZE,
loads=None,
dumps=None,
):
super().__init__(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps)
self.maxsize = maxsize
self.size = multiprocessing.RawValue(
ctypes.c_int, 0
) # Add a counter for the number of items in the queue
self.lock = multiprocessing.Lock() # Add a lock
def put(self, x, block=True, timeout=DEFAULT_TIMEOUT):
with self.lock: # Acquire the lock
if self.maxsize > 0 and self.size.value >= self.maxsize:
if block:
start_time = time.time()
while self.size.value >= self.maxsize:
remaining = timeout - (time.time() - start_time)
if remaining <= 0.0:
raise Full
time.sleep(min(remaining, 0.1))
else:
raise Full
self.size.value += 1
return super().put(x, block=block, timeout=timeout)
def get(self, block=True, timeout=DEFAULT_TIMEOUT):
with self.lock: # Acquire the lock
if self.size.value <= 0 and not block:
raise Empty raise Empty
start_time = time.time() self.size.value -= 1
while self.size <= 0: return super().get(block=block, timeout=timeout)
remaining = timeout - (time.time() - start_time)
if remaining <= 0.0:
raise Empty
time.sleep(min(remaining, 0.1))
item = self.queue.get()
self.size -= 1
return item
def qsize(self): def qsize(self):
return self.size return self.size