Refactor EdgeTPU and CPU model handling to detector submodules.

This commit is contained in:
Nate Meyer 2022-08-07 02:32:21 -04:00
parent b4d4adb75b
commit c359c7c9bd
9 changed files with 189 additions and 90 deletions

View File

@ -3,10 +3,15 @@ from statistics import mean
import multiprocessing as mp
import numpy as np
import datetime
from frigate.edgetpu import LocalObjectDetector, EdgeTPUProcess, RemoteObjectDetector, load_labels
from frigate.object_detection import (
LocalObjectDetector,
ObjectDetectProcess,
RemoteObjectDetector,
load_labels,
)
my_frame = np.expand_dims(np.full((300,300,3), 1, np.uint8), axis=0)
labels = load_labels('/labelmap.txt')
my_frame = np.expand_dims(np.full((300, 300, 3), 1, np.uint8), axis=0)
labels = load_labels("/labelmap.txt")
######
# Minimal same process runner
@ -39,20 +44,23 @@ labels = load_labels('/labelmap.txt')
def start(id, num_detections, detection_queue, event):
object_detector = RemoteObjectDetector(str(id), '/labelmap.txt', detection_queue, event)
start = datetime.datetime.now().timestamp()
object_detector = RemoteObjectDetector(
str(id), "/labelmap.txt", detection_queue, event
)
start = datetime.datetime.now().timestamp()
frame_times = []
for x in range(0, num_detections):
start_frame = datetime.datetime.now().timestamp()
detections = object_detector.detect(my_frame)
frame_times.append(datetime.datetime.now().timestamp()-start_frame)
frame_times = []
for x in range(0, num_detections):
start_frame = datetime.datetime.now().timestamp()
detections = object_detector.detect(my_frame)
frame_times.append(datetime.datetime.now().timestamp() - start_frame)
duration = datetime.datetime.now().timestamp() - start
object_detector.cleanup()
print(f"{id} - Processed for {duration:.2f} seconds.")
print(f"{id} - FPS: {object_detector.fps.eps():.2f}")
print(f"{id} - Average frame processing time: {mean(frame_times)*1000:.2f}ms")
duration = datetime.datetime.now().timestamp()-start
object_detector.cleanup()
print(f"{id} - Processed for {duration:.2f} seconds.")
print(f"{id} - FPS: {object_detector.fps.eps():.2f}")
print(f"{id} - Average frame processing time: {mean(frame_times)*1000:.2f}ms")
######
# Separate process runner
@ -71,23 +79,25 @@ camera_processes = []
events = {}
for x in range(0, 10):
events[str(x)] = mp.Event()
events[str(x)] = mp.Event()
detection_queue = mp.Queue()
edgetpu_process_1 = EdgeTPUProcess(detection_queue, events, 'usb:0')
edgetpu_process_2 = EdgeTPUProcess(detection_queue, events, 'usb:1')
edgetpu_process_1 = ObjectDetectProcess(detection_queue, events, "usb:0")
edgetpu_process_2 = ObjectDetectProcess(detection_queue, events, "usb:1")
for x in range(0, 10):
camera_process = mp.Process(target=start, args=(x, 300, detection_queue, events[str(x)]))
camera_process.daemon = True
camera_processes.append(camera_process)
camera_process = mp.Process(
target=start, args=(x, 300, detection_queue, events[str(x)])
)
camera_process.daemon = True
camera_processes.append(camera_process)
start_time = datetime.datetime.now().timestamp()
for p in camera_processes:
p.start()
p.start()
for p in camera_processes:
p.join()
p.join()
duration = datetime.datetime.now().timestamp()-start_time
duration = datetime.datetime.now().timestamp() - start_time
print(f"Total - Processed for {duration:.2f} seconds.")

View File

