From c62a427c5c2a7848a462396a420195fb186ebc4a Mon Sep 17 00:00:00 2001 From: Sergey Krashevich Date: Sun, 2 Jul 2023 02:40:00 +0300 Subject: [PATCH] Refactored LimitedQueue to include a counter for the number of items in the queue and updated put and get methods to use the counter --- frigate/util.py | 128 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 98 insertions(+), 30 deletions(-) diff --git a/frigate/util.py b/frigate/util.py index d5259556f..ef372d5cb 100755 --- a/frigate/util.py +++ b/frigate/util.py @@ -1,7 +1,9 @@ import copy +import ctypes import datetime import json import logging +import multiprocessing import os import re import shlex @@ -14,7 +16,7 @@ from collections import Counter from collections.abc import Mapping from multiprocessing import shared_memory from typing import Any, AnyStr, Optional, Tuple -from faster_fifo import Queue as FFQueue +from faster_fifo import Queue as FFQueue, DEFAULT_CIRCULAR_BUFFER_SIZE, DEFAULT_TIMEOUT from queue import Full, Empty import time import cv2 @@ -1222,39 +1224,105 @@ def get_video_properties(url, get_duration=False): return result -class LimitedQueue: - def __init__(self, maxsize=0, max_size_bytes=None, loads=None, dumps=None): - self.maxsize = maxsize - self.queue = FFQueue(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps) - self.size = 0 +def update_yaml_from_url(file_path, url): + parsed_url = urllib.parse.urlparse(url) + query_string = urllib.parse.parse_qs(parsed_url.query, keep_blank_values=True) - def put(self, item, block=True, timeout=None): - if self.maxsize > 0 and self.size >= self.maxsize: - if block: - start_time = time.time() - while self.size >= self.maxsize: - remaining = timeout - (time.time() - start_time) - if remaining <= 0.0: - raise Full - time.sleep(min(remaining, 0.1)) + for key_path_str, new_value_list in query_string.items(): + key_path = key_path_str.split(".") + for i in range(len(key_path)): + try: + index = int(key_path[i]) + key_path[i] = (key_path[i - 1], index) + key_path.pop(i - 1) + except ValueError: + pass + new_value = new_value_list[0] + update_yaml_file(file_path, key_path, new_value) + + +def update_yaml_file(file_path, key_path, new_value): + yaml = YAML() + with open(file_path, "r") as f: + data = yaml.load(f) + + temp = data + for key in key_path[:-1]: + if isinstance(key, tuple): + if key[0] not in temp: + temp[key[0]] = [{}] * max(1, key[1] + 1) + elif len(temp[key[0]]) <= key[1]: + temp[key[0]] += [{}] * (key[1] - len(temp[key[0]]) + 1) + temp = temp[key[0]][key[1]] + else: + if key not in temp: + temp[key] = [] + temp = temp[key] + print(new_value) + last_key = key_path[-1] + if new_value == "": + print(last_key) + if isinstance(last_key, tuple): + del temp[last_key[0]][last_key[1]] + else: + del temp[last_key] + else: + if isinstance(last_key, tuple): + if last_key[0] not in temp: + temp[last_key[0]] = [{}] * max(1, last_key[1] + 1) + elif len(temp[last_key[0]]) <= last_key[1]: + temp[last_key[0]] += [{}] * (last_key[1] - len(temp[last_key[0]]) + 1) + temp[last_key[0]][last_key[1]] = new_value + else: + if ( + last_key in temp + and isinstance(temp[last_key], dict) + and isinstance(new_value, dict) + ): + temp[last_key].update(new_value) else: - raise Full - self.queue.put(item) - self.size += 1 + temp[last_key] = new_value - def get(self, block=True, timeout=None): - if self.size <= 0: - if not block: + with open(file_path, "w") as f: + yaml.dump(data, f) + + +class LimitedQueue(FFQueue): + def __init__( + self, + maxsize=0, + max_size_bytes=DEFAULT_CIRCULAR_BUFFER_SIZE, + loads=None, + dumps=None, + ): + super().__init__(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps) + self.maxsize = maxsize + self.size = multiprocessing.RawValue( + ctypes.c_int, 0 + ) # Add a counter for the number of items in the queue + self.lock = multiprocessing.Lock() # Add a lock + + def put(self, x, block=True, timeout=DEFAULT_TIMEOUT): + with self.lock: # Acquire the lock + if self.maxsize > 0 and self.size.value >= self.maxsize: + if block: + start_time = time.time() + while self.size.value >= self.maxsize: + remaining = timeout - (time.time() - start_time) + if remaining <= 0.0: + raise Full + time.sleep(min(remaining, 0.1)) + else: + raise Full + self.size.value += 1 + return super().put(x, block=block, timeout=timeout) + + def get(self, block=True, timeout=DEFAULT_TIMEOUT): + with self.lock: # Acquire the lock + if self.size.value <= 0 and not block: raise Empty - start_time = time.time() - while self.size <= 0: - remaining = timeout - (time.time() - start_time) - if remaining <= 0.0: - raise Empty - time.sleep(min(remaining, 0.1)) - item = self.queue.get() - self.size -= 1 - return item + self.size.value -= 1 + return super().get(block=block, timeout=timeout) def qsize(self): return self.size