Reuse shared memory segments

This commit is contained in:
Michael Wei 2020-11-28 01:51:14 +00:00
parent 893e6b40a7
commit e78cec0f10
4 changed files with 143 additions and 47 deletions

View File

@ -116,8 +116,7 @@ def run_detector(detection_queue, out_events: Dict[str, mp.Event], avg_speed, st
while True:
connection_id = detection_queue.get()
input_frame = frame_manager.get(connection_id, (1,300,300,3))
input_frame = frame_manager.get((1,300,300,3), name=connection_id)
if input_frame is None:
continue

View File

@ -61,7 +61,7 @@ class CameraState():
self._current_frame = np.zeros((self.config['frame_shape'][0]*3//2, self.config['frame_shape'][1]), np.uint8)
self.current_frame_lock = threading.Lock()
self.current_frame_time = 0.0
self.previous_frame_id = None
self.previous_frame_idx = None
self.callbacks = defaultdict(lambda: [])
def get_current_frame(self, draw=False):
@ -120,11 +120,11 @@ class CameraState():
def on(self, event_type: str, callback: Callable[[Dict], None]):
self.callbacks[event_type].append(callback)
def update(self, frame_time, tracked_objects):
def update(self, idx, frame_time, tracked_objects):
self.current_frame_time = frame_time
# get the new frame and delete the old frame
frame_id = f"{self.name}{frame_time}"
current_frame = self.frame_manager.get(frame_id, (self.config['frame_shape'][0]*3//2, self.config['frame_shape'][1]))
current_frame = self.frame_manager.get((self.config['frame_shape'][0]*3//2, self.config['frame_shape'][1]), idx)
current_ids = tracked_objects.keys()
previous_ids = self.tracked_objects.keys()
@ -237,9 +237,9 @@ class CameraState():
with self.current_frame_lock:
self._current_frame = current_frame
if not self.previous_frame_id is None:
self.frame_manager.delete(self.previous_frame_id)
self.previous_frame_id = frame_id
if not self.previous_frame_idx is None:
self.frame_manager.close(self.previous_frame_idx)
self.previous_frame_idx = idx
class TrackedObjectProcessor(threading.Thread):
def __init__(self, camera_config, client, topic_prefix, tracked_objects_queue, event_queue, stop_event):
@ -251,7 +251,6 @@ class TrackedObjectProcessor(threading.Thread):
self.event_queue = event_queue
self.stop_event = stop_event
self.camera_states: Dict[str, CameraState] = {}
self.frame_manager = SharedMemoryFrameManager()
def start(camera, obj):
# publish events to mqtt
@ -303,7 +302,7 @@ class TrackedObjectProcessor(threading.Thread):
self.client.publish(f"{self.topic_prefix}/{camera}/{object_name}", status, retain=False)
for camera in self.camera_config.keys():
camera_state = CameraState(camera, self.camera_config[camera], self.frame_manager)
camera_state = CameraState(camera, self.camera_config[camera], SharedMemoryFrameManager(camera))
camera_state.on('start', start)
camera_state.on('update', update)
camera_state.on('end', end)
@ -363,13 +362,13 @@ class TrackedObjectProcessor(threading.Thread):
break
try:
camera, frame_time, current_tracked_objects = self.tracked_objects_queue.get(True, 10)
camera, idx, frame_time, current_tracked_objects = self.tracked_objects_queue.get(True, 10)
except queue.Empty:
continue
camera_state = self.camera_states[camera]
camera_state.update(frame_time, current_tracked_objects)
camera_state.update(idx, frame_time, current_tracked_objects)
# update zone status for each label
for zone in camera_state.config['zones'].keys():

View File

@ -1,5 +1,6 @@
from abc import ABC, abstractmethod
import datetime
from io import UnsupportedOperation
import time
import signal
import traceback
@ -10,7 +11,7 @@ import threading
import matplotlib.pyplot as plt
import hashlib
from multiprocessing import shared_memory
from typing import AnyStr
from typing import AnyStr, Optional, Tuple
def draw_box_with_label(frame, x_min, y_min, x_max, y_max, label, info, thickness=2, color=None, position='ul'):
if color is None:
@ -216,28 +217,128 @@ class DictFrameManager(FrameManager):
del self.frames[name]
class SharedMemoryFrameManager(FrameManager):
def __init__(self):
self.shm_store = {}
"""Initialize a SharedMemoryFrameManager.
Frames are stored as SharedMemory segments, named <name>_<slot>
The first byte of each slot represents whether the segment is "freed".
0 means the segment is free and available for use
Any other value means the segment is in use.
def create(self, name, size) -> AnyStr:
shm = shared_memory.SharedMemory(name=name, create=True, size=size)
self.shm_store[name] = shm
return shm.buf
Args:
name: The name of the instance to open.
If not set, a name argument *must* be passed in to each operation.
size: The size of frames created by this frame manager.
If not set, the frame manager will be read only.
"""
def __init__(self, name: Optional[str] = None, size : Optional[int]= None):
# list of opened shms
self.shm_lists = {}
# If name is set, create the dict for the default namespace
self.shm_lists[name] = {}
self.name = name
self.size = size
"""Create a new frame.
def get(self, name, shape):
if name in self.shm_store:
shm = self.shm_store[name]
Returns:
A tuple containing the allocated frame id and the frame itself.
Raises:
UnsupportedOperation: If the size of the frame manager is not set.
"""
def create(self, name = None) -> Tuple[int, AnyStr]:
if name is None:
name = self.name
if name is None:
raise UnsupportedOperation("Cannot create without name")
if self.size is None:
raise UnsupportedOperation("Cannot create new frame without size set")
if name not in self.shm_lists:
self.shm_lists[name] = {}
shm_list = self.shm_lists[name]
# Check if we have any free SHMs in our list
for idx in shm_list:
shm = shm_list[idx]
if shm.buf[0] == 0:
shm.buf[0] = 1
return idx, shm.buf[1:]
# No free SHMs. Try to create a new SHM.
# We'll start after the maximum index, but this may create a sparse list.
# Having a sparse list shouldn't really matter though.
idx = 0 if not shm_list else max(shm_list) + 1
while True:
try:
shm = shared_memory.SharedMemory(name=f"{self.name}_{idx}", create=True, size=self.size+1)
shm.buf[0] = 1
shm_list[idx] = shm
return idx, shm.buf[1:]
except FileExistsError:
# The SHM already exists. Return it if it is free.
shm = shared_memory.SharedMemory(name=f"{self.name}_{idx}")
if (shm.buf[0] == 0):
shm.buf[0] = 1
shm_list[idx] = shm
return idx, shm.buf[1:]
# It's locked, so we move to the next idx.
idx = idx + 1
""" Get an existing frame.
Args:
shape: The shape of the resulting numpy array
index: The index number of the frame to get.
If the index is not set, fetches the shm with <name>.
Returns:
A numpy array with the frame.
"""
def get(self, shape, index : Optional[int] = None, name : Optional[str] = None):
if name is None:
name = self.name
if name is None:
raise UnsupportedOperation("Cannot get without name")
# If the shm isn't already open
if name not in self.shm_lists:
self.shm_lists[name] = {}
shm_list = self.shm_lists[name]
if index is None:
index = -1 # special value for "root" index
if index not in shm_list:
# The shm isn't open yet, so open it and cache it in our list
memory_name = name if index == -1 else f"{name}_{index}"
shm = shared_memory.SharedMemory(name=memory_name)
shm_list[index] = shm
else:
shm = shared_memory.SharedMemory(name=name)
self.shm_store[name] = shm
return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)
shm = shm_list[index]
buffer = shm.buf if index is -1 else shm.buf[1:]
return np.ndarray(shape, dtype=np.uint8, buffer=buffer)
def close(self, name):
if name in self.shm_store:
self.shm_store[name].close()
del self.shm_store[name]
""" Close the frame, freeing it for reuse.
Args:
index: The index number of the frame to free.
"""
def close(self, index : int, name : Optional[str] = None):
if name is None:
name = self.name
if name is None:
raise UnsupportedOperation("Cannot close without name")
self.shm_lists[name][index].buf[0] = 0
def delete(self, name):
### will be deprecated
if name in self.shm_store:
self.shm_store[name].close()
self.shm_store[name].unlink()

View File

@ -11,6 +11,7 @@ import numpy as np
import copy
import itertools
import json
import traceback
import base64
from typing import Dict, List
from collections import defaultdict
@ -133,16 +134,15 @@ def capture_frames(ffmpeg_process, camera_name, frame_shape, frame_manager: Fram
break
current_frame.value = datetime.datetime.now().timestamp()
frame_name = f"{camera_name}{current_frame.value}"
frame_buffer = frame_manager.create(frame_name, frame_size)
#frame_name = f"{camera_name}{current_frame.value}"
idx, frame_buffer = frame_manager.create()
try:
frame_buffer[:] = ffmpeg_process.stdout.read(frame_size)
except:
print(f"{camera_name}: ffmpeg sent a broken frame. something is wrong.")
if ffmpeg_process.poll() != None:
print(f"{camera_name}: ffmpeg process is not running. exiting capture thread...")
frame_manager.delete(frame_name)
frame_manager.close(idx)
break
continue
@ -152,20 +152,17 @@ def capture_frames(ffmpeg_process, camera_name, frame_shape, frame_manager: Fram
frame_num += 1
if (frame_num % take_frame) != 0:
skipped_eps.update()
frame_manager.delete(frame_name)
frame_manager.close(idx)
continue
# if the queue is full, skip this frame
if frame_queue.full():
skipped_eps.update()
frame_manager.delete(frame_name)
frame_manager.close(idx)
continue
# close the frame
frame_manager.close(frame_name)
# add to the queue
frame_queue.put(current_frame.value)
frame_queue.put((idx, current_frame.value))
class CameraWatchdog(threading.Thread):
def __init__(self, name, config, frame_queue, camera_fps, ffmpeg_pid, stop_event):
@ -224,7 +221,7 @@ class CameraCapture(threading.Thread):
self.take_frame = take_frame
self.fps = fps
self.skipped_fps = EventsPerSecond()
self.frame_manager = SharedMemoryFrameManager()
self.frame_manager = SharedMemoryFrameManager(name, frame_shape[0] * frame_shape[1] * 3 // 2)
self.ffmpeg_process = ffmpeg_process
self.current_frame = mp.Value('d', 0.0)
self.last_frame = 0
@ -279,7 +276,7 @@ def track_camera(name, config, detection_queue, result_connection, detected_obje
object_tracker = ObjectTracker(10)
frame_manager = SharedMemoryFrameManager()
frame_manager = SharedMemoryFrameManager(name)
process_frames(name, frame_queue, frame_shape, frame_manager, motion_detector, object_detector,
object_tracker, detected_objects_queue, process_info, objects_to_track, object_filters, mask, stop_event)
@ -335,16 +332,16 @@ def process_frames(camera_name: str, frame_queue: mp.Queue, frame_shape,
break
try:
frame_time = frame_queue.get(True, 10)
idx, frame_time = frame_queue.get(True, 10)
except queue.Empty:
continue
current_frame_time.value = frame_time
frame = frame_manager.get(f"{camera_name}{frame_time}", (frame_shape[0]*3//2, frame_shape[1]))
frame = frame_manager.get((frame_shape[0]*3//2, frame_shape[1]), idx)
if frame is None:
print(f"{camera_name}: frame {frame_time} is not in memory store.")
print(f"{camera_name}: frame {frame_time} (index {idx}) is not in memory store.")
continue
# look for motion
@ -419,11 +416,11 @@ def process_frames(camera_name: str, frame_queue: mp.Queue, frame_shape,
# add to the queue if not full
if(detected_objects_queue.full()):
frame_manager.delete(f"{camera_name}{frame_time}")
frame_manager.close(idx)
continue
else:
fps_tracker.update()
fps.value = fps_tracker.eps()
detected_objects_queue.put((camera_name, frame_time, object_tracker.tracked_objects))
detected_objects_queue.put((camera_name, idx, frame_time, object_tracker.tracked_objects))
detection_fps.value = object_detector.fps.eps()
frame_manager.close(f"{camera_name}{frame_time}")
frame_manager.close(idx)