@ -15,7 +15,7 @@ from playhouse.sqliteq import SqliteQueueDatabase
from frigate.config import DetectorTypeEnum, FrigateConfig
from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR
from frigate.edgetpu import EdgeTPUProcess
from frigate.object_detection import ObjectDetectProcess
from frigate.events import EventCleanup, EventProcessor
from frigate.http import create_app
from frigate.log import log_process, root_configurer
@ -39,7 +39,7 @@ class FrigateApp:
def __init__(self) -> None:
self.stop_event: Event = mp.Event()
self.detection_queue: Queue = mp.Queue()
self.detectors: dict[str, EdgeTPUProcess] = {}
self.detectors: dict[str, ObjectDetectProcess] = {}
self.detection_out_events: dict[str, Event] = {}
self.detection_shms: list[mp.shared_memory.SharedMemory] = []
self.log_queue: Queue = mp.Queue()
@ -199,7 +199,7 @@ class FrigateApp:
for name, detector in self.config.detectors.items():
if detector.type == DetectorTypeEnum.cpu:
self.detectors[name] = EdgeTPUProcess(
self.detectors[name] = ObjectDetectProcess(
name,
self.detection_queue,
self.detection_out_events,
@ -209,7 +209,7 @@ class FrigateApp:
detector.num_threads,
)
if detector.type == DetectorTypeEnum.edgetpu:
self.detectors[name] = EdgeTPUProcess(
self.detectors[name] = ObjectDetectProcess(
name,
self.detection_queue,
self.detection_out_events,

0
frigate/detector.py Normal file
View File

View File

@ -0,0 +1,46 @@
import logging
import numpy as np
from frigate.detectors.detection_api import DetectionApi
import tflite_runtime.interpreter as tflite
logger = logging.getLogger(__name__)
class CpuTfl(DetectionApi):
def __init__(self, tf_device=None, model_path=None, num_threads=3):
self.interpreter = tflite.Interpreter(
model_path=model_path or "/cpu_model.tflite", num_threads=3
)
self.interpreter.allocate_tensors()
self.tensor_input_details = self.interpreter.get_input_details()
self.tensor_output_details = self.interpreter.get_output_details()
def detect_raw(self, tensor_input):
self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input)
self.interpreter.invoke()
boxes = self.interpreter.tensor(self.tensor_output_details[0]["index"])()[0]
class_ids = self.interpreter.tensor(self.tensor_output_details[1]["index"])()[0]
scores = self.interpreter.tensor(self.tensor_output_details[2]["index"])()[0]
count = int(
self.interpreter.tensor(self.tensor_output_details[3]["index"])()[0]
)
detections = np.zeros((20, 6), np.float32)
for i in range(count):
if scores[i] < 0.4 or i == 20:
break
detections[i] = [
class_ids[i],
float(scores[i]),
boxes[i][0],
boxes[i][1],
boxes[i][2],
boxes[i][3],
]
return detections

View File

@ -0,0 +1,17 @@
import logging
from abc import ABC, abstractmethod
from typing import Dict
logger = logging.getLogger(__name__)
class DetectionApi(ABC):
@abstractmethod
def __init__(self, tf_device=None, model_path=None):
pass
@abstractmethod
def detect_raw(self, tensor_input):
pass

View File

@ -0,0 +1,63 @@
import logging
import numpy as np
from frigate.detectors.detection_api import DetectionApi
import tflite_runtime.interpreter as tflite
from tflite_runtime.interpreter import load_delegate
logger = logging.getLogger(__name__)
class EdgeTpuTfl(DetectionApi):
def __init__(self, tf_device=None, model_path=None):
device_config = {"device": "usb"}
if not tf_device is None:
device_config = {"device": tf_device}
edge_tpu_delegate = None
try:
logger.info(f"Attempting to load TPU as {device_config['device']}")
edge_tpu_delegate = load_delegate("libedgetpu.so.1.0", device_config)
logger.info("TPU found")
self.interpreter = tflite.Interpreter(
model_path=model_path or "/edgetpu_model.tflite",
experimental_delegates=[edge_tpu_delegate],
)
except ValueError:
logger.error(
"No EdgeTPU was detected. If you do not have a Coral device yet, you must configure CPU detectors."
)
raise
self.interpreter.allocate_tensors()
self.tensor_input_details = self.interpreter.get_input_details()
self.tensor_output_details = self.interpreter.get_output_details()
def detect_raw(self, tensor_input):
self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input)
self.interpreter.invoke()
boxes = self.interpreter.tensor(self.tensor_output_details[0]["index"])()[0]
class_ids = self.interpreter.tensor(self.tensor_output_details[1]["index"])()[0]
scores = self.interpreter.tensor(self.tensor_output_details[2]["index"])()[0]
count = int(
self.interpreter.tensor(self.tensor_output_details[3]["index"])()[0]
)
detections = np.zeros((20, 6), np.float32)
for i in range(count):
if scores[i] < 0.4 or i == 20:
break
detections[i] = [
class_ids[i],
float(scores[i]),
boxes[i][0],
boxes[i][1],
boxes[i][2],
boxes[i][3],
]
return detections

View File

@ -8,9 +8,10 @@ import threading
from abc import ABC, abstractmethod
import numpy as np
import tflite_runtime.interpreter as tflite
from setproctitle import setproctitle
from tflite_runtime.interpreter import load_delegate
from frigate.config import DetectorTypeEnum
from frigate.detectors.edgetpu_tfl import EdgeTpuTfl
from frigate.detectors.cpu_tfl import CpuTfl
from frigate.util import EventsPerSecond, SharedMemoryFrameManager, listen, load_labels
@ -24,46 +25,30 @@ class ObjectDetector(ABC):
class LocalObjectDetector(ObjectDetector):
def __init__(self, tf_device=None, model_path=None, num_threads=3, labels=None):
def __init__(
self,
det_type=DetectorTypeEnum.cpu,
det_device=None,
model_path=None,
num_threads=3,
labels=None,
):
self.fps = EventsPerSecond()
if labels is None:
self.labels = {}
else:
self.labels = load_labels(labels)
device_config = {"device": "usb"}
if not tf_device is None:
device_config = {"device": tf_device}
edge_tpu_delegate = None
if tf_device != "cpu":
try:
logger.info(f"Attempting to load TPU as {device_config['device']}")
edge_tpu_delegate = load_delegate("libedgetpu.so.1.0", device_config)
logger.info("TPU found")
self.interpreter = tflite.Interpreter(
model_path=model_path or "/edgetpu_model.tflite",
experimental_delegates=[edge_tpu_delegate],
)
except ValueError:
logger.error(
"No EdgeTPU was detected. If you do not have a Coral device yet, you must configure CPU detectors."
)
raise
if det_type == DetectorTypeEnum.edgetpu:
self.detectApi = EdgeTpuTfl(tf_device=det_device, model_path=model_path)
else:
logger.warning(
"CPU detectors are not recommended and should only be used for testing or for trial purposes."
)
self.interpreter = tflite.Interpreter(
model_path=model_path or "/cpu_model.tflite", num_threads=num_threads
self.detectApi = CpuTfl(
tf_device=det_device, model_path=model_path, num_threads=num_threads
)
self.interpreter.allocate_tensors()
self.tensor_input_details = self.interpreter.get_input_details()
self.tensor_output_details = self.interpreter.get_output_details()
def detect(self, tensor_input, threshold=0.4):
detections = []
@ -79,31 +64,7 @@ class LocalObjectDetector(ObjectDetector):
return detections
def detect_raw(self, tensor_input):
self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input)
self.interpreter.invoke()
boxes = self.interpreter.tensor(self.tensor_output_details[0]["index"])()[0]
class_ids = self.interpreter.tensor(self.tensor_output_details[1]["index"])()[0]
scores = self.interpreter.tensor(self.tensor_output_details[2]["index"])()[0]
count = int(
self.interpreter.tensor(self.tensor_output_details[3]["index"])()[0]
)
detections = np.zeros((20, 6), np.float32)
for i in range(count):
if scores[i] < 0.4 or i == 20:
break
detections[i] = [
class_ids[i],
float(scores[i]),
boxes[i][0],
boxes[i][1],
boxes[i][2],
boxes[i][3],
]
return detections
return self.detectApi.detect_raw(tensor_input=tensor_input)
def run_detector(
@ -114,7 +75,7 @@ def run_detector(
start,
model_path,
model_shape,
tf_device,
det_device,
num_threads,
):
threading.current_thread().name = f"detector:{name}"
@ -133,7 +94,7 @@ def run_detector(
frame_manager = SharedMemoryFrameManager()
object_detector = LocalObjectDetector(
tf_device=tf_device, model_path=model_path, num_threads=num_threads
det_device=det_device, model_path=model_path, num_threads=num_threads
)
outputs = {}
@ -165,7 +126,7 @@ def run_detector(
avg_speed.value = (avg_speed.value * 9 + duration) / 10
class EdgeTPUProcess:
class ObjectDetectProcess:
def __init__(
self,
name,

View File

@ -11,11 +11,13 @@ import time
from collections import defaultdict
import numpy as np
from cv2 import cv2, reduce
import cv2
# from cv2 import cv2, reduce
from setproctitle import setproctitle
from frigate.config import CameraConfig, DetectConfig
from frigate.edgetpu import RemoteObjectDetector
from frigate.object_detection import RemoteObjectDetector
from frigate.log import LogPipe
from frigate.motion import MotionDetector
from frigate.objects import ObjectTracker

View File

@ -16,7 +16,7 @@ import cv2
import numpy as np
from frigate.config import FrigateConfig
from frigate.edgetpu import LocalObjectDetector
from frigate.object_detection import LocalObjectDetector
from frigate.motion import MotionDetector
from frigate.object_processing import CameraState
from frigate.objects import ObjectTracker