diff --git a/frigate/app.py b/frigate/app.py index f433fd50f4..6b0f7baab0 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -4,6 +4,7 @@ import multiprocessing as mp import os import secrets import shutil +import threading from multiprocessing import Queue from multiprocessing.synchronize import Event as MpEvent from typing import Optional @@ -101,7 +102,9 @@ class FrigateApp: self.processes: dict[str, int] = {} self.embeddings: Optional[EmbeddingsContext] = None self.region_grids: dict[str, list[list[dict[str, int]]]] = {} - self.frame_manager = SharedMemoryFrameManager() + self.frame_manager = SharedMemoryFrameManager( + frame_shape=(config.model.height, config.model.width, 3) + ) self.config = config def ensure_dirs(self) -> None: @@ -359,9 +362,12 @@ class FrigateApp: try: largest_frame = max( [ - det.model.height * det.model.width * 3 - if det.model is not None - else 320 + ( + det.model.height * det.model.width * 3 * det.model.max_batch + + 8 + if det.model is not None + else 320 + ) for det in self.config.detectors.values() ] ) @@ -375,7 +381,9 @@ class FrigateApp: try: shm_out = UntrackedSharedMemory( - name=f"out-{name}", create=True, size=20 * 6 * 4 + name=f"out-{name}", + create=True, + size=20 * 6 * 4 * self.config.model.max_batch + 8, ) except FileExistsError: shm_out = UntrackedSharedMemory(name=f"out-{name}") diff --git a/frigate/detectors/detector_config.py b/frigate/detectors/detector_config.py index ce77384938..74e174f63e 100644 --- a/frigate/detectors/detector_config.py +++ b/frigate/detectors/detector_config.py @@ -47,6 +47,7 @@ class ModelConfig(BaseModel): labelmap_path: Optional[str] = Field( None, title="Label map for custom object detector." 
) + max_batch: int = Field(default=4, title="Max batch size.") width: int = Field(default=320, title="Object detection model input width.") height: int = Field(default=320, title="Object detection model input height.") labelmap: Dict[int, str] = Field( diff --git a/frigate/object_detection.py b/frigate/object_detection.py index 8e88ae578a..073f949d15 100644 --- a/frigate/object_detection.py +++ b/frigate/object_detection.py @@ -18,7 +18,7 @@ from frigate.detectors.detector_config import ( InputTensorEnum, ) from frigate.util.builtin import EventsPerSecond, load_labels -from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory +from frigate.util.image import SharedMemoryFrameManager, SharedMemoryResultManager from frigate.util.services import listen logger = logging.getLogger(__name__) @@ -111,23 +111,19 @@ def run_detector( signal.signal(signal.SIGINT, receiveSignal) frame_manager = SharedMemoryFrameManager() + result_manager = SharedMemoryResultManager() object_detector = LocalObjectDetector(detector_config=detector_config) - outputs = {} for name in out_events.keys(): - out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False) - out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) - outputs[name] = {"shm": out_shm, "np": out_np} + result_manager.create(name=f"out-{name}") while not stop_event.is_set(): try: connection_id = detection_queue.get(timeout=1) except queue.Empty: continue - input_frame = frame_manager.get( - connection_id, - (1, detector_config.model.height, detector_config.model.width, 3), - ) + + input_frame = frame_manager.get_frame(connection_id) if input_frame is None: logger.warning(f"Failed to get frame {connection_id} from SHM") @@ -138,7 +134,8 @@ def run_detector( detections = object_detector.detect_raw(input_frame) duration = datetime.datetime.now().timestamp() - start.value frame_manager.close(connection_id) - outputs[connection_id]["np"][:] = detections[:] + + 
result_manager.write_result(f"out-{connection_id}", detections) out_events[connection_id].set() start.value = 0.0 @@ -198,21 +195,27 @@ class ObjectDetectProcess: class RemoteObjectDetector: - def __init__(self, name, labels, detection_queue, event, model_config, stop_event): + def __init__( + self, + name, + labels, + detection_queue, + event, + model_config, + stop_event, + frame_manager: SharedMemoryFrameManager, + ): self.labels = labels self.name = name self.fps = EventsPerSecond() self.detection_queue = detection_queue self.event = event self.stop_event = stop_event - self.shm = UntrackedSharedMemory(name=self.name, create=False) - self.np_shm = np.ndarray( - (1, model_config.height, model_config.width, 3), - dtype=np.uint8, - buffer=self.shm.buf, + self.frame_manager = frame_manager + + self.result_manager = SharedMemoryResultManager( + max_frame=model_config.max_batch ) - self.out_shm = UntrackedSharedMemory(name=f"out-{self.name}", create=False) - self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf) def detect(self, tensor_input, threshold=0.4): detections = [] @@ -221,7 +224,8 @@ class RemoteObjectDetector: return detections # copy input to shared memory - self.np_shm[:] = tensor_input[:] + self.frame_manager.write_frame(self.name, tensor_input) + self.event.clear() self.detection_queue.put(self.name) result = self.event.wait(timeout=5.0) @@ -230,15 +234,23 @@ class RemoteObjectDetector: if result is None: return detections - for d in self.out_np_shm: - if d[1] < threshold: - break - detections.append( - (self.labels[int(d[0])], float(d[1]), (d[2], d[3], d[4], d[5])) - ) + batch_result_np = self.result_manager.get_result(f"out-{self.name}") + + if not isinstance(batch_result_np, np.ndarray): + return detections + + for result_np in batch_result_np: + tmp_detections = [] + for d in result_np: + if d[1] < threshold: + break + tmp_detections.append( + (self.labels[int(d[0])], float(d[1]), (d[2], d[3], d[4], d[5])) + ) + 
detections.append(tmp_detections) self.fps.update() return detections def cleanup(self): - self.shm.unlink() - self.out_shm.unlink() + self.result_manager.cleanup() + self.frame_manager.cleanup() diff --git a/frigate/video.py b/frigate/video.py index b14f8567cf..665ae83772 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -9,6 +9,7 @@ import threading import time import cv2 +import numpy as np from setproctitle import setproctitle from frigate.camera import CameraMetrics, PTZMetrics @@ -503,14 +504,20 @@ def track_camera( name=config.name, ptz_metrics=ptz_metrics, ) + + frame_manager = SharedMemoryFrameManager(max_frame=model_config.max_batch) object_detector = RemoteObjectDetector( - name, labelmap, detection_queue, result_connection, model_config, stop_event + name, + labelmap, + detection_queue, + result_connection, + model_config, + stop_event, + frame_manager, ) object_tracker = NorfairTracker(config, ptz_metrics) - frame_manager = SharedMemoryFrameManager() - # create communication for region grid updates requestor = InterProcessRequestor() @@ -549,35 +556,53 @@ def detect( object_detector, frame, model_config: ModelConfig, - region, + regions, objects_to_track, object_filters, + multi_batch: bool = False, ): - tensor_input = create_tensor_input(frame, model_config, region) + if multi_batch: + tensor_list = [] + for i, region in enumerate(regions): + if i >= model_config.max_batch: + logger.info("batch is too large, skipping") + break + tensor_list.append(create_tensor_input(frame, model_config, region)) + tensor_input = np.concatenate(tensor_list, axis=0) + else: + tensor_input = create_tensor_input(frame, model_config, regions) + region_detections_list = object_detector.detect(tensor_input) detections = [] - region_detections = object_detector.detect(tensor_input) - for d in region_detections: - box = d[2] - size = region[2] - region[0] - x_min = int(max(0, (box[1] * size) + region[0])) - y_min = int(max(0, (box[0] * size) + region[1])) - x_max = 
int(min(detect_config.width - 1, (box[3] * size) + region[0])) - y_max = int(min(detect_config.height - 1, (box[2] * size) + region[1])) + if not multi_batch: + region_detections_list = [region_detections_list] + regions = [regions] - # ignore objects that were detected outside the frame - if (x_min >= detect_config.width - 1) or (y_min >= detect_config.height - 1): - continue + for region_detections, region in zip(region_detections_list, regions): + for d in region_detections: + box = d[2] + size = region[2] - region[0] + x_min = int(max(0, (box[1] * size) + region[0])) + y_min = int(max(0, (box[0] * size) + region[1])) + x_max = int(min(detect_config.width - 1, (box[3] * size) + region[0])) + y_max = int(min(detect_config.height - 1, (box[2] * size) + region[1])) + + # ignore objects that were detected outside the frame + if (x_min >= detect_config.width - 1) or ( + y_min >= detect_config.height - 1 + ): + continue + + width = x_max - x_min + height = y_max - y_min + area = width * height + ratio = width / max(1, height) + det = (d[0], d[1], (x_min, y_min, x_max, y_max), area, ratio, region) + # apply object filters + if is_object_filtered(det, objects_to_track, object_filters): + continue + detections.append(det) - width = x_max - x_min - height = y_max - y_min - area = width * height - ratio = width / max(1, height) - det = (d[0], d[1], (x_min, y_min, x_max, y_max), area, ratio, region) - # apply object filters - if is_object_filtered(det, objects_to_track, object_filters): - continue - detections.append(det) return detections @@ -618,6 +643,8 @@ def process_frames( attributes_map = model_config.attributes_map all_attributes = model_config.all_attributes + multi_batch = model_config.max_batch > 1 + # remove license_plate from attributes if this camera is a dedicated LPR cam if camera_config.type == CameraTypeEnum.lpr: modified_attributes_map = model_config.attributes_map.copy() @@ -813,18 +840,32 @@ def process_frames( if obj["id"] in stationary_object_ids ] - 
for region in regions: + if multi_batch and len(regions) > 0: detections.extend( detect( detect_config, object_detector, frame, model_config, - region, + regions, objects_to_track, object_filters, + multi_batch, ) ) + else: + for region in regions: + detections.extend( + detect( + detect_config, + object_detector, + frame, + model_config, + region, + objects_to_track, + object_filters, + ) + ) consolidated_detections = reduce_detections(frame_shape, detections)