refactor edgetpu to accept more detector types

This commit is contained in:
YS 2021-12-27 11:12:26 +03:00
parent 86af2a5615
commit 47f3d7c460
7 changed files with 320 additions and 25 deletions

View File

@ -16,7 +16,7 @@ from pydantic import ValidationError
from frigate.config import DetectorTypeEnum, FrigateConfig
from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR
from frigate.edgetpu import EdgeTPUProcess
from frigate.detection import DetectionProcess
from frigate.events import EventCleanup, EventProcessor
from frigate.http import create_app
from frigate.log import log_process, root_configurer
@ -39,7 +39,7 @@ class FrigateApp:
self.base_config: FrigateConfig = None
self.config: FrigateConfig = None
self.detection_queue = mp.Queue()
self.detectors: Dict[str, EdgeTPUProcess] = {}
self.detectors: Dict[str, DetectionProcess] = {}
self.detection_out_events: Dict[str, mp.Event] = {}
self.detection_shms: List[mp.shared_memory.SharedMemory] = []
self.log_queue = mp.Queue()
@ -181,27 +181,15 @@ class FrigateApp:
self.detection_shms.append(shm_in)
self.detection_shms.append(shm_out)
for name, detector in self.config.detectors.items():
if detector.type == DetectorTypeEnum.cpu:
self.detectors[name] = EdgeTPUProcess(
name,
self.detection_queue,
self.detection_out_events,
model_path,
model_shape,
"cpu",
detector.num_threads,
)
if detector.type == DetectorTypeEnum.edgetpu:
self.detectors[name] = EdgeTPUProcess(
name,
self.detection_queue,
self.detection_out_events,
model_path,
model_shape,
detector.device,
detector.num_threads,
)
for name, detector_config in self.config.detectors.items():
self.detectors[name] = DetectionProcess(
name,
self.detection_queue,
self.detection_out_events,
model_path,
model_shape,
detector_config,
)
def start_detected_frames_processor(self):
self.detected_frames_processor = TrackedObjectProcessor(

View File

@ -866,7 +866,7 @@ class FrigateConfig(FrigateBaseModel):
)
@property
def runtime_config(self):
def runtime_config(self) -> FrigateConfig:
"""Merge camera config with globals."""
config = self.copy(deep=True)

View File

@ -0,0 +1,114 @@
import logging
import numpy as np
import multiprocessing as mp
from frigate.util import EventsPerSecond
from frigate.config import DetectorConfig, DetectorTypeEnum
logger = logging.getLogger(__name__)
class DetectionProcess:
def __init__(
self,
name,
detection_queue,
out_events,
model_path,
model_shape,
detector_config: DetectorConfig,
):
self.name = name
self.out_events = out_events
self.detection_queue = detection_queue
self.avg_inference_speed = mp.Value("d", 0.01)
self.detection_start = mp.Value("d", 0.0)
self.detect_process = None
self.model_path = model_path
self.model_shape = model_shape
self.detector_config = detector_config
self.detector_target = None
if (
detector_config.type == DetectorTypeEnum.cpu
or detector_config.type == DetectorTypeEnum.edgetpu
):
from .edgetpu import run_detector as edgetpu_detector
self.detector_target = edgetpu_detector
assert self.detector_target, "Invalid detector configuration"
self.start_or_restart()
def stop(self):
self.detect_process.terminate()
logging.info("Waiting for detection process to exit gracefully...")
self.detect_process.join(timeout=30)
if self.detect_process.exitcode is None:
logging.info("Detection process didnt exit. Force killing...")
self.detect_process.kill()
self.detect_process.join()
def start_or_restart(self):
self.detection_start.value = 0.0
if (not self.detect_process is None) and self.detect_process.is_alive():
self.stop()
self.detect_process = mp.Process(
target=self.detector_target,
name=f"detector:{self.name}",
args=(
self.name,
self.detection_queue,
self.out_events,
self.avg_inference_speed,
self.detection_start,
self.model_path,
self.model_shape,
self.detector_config,
),
)
self.detect_process.daemon = True
self.detect_process.start()
class RemoteObjectDetector:
def __init__(self, name, labels, detection_queue, event, model_shape):
self.labels = labels
self.name = name
self.fps = EventsPerSecond()
self.detection_queue = detection_queue
self.event = event
self.shm = mp.shared_memory.SharedMemory(name=self.name, create=False)
self.np_shm = np.ndarray(
(1, model_shape[0], model_shape[1], 3), dtype=np.uint8, buffer=self.shm.buf
)
self.out_shm = mp.shared_memory.SharedMemory(
name=f"out-{self.name}", create=False
)
self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf)
def detect(self, tensor_input, threshold=0.4):
detections = []
# copy input to shared memory
self.np_shm[:] = tensor_input[:]
self.event.clear()
self.detection_queue.put(self.name)
result = self.event.wait(timeout=10.0)
# if it timed out
if result is None:
return detections
for d in self.out_np_shm:
if d[1] < threshold:
break
detections.append(
(self.labels[int(d[0])], float(d[1]), (d[2], d[3], d[4], d[5]))
)
self.fps.update()
return detections
def cleanup(self):
self.shm.unlink()
self.out_shm.unlink()

View File

