diff --git a/frigate/edgetpu.py b/frigate/edgetpu.py index 5398b91e5..ae3bf3f53 100644 --- a/frigate/edgetpu.py +++ b/frigate/edgetpu.py @@ -116,8 +116,7 @@ def run_detector(detection_queue, out_events: Dict[str, mp.Event], avg_speed, st while True: connection_id = detection_queue.get() - input_frame = frame_manager.get(connection_id, (1,300,300,3)) - + input_frame = frame_manager.get((1,300,300,3), name=connection_id) if input_frame is None: continue diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 669ef1418..65ced9f30 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -61,7 +61,7 @@ class CameraState(): self._current_frame = np.zeros((self.config['frame_shape'][0]*3//2, self.config['frame_shape'][1]), np.uint8) self.current_frame_lock = threading.Lock() self.current_frame_time = 0.0 - self.previous_frame_id = None + self.previous_frame_idx = None self.callbacks = defaultdict(lambda: []) def get_current_frame(self, draw=False): @@ -120,11 +120,11 @@ class CameraState(): def on(self, event_type: str, callback: Callable[[Dict], None]): self.callbacks[event_type].append(callback) - def update(self, frame_time, tracked_objects): + def update(self, idx, frame_time, tracked_objects): self.current_frame_time = frame_time # get the new frame and delete the old frame frame_id = f"{self.name}{frame_time}" - current_frame = self.frame_manager.get(frame_id, (self.config['frame_shape'][0]*3//2, self.config['frame_shape'][1])) + current_frame = self.frame_manager.get((self.config['frame_shape'][0]*3//2, self.config['frame_shape'][1]), idx) current_ids = tracked_objects.keys() previous_ids = self.tracked_objects.keys() @@ -237,9 +237,9 @@ class CameraState(): with self.current_frame_lock: self._current_frame = current_frame - if not self.previous_frame_id is None: - self.frame_manager.delete(self.previous_frame_id) - self.previous_frame_id = frame_id + if not self.previous_frame_idx is None: + self.frame_manager.close(self.previous_frame_idx) + self.previous_frame_idx = idx class TrackedObjectProcessor(threading.Thread): def __init__(self, camera_config, client, topic_prefix, tracked_objects_queue, event_queue, stop_event): @@ -251,7 +251,6 @@ class TrackedObjectProcessor(threading.Thread): self.event_queue = event_queue self.stop_event = stop_event self.camera_states: Dict[str, CameraState] = {} - self.frame_manager = SharedMemoryFrameManager() def start(camera, obj): # publish events to mqtt @@ -303,7 +302,7 @@ class TrackedObjectProcessor(threading.Thread): self.client.publish(f"{self.topic_prefix}/{camera}/{object_name}", status, retain=False) for camera in self.camera_config.keys(): - camera_state = CameraState(camera, self.camera_config[camera], self.frame_manager) + camera_state = CameraState(camera, self.camera_config[camera], SharedMemoryFrameManager(camera)) camera_state.on('start', start) camera_state.on('update', update) camera_state.on('end', end) @@ -363,13 +362,13 @@ class TrackedObjectProcessor(threading.Thread): break try: - camera, frame_time, current_tracked_objects = self.tracked_objects_queue.get(True, 10) + camera, idx, frame_time, current_tracked_objects = self.tracked_objects_queue.get(True, 10) except queue.Empty: continue camera_state = self.camera_states[camera] - camera_state.update(frame_time, current_tracked_objects) + camera_state.update(idx, frame_time, current_tracked_objects) # update zone status for each label for zone in camera_state.config['zones'].keys(): diff --git a/frigate/util.py b/frigate/util.py index 9040577ae..7a5d3accd 100755 --- a/frigate/util.py +++ b/frigate/util.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod import datetime +from io import UnsupportedOperation import time import signal import traceback @@ -10,7 +11,7 @@ import threading import matplotlib.pyplot as plt import hashlib from multiprocessing import shared_memory -from typing import AnyStr +from typing import AnyStr, Optional, Tuple def draw_box_with_label(frame, x_min, y_min, x_max, y_max, label, info, thickness=2, color=None, position='ul'): if color is None: @@ -216,28 +217,128 @@ class DictFrameManager(FrameManager): del self.frames[name] class SharedMemoryFrameManager(FrameManager): - def __init__(self): - self.shm_store = {} + + """Initialize a SharedMemoryFrameManager. + Frames are stored as SharedMemory segments, named _ + The first byte of each slot represents whether the segment is "freed". + 0 means the segment is free and available for use + Any other value means the segment is in use. - def create(self, name, size) -> AnyStr: - shm = shared_memory.SharedMemory(name=name, create=True, size=size) - self.shm_store[name] = shm - return shm.buf + Args: + name: The name of the instance to open. + If not set, a name argument *must* be passed in to each operation. + size: The size of frames created by this frame manager. + If not set, the frame manager will be read only. + """ + def __init__(self, name: Optional[str] = None, size : Optional[int]= None): + # list of opened shms + self.shm_lists = {} + # If name is set, create the dict for the default namespace + self.shm_lists[name] = {} + self.name = name + self.size = size + + """Create a new frame. - def get(self, name, shape): - if name in self.shm_store: - shm = self.shm_store[name] + Returns: + A tuple containing the allocated frame id and the frame itself. + + Raises: + UnsupportedOperation: If the size of the frame manager is not set. + """ + def create(self, name = None) -> Tuple[int, AnyStr]: + + if name is None: + name = self.name + + if name is None: + raise UnsupportedOperation("Cannot create without name") + + if self.size is None: + raise UnsupportedOperation("Cannot create new frame without size set") + + if name not in self.shm_lists: + self.shm_lists[name] = {} + + shm_list = self.shm_lists[name] + # Check if we have any free SHMs in our list + for idx in shm_list: + shm = shm_list[idx] + if shm.buf[0] == 0: + shm.buf[0] = 1 + return idx, shm.buf[1:] + + # No free SHMs. Try to create a new SHM. + # We'll start after the maximum index, but this may create a sparse list. + # Having a sparse list shouldn't really matter though. + idx = 0 if not shm_list else max(shm_list) + 1 + + while True: + try: + shm = shared_memory.SharedMemory(name=f"{self.name}_{idx}", create=True, size=self.size+1) + shm.buf[0] = 1 + shm_list[idx] = shm + return idx, shm.buf[1:] + except FileExistsError: + # The SHM already exists. Return it if it is free. + shm = shared_memory.SharedMemory(name=f"{self.name}_{idx}") + if (shm.buf[0] == 0): + shm.buf[0] = 1 + shm_list[idx] = shm + return idx, shm.buf[1:] + # It's locked, so we move to the next idx. + idx = idx + 1 + + """ Get an existing frame. + + Args: + shape: The shape of the resulting numpy array + index: The index number of the frame to get. + If the index is not set, fetches the shm with . + + Returns: + A numpy array with the frame. + """ + def get(self, shape, index : Optional[int] = None, name : Optional[str] = None): + if name is None: + name = self.name + + if name is None: + raise UnsupportedOperation("Cannot get without name") + + # If the shm isn't already open + if name not in self.shm_lists: + self.shm_lists[name] = {} + shm_list = self.shm_lists[name] + + if index is None: + index = -1 # special value for "root" index + if index not in shm_list: + # The shm isn't open yet, so open it and cache it in our list + memory_name = name if index == -1 else f"{name}_{index}" + shm = shared_memory.SharedMemory(name=memory_name) + shm_list[index] = shm else: - shm = shared_memory.SharedMemory(name=name) - self.shm_store[name] = shm - return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf) + shm = shm_list[index] + + buffer = shm.buf if index is -1 else shm.buf[1:] + return np.ndarray(shape, dtype=np.uint8, buffer=buffer) - def close(self, name): - if name in self.shm_store: - self.shm_store[name].close() - del self.shm_store[name] + """ Close the frame, freeing it for reuse. + + Args: + index: The index number of the frame to free. + """ + def close(self, index : int, name : Optional[str] = None): + if name is None: + name = self.name + + if name is None: + raise UnsupportedOperation("Cannot close without name") + self.shm_lists[name][index].buf[0] = 0 def delete(self, name): + ### will be deprecated if name in self.shm_store: self.shm_store[name].close() self.shm_store[name].unlink() diff --git a/frigate/video.py b/frigate/video.py index c60714ca1..5180a2b14 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -11,6 +11,7 @@ import numpy as np import copy import itertools import json +import traceback import base64 from typing import Dict, List from collections import defaultdict @@ -133,16 +134,15 @@ def capture_frames(ffmpeg_process, camera_name, frame_shape, frame_manager: Fram break current_frame.value = datetime.datetime.now().timestamp() - frame_name = f"{camera_name}{current_frame.value}" - frame_buffer = frame_manager.create(frame_name, frame_size) + #frame_name = f"{camera_name}{current_frame.value}" + idx, frame_buffer = frame_manager.create() try: frame_buffer[:] = ffmpeg_process.stdout.read(frame_size) except: print(f"{camera_name}: ffmpeg sent a broken frame. something is wrong.") - if ffmpeg_process.poll() != None: print(f"{camera_name}: ffmpeg process is not running. exiting capture thread...") - frame_manager.delete(frame_name) + frame_manager.close(idx) break continue @@ -152,20 +152,17 @@ def capture_frames(ffmpeg_process, camera_name, frame_shape, frame_manager: Fram frame_num += 1 if (frame_num % take_frame) != 0: skipped_eps.update() - frame_manager.delete(frame_name) + frame_manager.close(idx) continue # if the queue is full, skip this frame if frame_queue.full(): skipped_eps.update() - frame_manager.delete(frame_name) + frame_manager.close(idx) continue - # close the frame - frame_manager.close(frame_name) - # add to the queue - frame_queue.put(current_frame.value) + frame_queue.put((idx, current_frame.value)) class CameraWatchdog(threading.Thread): def __init__(self, name, config, frame_queue, camera_fps, ffmpeg_pid, stop_event): @@ -224,7 +221,7 @@ class CameraCapture(threading.Thread): self.take_frame = take_frame self.fps = fps self.skipped_fps = EventsPerSecond() - self.frame_manager = SharedMemoryFrameManager() + self.frame_manager = SharedMemoryFrameManager(name, frame_shape[0] * frame_shape[1] * 3 // 2) self.ffmpeg_process = ffmpeg_process self.current_frame = mp.Value('d', 0.0) self.last_frame = 0 @@ -279,7 +276,7 @@ def track_camera(name, config, detection_queue, result_connection, detected_obje object_tracker = ObjectTracker(10) - frame_manager = SharedMemoryFrameManager() + frame_manager = SharedMemoryFrameManager(name) process_frames(name, frame_queue, frame_shape, frame_manager, motion_detector, object_detector, object_tracker, detected_objects_queue, process_info, objects_to_track, object_filters, mask, stop_event) @@ -335,16 +332,16 @@ def process_frames(camera_name: str, frame_queue: mp.Queue, frame_shape, break try: - frame_time = frame_queue.get(True, 10) + idx, frame_time = frame_queue.get(True, 10) except queue.Empty: continue current_frame_time.value = frame_time - frame = frame_manager.get(f"{camera_name}{frame_time}", (frame_shape[0]*3//2, frame_shape[1])) + frame = frame_manager.get((frame_shape[0]*3//2, frame_shape[1]), idx) if frame is None: - print(f"{camera_name}: frame {frame_time} is not in memory store.") + print(f"{camera_name}: frame {frame_time} (index {idx}) is not in memory store.") continue # look for motion @@ -419,11 +416,11 @@ def process_frames(camera_name: str, frame_queue: mp.Queue, frame_shape, # add to the queue if not full if(detected_objects_queue.full()): - frame_manager.delete(f"{camera_name}{frame_time}") + frame_manager.close(idx) continue else: fps_tracker.update() fps.value = fps_tracker.eps() - detected_objects_queue.put((camera_name, frame_time, object_tracker.tracked_objects)) + detected_objects_queue.put((camera_name, idx, frame_time, object_tracker.tracked_objects)) detection_fps.value = object_detector.fps.eps() - frame_manager.close(f"{camera_name}{frame_time}") + frame_manager.close(idx)