mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-02-12 22:25:24 +03:00
used a fixed number of shms for in memory frames
This commit is contained in:
parent
89a478ce0a
commit
6e4c45e3c1
@ -656,11 +656,10 @@ class CameraState:
|
||||
def on(self, event_type: str, callback: Callable[[dict], None]):
|
||||
self.callbacks[event_type].append(callback)
|
||||
|
||||
def update(self, frame_time, current_detections, motion_boxes, regions):
|
||||
def update(self, frame_name, frame_time, current_detections, motion_boxes, regions):
|
||||
# get the new frame
|
||||
frame_id = f"{self.name}{frame_time}"
|
||||
current_frame = self.frame_manager.get(
|
||||
frame_id, self.camera_config.frame_shape_yuv
|
||||
frame_name, self.camera_config.frame_shape_yuv
|
||||
)
|
||||
|
||||
tracked_objects = self.tracked_objects.copy()
|
||||
@ -856,7 +855,7 @@ class CameraState:
|
||||
self._current_frame = current_frame
|
||||
if self.previous_frame_id is not None:
|
||||
self.frame_manager.close(self.previous_frame_id)
|
||||
self.previous_frame_id = frame_id
|
||||
self.previous_frame_id = frame_name
|
||||
|
||||
|
||||
class TrackedObjectProcessor(threading.Thread):
|
||||
@ -1166,6 +1165,7 @@ class TrackedObjectProcessor(threading.Thread):
|
||||
try:
|
||||
(
|
||||
camera,
|
||||
frame_name,
|
||||
frame_time,
|
||||
current_tracked_objects,
|
||||
motion_boxes,
|
||||
@ -1177,7 +1177,7 @@ class TrackedObjectProcessor(threading.Thread):
|
||||
camera_state = self.camera_states[camera]
|
||||
|
||||
camera_state.update(
|
||||
frame_time, current_tracked_objects, motion_boxes, regions
|
||||
frame_name, frame_time, current_tracked_objects, motion_boxes, regions
|
||||
)
|
||||
|
||||
self.update_mqtt_motion(camera, frame_time, motion_boxes)
|
||||
@ -1190,6 +1190,7 @@ class TrackedObjectProcessor(threading.Thread):
|
||||
self.detection_publisher.send_data(
|
||||
(
|
||||
camera,
|
||||
frame_name,
|
||||
frame_time,
|
||||
tracked_objects,
|
||||
motion_boxes,
|
||||
@ -1281,4 +1282,5 @@ class TrackedObjectProcessor(threading.Thread):
|
||||
self.detection_publisher.stop()
|
||||
self.event_sender.stop()
|
||||
self.event_end_subscriber.stop()
|
||||
self.frame_manager.cleanup()
|
||||
logger.info("Exiting object processor...")
|
||||
|
||||
@ -333,6 +333,7 @@ class BirdsEyeFrameManager:
|
||||
self.cameras[camera] = {
|
||||
"dimensions": [settings.detect.width, settings.detect.height],
|
||||
"last_active_frame": 0.0,
|
||||
"current_frame_name": "",
|
||||
"current_frame": 0.0,
|
||||
"layout_frame": 0.0,
|
||||
"channel_dims": {
|
||||
@ -352,20 +353,18 @@ class BirdsEyeFrameManager:
|
||||
logger.debug("Clearing the birdseye frame")
|
||||
self.frame[:] = self.blank_frame
|
||||
|
||||
def copy_to_position(self, position, camera=None, frame_time=None):
|
||||
def copy_to_position(self, position, camera=None, frame_name=None):
|
||||
if camera is None:
|
||||
frame = None
|
||||
channel_dims = None
|
||||
else:
|
||||
try:
|
||||
frame = self.frame_manager.get(
|
||||
f"{camera}{frame_time}", self.config.cameras[camera].frame_shape_yuv
|
||||
frame_name, self.config.cameras[camera].frame_shape_yuv
|
||||
)
|
||||
except FileNotFoundError:
|
||||
# TODO: better frame management would prevent this edge case
|
||||
logger.warning(
|
||||
f"Unable to copy frame {camera}{frame_time} to birdseye."
|
||||
)
|
||||
logger.warning(f"Unable to copy frame {frame_name} to birdseye.")
|
||||
return
|
||||
channel_dims = self.cameras[camera]["channel_dims"]
|
||||
|
||||
@ -523,7 +522,9 @@ class BirdsEyeFrameManager:
|
||||
for row in self.camera_layout:
|
||||
for position in row:
|
||||
self.copy_to_position(
|
||||
position[1], position[0], self.cameras[position[0]]["current_frame"]
|
||||
position[1],
|
||||
position[0],
|
||||
self.cameras[position[0]]["current_frame_name"],
|
||||
)
|
||||
|
||||
return True
|
||||
@ -671,7 +672,9 @@ class BirdsEyeFrameManager:
|
||||
else:
|
||||
return standard_candidate_layout
|
||||
|
||||
def update(self, camera, object_count, motion_count, frame_time, frame) -> bool:
|
||||
def update(
|
||||
self, camera, object_count, motion_count, frame_name, frame_time
|
||||
) -> bool:
|
||||
# don't process if birdseye is disabled for this camera
|
||||
camera_config = self.config.cameras[camera].birdseye
|
||||
|
||||
@ -688,6 +691,7 @@ class BirdsEyeFrameManager:
|
||||
return False
|
||||
|
||||
# update the last active frame for the camera
|
||||
self.cameras[camera]["current_frame_name"] = frame_name
|
||||
self.cameras[camera]["current_frame"] = frame_time
|
||||
if self.camera_active(camera_config.mode, object_count, motion_count):
|
||||
self.cameras[camera]["last_active_frame"] = frame_time
|
||||
@ -754,8 +758,8 @@ class Birdseye:
|
||||
camera: str,
|
||||
current_tracked_objects: list[dict[str, any]],
|
||||
motion_boxes: list[list[int]],
|
||||
frame_name: str,
|
||||
frame_time: float,
|
||||
frame,
|
||||
) -> None:
|
||||
# check if there is an updated config
|
||||
while True:
|
||||
@ -774,8 +778,8 @@ class Birdseye:
|
||||
camera,
|
||||
len([o for o in current_tracked_objects if not o["stationary"]]),
|
||||
len(motion_boxes),
|
||||
frame_name,
|
||||
frame_time,
|
||||
frame,
|
||||
):
|
||||
frame_bytes = self.birdseye_manager.frame.tobytes()
|
||||
|
||||
|
||||
@ -45,7 +45,6 @@ def output_frames(
|
||||
signal.signal(signal.SIGINT, receiveSignal)
|
||||
|
||||
frame_manager = SharedMemoryFrameManager()
|
||||
previous_frames = {}
|
||||
|
||||
# start a websocket server on 8082
|
||||
WebSocketWSGIHandler.http_version = "1.1"
|
||||
@ -87,15 +86,14 @@ def output_frames(
|
||||
|
||||
(
|
||||
camera,
|
||||
frame_name,
|
||||
frame_time,
|
||||
current_tracked_objects,
|
||||
motion_boxes,
|
||||
regions,
|
||||
) = data
|
||||
|
||||
frame_id = f"{camera}{frame_time}"
|
||||
|
||||
frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
|
||||
frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv)
|
||||
|
||||
# send camera frame to ffmpeg process if websockets are connected
|
||||
if any(
|
||||
@ -116,8 +114,8 @@ def output_frames(
|
||||
camera,
|
||||
current_tracked_objects,
|
||||
motion_boxes,
|
||||
frame_name,
|
||||
frame_time,
|
||||
frame,
|
||||
)
|
||||
|
||||
# send frames for low fps recording
|
||||
@ -125,12 +123,6 @@ def output_frames(
|
||||
current_tracked_objects, motion_boxes, frame_time, frame
|
||||
)
|
||||
|
||||
# delete frames after they have been used for output
|
||||
if camera in previous_frames:
|
||||
frame_manager.delete(f"{camera}{previous_frames[camera]}")
|
||||
|
||||
previous_frames[camera] = frame_time
|
||||
|
||||
move_preview_frames("clips")
|
||||
|
||||
while True:
|
||||
@ -141,15 +133,15 @@ def output_frames(
|
||||
|
||||
(
|
||||
camera,
|
||||
frame_name,
|
||||
frame_time,
|
||||
current_tracked_objects,
|
||||
motion_boxes,
|
||||
regions,
|
||||
) = data
|
||||
|
||||
frame_id = f"{camera}{frame_time}"
|
||||
frame = frame_manager.get(frame_id, config.cameras[camera].frame_shape_yuv)
|
||||
frame_manager.delete(frame_id)
|
||||
frame = frame_manager.get(frame_name, config.cameras[camera].frame_shape_yuv)
|
||||
frame_manager.close(frame_name)
|
||||
|
||||
detection_subscriber.stop()
|
||||
|
||||
|
||||
@ -63,7 +63,7 @@ class PtzMotionEstimator:
|
||||
self.ptz_metrics["ptz_reset"].set()
|
||||
logger.debug(f"{config.name}: Motion estimator init")
|
||||
|
||||
def motion_estimator(self, detections, frame_time, camera):
|
||||
def motion_estimator(self, detections, frame_name, frame_time, camera):
|
||||
# If we've just started up or returned to our preset, reset motion estimator for new tracking session
|
||||
if self.ptz_metrics["ptz_reset"].is_set():
|
||||
self.ptz_metrics["ptz_reset"].clear()
|
||||
@ -94,9 +94,8 @@ class PtzMotionEstimator:
|
||||
f"{camera}: Motion estimator running - frame time: {frame_time}"
|
||||
)
|
||||
|
||||
frame_id = f"{camera}{frame_time}"
|
||||
yuv_frame = self.frame_manager.get(
|
||||
frame_id, self.camera_config.frame_shape_yuv
|
||||
frame_name, self.camera_config.frame_shape_yuv
|
||||
)
|
||||
|
||||
frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2GRAY_I420)
|
||||
@ -134,7 +133,7 @@ class PtzMotionEstimator:
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
self.frame_manager.close(frame_id)
|
||||
self.frame_manager.close(frame_name)
|
||||
|
||||
return self.coord_transformations
|
||||
|
||||
|
||||
@ -233,6 +233,7 @@ class ReviewSegmentMaintainer(threading.Thread):
|
||||
def update_existing_segment(
|
||||
self,
|
||||
segment: PendingReviewSegment,
|
||||
frame_name: str,
|
||||
frame_time: float,
|
||||
objects: list[TrackedObject],
|
||||
) -> None:
|
||||
@ -283,25 +284,23 @@ class ReviewSegmentMaintainer(threading.Thread):
|
||||
|
||||
if should_update:
|
||||
try:
|
||||
frame_id = f"{camera_config.name}{frame_time}"
|
||||
yuv_frame = self.frame_manager.get(
|
||||
frame_id, camera_config.frame_shape_yuv
|
||||
frame_name, camera_config.frame_shape_yuv
|
||||
)
|
||||
self.update_segment(
|
||||
segment, camera_config, yuv_frame, active_objects, prev_data
|
||||
)
|
||||
self.frame_manager.close(frame_id)
|
||||
self.frame_manager.close(frame_name)
|
||||
except FileNotFoundError:
|
||||
return
|
||||
else:
|
||||
if not segment.has_frame:
|
||||
try:
|
||||
frame_id = f"{camera_config.name}{frame_time}"
|
||||
yuv_frame = self.frame_manager.get(
|
||||
frame_id, camera_config.frame_shape_yuv
|
||||
frame_name, camera_config.frame_shape_yuv
|
||||
)
|
||||
segment.save_full_frame(camera_config, yuv_frame)
|
||||
self.frame_manager.close(frame_id)
|
||||
self.frame_manager.close(frame_name)
|
||||
self.update_segment(segment, camera_config, None, [], prev_data)
|
||||
except FileNotFoundError:
|
||||
return
|
||||
@ -316,6 +315,7 @@ class ReviewSegmentMaintainer(threading.Thread):
|
||||
def check_if_new_segment(
|
||||
self,
|
||||
camera: str,
|
||||
frame_name: str,
|
||||
frame_time: float,
|
||||
objects: list[TrackedObject],
|
||||
) -> None:
|
||||
@ -390,14 +390,13 @@ class ReviewSegmentMaintainer(threading.Thread):
|
||||
)
|
||||
|
||||
try:
|
||||
frame_id = f"{camera_config.name}{frame_time}"
|
||||
yuv_frame = self.frame_manager.get(
|
||||
frame_id, camera_config.frame_shape_yuv
|
||||
frame_name, camera_config.frame_shape_yuv
|
||||
)
|
||||
self.active_review_segments[camera].update_frame(
|
||||
camera_config, yuv_frame, active_objects
|
||||
)
|
||||
self.frame_manager.close(frame_id)
|
||||
self.frame_manager.close(frame_name)
|
||||
self.new_segment(self.active_review_segments[camera])
|
||||
except FileNotFoundError:
|
||||
return
|
||||
@ -425,6 +424,7 @@ class ReviewSegmentMaintainer(threading.Thread):
|
||||
if topic == DetectionTypeEnum.video:
|
||||
(
|
||||
camera,
|
||||
frame_name,
|
||||
frame_time,
|
||||
current_tracked_objects,
|
||||
motion_boxes,
|
||||
@ -451,7 +451,9 @@ class ReviewSegmentMaintainer(threading.Thread):
|
||||
|
||||
if not self.config.cameras[camera].record.enabled:
|
||||
if current_segment:
|
||||
self.update_existing_segment(current_segment, frame_time, [])
|
||||
self.update_existing_segment(
|
||||
current_segment, frame_name, frame_time, []
|
||||
)
|
||||
|
||||
continue
|
||||
|
||||
@ -459,6 +461,7 @@ class ReviewSegmentMaintainer(threading.Thread):
|
||||
if topic == DetectionTypeEnum.video:
|
||||
self.update_existing_segment(
|
||||
current_segment,
|
||||
frame_name,
|
||||
frame_time,
|
||||
current_tracked_objects,
|
||||
)
|
||||
@ -502,6 +505,7 @@ class ReviewSegmentMaintainer(threading.Thread):
|
||||
if topic == DetectionTypeEnum.video:
|
||||
self.check_if_new_segment(
|
||||
camera,
|
||||
frame_name,
|
||||
frame_time,
|
||||
current_tracked_objects,
|
||||
)
|
||||
|
||||
@ -9,5 +9,5 @@ class ObjectTracker(ABC):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def match_and_update(self, frame_time: float, detections) -> None:
|
||||
def match_and_update(self, frame_name: str, frame_time: float, detections) -> None:
|
||||
pass
|
||||
|
||||
@ -129,7 +129,7 @@ class CentroidTracker(ObjectTracker):
|
||||
|
||||
self.tracked_objects[id].update(new_obj)
|
||||
|
||||
def update_frame_times(self, frame_time):
|
||||
def update_frame_times(self, frame_name, frame_time):
|
||||
for id in list(self.tracked_objects.keys()):
|
||||
self.tracked_objects[id]["frame_time"] = frame_time
|
||||
self.tracked_objects[id]["motionless_count"] += 1
|
||||
|
||||
@ -269,7 +269,7 @@ class NorfairTracker(ObjectTracker):
|
||||
|
||||
self.tracked_objects[id].update(obj)
|
||||
|
||||
def update_frame_times(self, frame_time):
|
||||
def update_frame_times(self, frame_name, frame_time):
|
||||
# if the object was there in the last frame, assume it's still there
|
||||
detections = [
|
||||
(
|
||||
@ -283,9 +283,9 @@ class NorfairTracker(ObjectTracker):
|
||||
for id, obj in self.tracked_objects.items()
|
||||
if self.disappeared[id] == 0
|
||||
]
|
||||
self.match_and_update(frame_time, detections=detections)
|
||||
self.match_and_update(frame_name, frame_time, detections=detections)
|
||||
|
||||
def match_and_update(self, frame_time, detections):
|
||||
def match_and_update(self, frame_name, frame_time, detections):
|
||||
norfair_detections = []
|
||||
|
||||
for obj in detections:
|
||||
@ -323,7 +323,7 @@ class NorfairTracker(ObjectTracker):
|
||||
)
|
||||
|
||||
coord_transformations = self.ptz_motion_estimator.motion_estimator(
|
||||
detections, frame_time, self.camera_name
|
||||
detections, frame_name, frame_time, self.camera_name
|
||||
)
|
||||
|
||||
tracked_objects = self.tracker.update(
|
||||
|
||||
@ -664,6 +664,10 @@ class FrameManager(ABC):
|
||||
def delete(self, name):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def cleanup(self):
|
||||
pass
|
||||
|
||||
|
||||
class DictFrameManager(FrameManager):
|
||||
def __init__(self):
|
||||
@ -684,6 +688,9 @@ class DictFrameManager(FrameManager):
|
||||
def delete(self, name):
|
||||
del self.frames[name]
|
||||
|
||||
def cleanup(self):
|
||||
pass
|
||||
|
||||
|
||||
class SharedMemoryFrameManager(FrameManager):
|
||||
def __init__(self):
|
||||
@ -713,6 +720,13 @@ class SharedMemoryFrameManager(FrameManager):
|
||||
self.shm_store[name].unlink()
|
||||
del self.shm_store[name]
|
||||
|
||||
def cleanup(self):
|
||||
frames = list(self.shm_store.keys())
|
||||
for name in frames:
|
||||
self.shm_store[name].close()
|
||||
self.shm_store[name].unlink()
|
||||
del self.shm_store[name]
|
||||
|
||||
|
||||
def create_mask(frame_shape, mask):
|
||||
mask_img = np.zeros(frame_shape, np.uint8)
|
||||
|
||||
@ -108,13 +108,22 @@ def capture_frames(
|
||||
frame_rate.start()
|
||||
skipped_eps = EventsPerSecond()
|
||||
skipped_eps.start()
|
||||
|
||||
# create 10 shms for storing frames
|
||||
frame_shms = []
|
||||
# TODO: i might have to recreate these in the loop so i know if i am overwriting one that hasnt been deleted yet
|
||||
for frame in range(0, 9):
|
||||
name = f"{camera_name}{frame:>02}"
|
||||
frame_shms.append({"name": name, "shm": frame_manager.create(name, frame_size)})
|
||||
frame_index = 0
|
||||
|
||||
while True:
|
||||
fps.value = frame_rate.eps()
|
||||
skipped_fps.value = skipped_eps.eps()
|
||||
|
||||
current_frame.value = datetime.datetime.now().timestamp()
|
||||
frame_name = f"{camera_name}{current_frame.value}"
|
||||
frame_buffer = frame_manager.create(frame_name, frame_size)
|
||||
frame_name = frame_shms[frame_index]["name"]
|
||||
frame_buffer = frame_shms[frame_index]["shm"]
|
||||
try:
|
||||
frame_buffer[:] = ffmpeg_process.stdout.read(frame_size)
|
||||
except Exception:
|
||||
@ -127,7 +136,6 @@ def capture_frames(
|
||||
logger.error(
|
||||
f"{camera_name}: ffmpeg process is not running. exiting capture thread..."
|
||||
)
|
||||
frame_manager.delete(frame_name)
|
||||
break
|
||||
continue
|
||||
|
||||
@ -136,13 +144,15 @@ def capture_frames(
|
||||
# don't lock the queue to check, just try since it should rarely be full
|
||||
try:
|
||||
# add to the queue
|
||||
frame_queue.put(current_frame.value, False)
|
||||
# close the frame
|
||||
frame_manager.close(frame_name)
|
||||
frame_queue.put((frame_name, current_frame.value), False)
|
||||
except queue.Full:
|
||||
# if the queue is full, skip this frame
|
||||
skipped_eps.update()
|
||||
frame_manager.delete(frame_name)
|
||||
|
||||
# go to the next frame shm
|
||||
frame_index = 0 if frame_index == 9 else frame_index + 1
|
||||
|
||||
frame_manager.cleanup()
|
||||
|
||||
|
||||
class CameraWatchdog(threading.Thread):
|
||||
@ -450,8 +460,10 @@ def track_camera(
|
||||
# empty the frame queue
|
||||
logger.info(f"{name}: emptying frame queue")
|
||||
while not frame_queue.empty():
|
||||
frame_time = frame_queue.get(False)
|
||||
frame_manager.delete(f"{name}{frame_time}")
|
||||
frame_name, _ = frame_queue.get(False)
|
||||
frame_manager.delete(frame_name)
|
||||
|
||||
frame_manager.cleanup()
|
||||
|
||||
logger.info(f"{name}: exiting subprocess")
|
||||
|
||||
@ -550,9 +562,9 @@ def process_frames(
|
||||
|
||||
try:
|
||||
if exit_on_empty:
|
||||
frame_time = frame_queue.get(False)
|
||||
frame_name, frame_time = frame_queue.get(False)
|
||||
else:
|
||||
frame_time = frame_queue.get(True, 1)
|
||||
frame_name, frame_time = frame_queue.get(True, 1)
|
||||
except queue.Empty:
|
||||
if exit_on_empty:
|
||||
logger.info("Exiting track_objects...")
|
||||
@ -562,9 +574,7 @@ def process_frames(
|
||||
current_frame_time.value = frame_time
|
||||
ptz_metrics["ptz_frame_time"].value = frame_time
|
||||
|
||||
frame = frame_manager.get(
|
||||
f"{camera_name}{frame_time}", (frame_shape[0] * 3 // 2, frame_shape[1])
|
||||
)
|
||||
frame = frame_manager.get(frame_name, (frame_shape[0] * 3 // 2, frame_shape[1]))
|
||||
|
||||
if frame is None:
|
||||
logger.info(f"{camera_name}: frame {frame_time} is not in memory store.")
|
||||
@ -705,7 +715,7 @@ def process_frames(
|
||||
object_tracker.match_and_update(frame_time, tracked_detections)
|
||||
# else, just update the frame times for the stationary objects
|
||||
else:
|
||||
object_tracker.update_frame_times(frame_time)
|
||||
object_tracker.update_frame_times(frame_name, frame_time)
|
||||
|
||||
# group the attribute detections based on what label they apply to
|
||||
attribute_detections = {}
|
||||
@ -805,7 +815,6 @@ def process_frames(
|
||||
)
|
||||
# add to the queue if not full
|
||||
if detected_objects_queue.full():
|
||||
frame_manager.delete(f"{camera_name}{frame_time}")
|
||||
continue
|
||||
else:
|
||||
fps_tracker.update()
|
||||
@ -813,6 +822,7 @@ def process_frames(
|
||||
detected_objects_queue.put(
|
||||
(
|
||||
camera_name,
|
||||
frame_name,
|
||||
frame_time,
|
||||
detections,
|
||||
motion_boxes,
|
||||
@ -820,7 +830,7 @@ def process_frames(
|
||||
)
|
||||
)
|
||||
detection_fps.value = object_detector.fps.eps()
|
||||
frame_manager.close(f"{camera_name}{frame_time}")
|
||||
frame_manager.close(frame_name)
|
||||
|
||||
motion_detector.stop()
|
||||
requestor.stop()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user