@ -0,0 +1,167 @@
import datetime
import logging
import multiprocessing as mp
import os
import queue
import signal
import threading
from frigate.config import DetectorConfig
from typing import Dict
import numpy as np
# import tflite_runtime.interpreter as tflite
from setproctitle import setproctitle
# from tflite_runtime.interpreter import load_delegate
from frigate.util import EventsPerSecond, SharedMemoryFrameManager, listen
from .object_detector import ObjectDetector
logger = logging.getLogger(__name__)
class LocalObjectDetector(ObjectDetector):
def __init__(self, tf_device=None, model_path=None, num_threads=3):
self.fps = EventsPerSecond()
# TODO: process_clip
# if labels is None:
# self.labels = {}
# else:
# self.labels = load_labels(labels)
device_config = {"device": "usb"}
if not tf_device is None:
device_config = {"device": tf_device}
edge_tpu_delegate = None
# if tf_device != "cpu":
# try:
# logger.info(f"Attempting to load TPU as {device_config['device']}")
# edge_tpu_delegate = load_delegate("libedgetpu.so.1.0", device_config)
# logger.info("TPU found")
# self.interpreter = tflite.Interpreter(
# model_path=model_path or "/edgetpu_model.tflite",
# experimental_delegates=[edge_tpu_delegate],
# )
# except ValueError:
# logger.error(
# "No EdgeTPU was detected. If you do not have a Coral device yet, you must configure CPU detectors."
# )
# raise
# else:
# logger.warning(
# "CPU detectors are not recommended and should only be used for testing or for trial purposes."
# )
# self.interpreter = tflite.Interpreter(
# model_path=model_path or "/cpu_model.tflite", num_threads=num_threads
# )
# self.interpreter.allocate_tensors()
# self.tensor_input_details = self.interpreter.get_input_details()
# self.tensor_output_details = self.interpreter.get_output_details()
def detect(self, tensor_input, threshold=0.4):
# TODO: called from process_clip
detections = []
assert False, "implement detect() for process_clip.py"
# raw_detections = self.detect_raw(tensor_input)
# for d in raw_detections:
# if d[1] < threshold:
# break
# detections.append(
# (self.labels[int(d[0])], float(d[1]), (d[2], d[3], d[4], d[5]))
# )
# self.fps.update()
return detections
def detect_raw(self, tensor_input):
# self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input)
# self.interpreter.invoke()
# boxes = self.interpreter.tensor(self.tensor_output_details[0]["index"])()[0]
# class_ids = self.interpreter.tensor(self.tensor_output_details[1]["index"])()[0]
# scores = self.interpreter.tensor(self.tensor_output_details[2]["index"])()[0]
# count = int(
# self.interpreter.tensor(self.tensor_output_details[3]["index"])()[0]
# )
detections = np.zeros((20, 6), np.float32)
# for i in range(count):
# if scores[i] < 0.4 or i == 20:
# break
# detections[i] = [
# class_ids[i],
# float(scores[i]),
# boxes[i][0],
# boxes[i][1],
# boxes[i][2],
# boxes[i][3],
# ]
return detections
def run_detector(
name: str,
detection_queue: mp.Queue,
out_events: Dict[str, mp.Event],
avg_speed,
start,
model_path,
model_shape,
detector_config: DetectorConfig,
):
threading.current_thread().name = f"detector:{name}"
logger = logging.getLogger(f"detector.{name}")
logger.info(f"Starting detection process: {os.getpid()}")
setproctitle(f"frigate.detector.{name}")
listen()
stop_event = mp.Event()
def receiveSignal(signalNumber, frame):
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
frame_manager = SharedMemoryFrameManager()
object_detector = LocalObjectDetector(
tf_device=detector_config.device,
model_path=model_path,
num_threads=detector_config.num_threads,
)
outputs = {}
for name in out_events.keys():
out_shm = mp.shared_memory.SharedMemory(name=f"out-{name}", create=False)
out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
outputs[name] = {"shm": out_shm, "np": out_np}
while not stop_event.is_set():
try:
connection_id = detection_queue.get(timeout=5)
except queue.Empty:
continue
input_frame = frame_manager.get(
connection_id, (1, model_shape[0], model_shape[1], 3)
)
if input_frame is None:
continue
# detect and send the output
start.value = datetime.datetime.now().timestamp()
detections = object_detector.detect_raw(input_frame)
duration = datetime.datetime.now().timestamp() - start.value
outputs[connection_id]["np"][:] = detections[:]
out_events[connection_id].set()
start.value = 0.0
avg_speed.value = (avg_speed.value * 9 + duration) / 10

View File

@ -0,0 +1,7 @@
from abc import ABC, abstractmethod
class ObjectDetector(ABC):
@abstractmethod
def detect(self, tensor_input, threshold=0.4):
pass

View File

@ -620,6 +620,25 @@ def load_labels(path, encoding="utf-8"):
else:
return {index: line.strip() for index, line in enumerate(lines)}
def load_labels(path, encoding="utf-8"):
"""Loads labels from file (with or without index numbers).
Args:
path: path to label file.
encoding: label file encoding.
Returns:
Dictionary mapping indices to labels.
"""
with open(path, "r", encoding=encoding) as f:
lines = f.readlines()
if not lines:
return {}
if lines[0].split(" ", maxsplit=1)[0].isdigit():
pairs = [line.split(" ", maxsplit=1) for line in lines]
return {int(index): label.strip() for index, label in pairs}
else:
return {index: line.strip() for index, line in enumerate(lines)}
class FrameManager(ABC):
@abstractmethod
def create(self, name, size) -> AnyStr:

View File

@ -15,7 +15,7 @@ from cv2 import cv2, reduce
from setproctitle import setproctitle
from frigate.config import CameraConfig, DetectConfig
from frigate.edgetpu import RemoteObjectDetector
from frigate.detection import RemoteObjectDetector
from frigate.log import LogPipe
from frigate.motion import MotionDetector
from frigate.objects import ObjectTracker