From 2d585e84581e867e043e5429163edd4c13cd13b6 Mon Sep 17 00:00:00 2001 From: YS Date: Mon, 27 Dec 2021 11:12:26 +0300 Subject: [PATCH] refactor edgetpu to accept more detector types --- frigate/app.py | 34 +-- frigate/config.py | 5 +- frigate/detection/__init__.py | 114 +++++++ frigate/detection/edgetpu.py | 167 ++++++++++ frigate/detection/object_detector.py | 7 + frigate/edgetpu.py | 284 ------------------ frigate/object_processing.py | 1 - .../{process_clip.py => process_clip_del.py} | 1 + frigate/util.py | 19 ++ frigate/video.py | 2 +- 10 files changed, 322 insertions(+), 312 deletions(-) create mode 100644 frigate/detection/__init__.py create mode 100644 frigate/detection/edgetpu.py create mode 100644 frigate/detection/object_detector.py delete mode 100644 frigate/edgetpu.py rename frigate/{process_clip.py => process_clip_del.py} (99%) diff --git a/frigate/app.py b/frigate/app.py index 7b991d9db..b5c2955e6 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -16,7 +16,7 @@ from pydantic import ValidationError from frigate.config import DetectorTypeEnum, FrigateConfig from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR -from frigate.edgetpu import EdgeTPUProcess +from frigate.detection import DetectionProcess from frigate.events import EventCleanup, EventProcessor from frigate.http import create_app from frigate.log import log_process, root_configurer @@ -39,7 +39,7 @@ class FrigateApp: self.base_config: FrigateConfig = None self.config: FrigateConfig = None self.detection_queue = mp.Queue() - self.detectors: Dict[str, EdgeTPUProcess] = {} + self.detectors: Dict[str, DetectionProcess] = {} self.detection_out_events: Dict[str, mp.Event] = {} self.detection_shms: List[mp.shared_memory.SharedMemory] = [] self.log_queue = mp.Queue() @@ -172,27 +172,15 @@ class FrigateApp: self.detection_shms.append(shm_in) self.detection_shms.append(shm_out) - for name, detector in self.config.detectors.items(): - if detector.type == DetectorTypeEnum.cpu: - self.detectors[name] = EdgeTPUProcess( - name, - self.detection_queue, - self.detection_out_events, - model_path, - model_shape, - "cpu", - detector.num_threads, - ) - if detector.type == DetectorTypeEnum.edgetpu: - self.detectors[name] = EdgeTPUProcess( - name, - self.detection_queue, - self.detection_out_events, - model_path, - model_shape, - detector.device, - detector.num_threads, - ) + for name, detector_config in self.config.detectors.items(): + self.detectors[name] = DetectionProcess( + name, + self.detection_queue, + self.detection_out_events, + model_path, + model_shape, + detector_config, + ) def start_detected_frames_processor(self): self.detected_frames_processor = TrackedObjectProcessor( diff --git a/frigate/config.py b/frigate/config.py index 91d66afb4..0796c2a10 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -13,8 +13,7 @@ from pydantic import BaseModel, Extra, Field, validator, root_validator from pydantic.fields import PrivateAttr from frigate.const import BASE_DIR, CACHE_DIR, RECORD_DIR -from frigate.edgetpu import load_labels -from frigate.util import create_mask, deep_merge +from frigate.util import create_mask, deep_merge, load_labels logger = logging.getLogger(__name__) @@ -845,7 +844,7 @@ class FrigateConfig(FrigateBaseModel): ) @property - def runtime_config(self): + def runtime_config(self) -> FrigateConfig: """Merge camera config with globals.""" config = self.copy(deep=True) diff --git a/frigate/detection/__init__.py b/frigate/detection/__init__.py new file mode 100644 index 000000000..d0ff60440 --- /dev/null +++ b/frigate/detection/__init__.py @@ -0,0 +1,114 @@ +import logging +import numpy as np +import multiprocessing as mp +from frigate.util import EventsPerSecond +from frigate.config import DetectorConfig, DetectorTypeEnum + +logger = logging.getLogger(__name__) + + +class DetectionProcess: + def __init__( + self, + name, + detection_queue, + out_events, + model_path, + model_shape, + detector_config: DetectorConfig, + ): + self.name = name + self.out_events = out_events + self.detection_queue = detection_queue + self.avg_inference_speed = mp.Value("d", 0.01) + self.detection_start = mp.Value("d", 0.0) + self.detect_process = None + self.model_path = model_path + self.model_shape = model_shape + self.detector_config = detector_config + + self.detector_target = None + if ( + detector_config.type == DetectorTypeEnum.cpu + or detector_config.type == DetectorTypeEnum.edgetpu + ): + from .edgetpu import run_detector as edgetpu_detector + + self.detector_target = edgetpu_detector + + assert self.detector_target, "Invalid detector configuration" + + self.start_or_restart() + + def stop(self): + self.detect_process.terminate() + logging.info("Waiting for detection process to exit gracefully...") + self.detect_process.join(timeout=30) + if self.detect_process.exitcode is None: + logging.info("Detection process didnt exit. Force killing...") + self.detect_process.kill() + self.detect_process.join() + + def start_or_restart(self): + self.detection_start.value = 0.0 + if (not self.detect_process is None) and self.detect_process.is_alive(): + self.stop() + self.detect_process = mp.Process( + target=self.detector_target, + name=f"detector:{self.name}", + args=( + self.name, + self.detection_queue, + self.out_events, + self.avg_inference_speed, + self.detection_start, + self.model_path, + self.model_shape, + self.detector_config, + ), + ) + self.detect_process.daemon = True + self.detect_process.start() + + +class RemoteObjectDetector: + def __init__(self, name, labels, detection_queue, event, model_shape): + self.labels = labels + self.name = name + self.fps = EventsPerSecond() + self.detection_queue = detection_queue + self.event = event + self.shm = mp.shared_memory.SharedMemory(name=self.name, create=False) + self.np_shm = np.ndarray( + (1, model_shape[0], model_shape[1], 3), dtype=np.uint8, buffer=self.shm.buf + ) + self.out_shm = mp.shared_memory.SharedMemory( + name=f"out-{self.name}", create=False + ) + self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf) + + def detect(self, tensor_input, threshold=0.4): + detections = [] + + # copy input to shared memory + self.np_shm[:] = tensor_input[:] + self.event.clear() + self.detection_queue.put(self.name) + result = self.event.wait(timeout=10.0) + + # if it timed out + if result is None: + return detections + + for d in self.out_np_shm: + if d[1] < threshold: + break + detections.append( + (self.labels[int(d[0])], float(d[1]), (d[2], d[3], d[4], d[5])) + ) + self.fps.update() + return detections + + def cleanup(self): + self.shm.unlink() + self.out_shm.unlink() diff --git a/frigate/detection/edgetpu.py b/frigate/detection/edgetpu.py new file mode 100644 index 000000000..4919ca805 --- /dev/null +++ b/frigate/detection/edgetpu.py @@ -0,0 +1,167 @@ +import datetime +import logging +import multiprocessing as mp +import os +import queue +import signal +import threading +from frigate.config import DetectorConfig +from typing import Dict + +import numpy as np + +# import tflite_runtime.interpreter as tflite +from setproctitle import setproctitle + +# from tflite_runtime.interpreter import load_delegate + +from frigate.util import EventsPerSecond, SharedMemoryFrameManager, listen +from .object_detector import ObjectDetector + +logger = logging.getLogger(__name__) + + +class LocalObjectDetector(ObjectDetector): + def __init__(self, tf_device=None, model_path=None, num_threads=3): + self.fps = EventsPerSecond() + # TODO: process_clip + # if labels is None: + # self.labels = {} + # else: + # self.labels = load_labels(labels) + + device_config = {"device": "usb"} + if not tf_device is None: + device_config = {"device": tf_device} + + edge_tpu_delegate = None + + # if tf_device != "cpu": + # try: + # logger.info(f"Attempting to load TPU as {device_config['device']}") + # edge_tpu_delegate = load_delegate("libedgetpu.so.1.0", device_config) + # logger.info("TPU found") + # self.interpreter = tflite.Interpreter( + # model_path=model_path or "/edgetpu_model.tflite", + # experimental_delegates=[edge_tpu_delegate], + # ) + # except ValueError: + # logger.error( + # "No EdgeTPU was detected. If you do not have a Coral device yet, you must configure CPU detectors." + # ) + # raise + # else: + # logger.warning( + # "CPU detectors are not recommended and should only be used for testing or for trial purposes." + # ) + # self.interpreter = tflite.Interpreter( + # model_path=model_path or "/cpu_model.tflite", num_threads=num_threads + # ) + + # self.interpreter.allocate_tensors() + + # self.tensor_input_details = self.interpreter.get_input_details() + # self.tensor_output_details = self.interpreter.get_output_details() + + def detect(self, tensor_input, threshold=0.4): + # TODO: called from process_clip + detections = [] + assert False, "implement detect() for process_clip.py" + + # raw_detections = self.detect_raw(tensor_input) + + # for d in raw_detections: + # if d[1] < threshold: + # break + # detections.append( + # (self.labels[int(d[0])], float(d[1]), (d[2], d[3], d[4], d[5])) + # ) + # self.fps.update() + return detections + + def detect_raw(self, tensor_input): + # self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input) + # self.interpreter.invoke() + + # boxes = self.interpreter.tensor(self.tensor_output_details[0]["index"])()[0] + # class_ids = self.interpreter.tensor(self.tensor_output_details[1]["index"])()[0] + # scores = self.interpreter.tensor(self.tensor_output_details[2]["index"])()[0] + # count = int( + # self.interpreter.tensor(self.tensor_output_details[3]["index"])()[0] + # ) + + detections = np.zeros((20, 6), np.float32) + + # for i in range(count): + # if scores[i] < 0.4 or i == 20: + # break + # detections[i] = [ + # class_ids[i], + # float(scores[i]), + # boxes[i][0], + # boxes[i][1], + # boxes[i][2], + # boxes[i][3], + # ] + + return detections + + +def run_detector( + name: str, + detection_queue: mp.Queue, + out_events: Dict[str, mp.Event], + avg_speed, + start, + model_path, + model_shape, + detector_config: DetectorConfig, +): + threading.current_thread().name = f"detector:{name}" + logger = logging.getLogger(f"detector.{name}") + logger.info(f"Starting detection process: {os.getpid()}") + setproctitle(f"frigate.detector.{name}") + listen() + + stop_event = mp.Event() + + def receiveSignal(signalNumber, frame): + stop_event.set() + + signal.signal(signal.SIGTERM, receiveSignal) + signal.signal(signal.SIGINT, receiveSignal) + + frame_manager = SharedMemoryFrameManager() + object_detector = LocalObjectDetector( + tf_device=detector_config.device, + model_path=model_path, + num_threads=detector_config.num_threads, + ) + + outputs = {} + for name in out_events.keys(): + out_shm = mp.shared_memory.SharedMemory(name=f"out-{name}", create=False) + out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) + outputs[name] = {"shm": out_shm, "np": out_np} + + while not stop_event.is_set(): + try: + connection_id = detection_queue.get(timeout=5) + except queue.Empty: + continue + input_frame = frame_manager.get( + connection_id, (1, model_shape[0], model_shape[1], 3) + ) + + if input_frame is None: + continue + + # detect and send the output + start.value = datetime.datetime.now().timestamp() + detections = object_detector.detect_raw(input_frame) + duration = datetime.datetime.now().timestamp() - start.value + outputs[connection_id]["np"][:] = detections[:] + out_events[connection_id].set() + start.value = 0.0 + + avg_speed.value = (avg_speed.value * 9 + duration) / 10 diff --git a/frigate/detection/object_detector.py b/frigate/detection/object_detector.py new file mode 100644 index 000000000..d12569b6b --- /dev/null +++ b/frigate/detection/object_detector.py @@ -0,0 +1,7 @@ +from abc import ABC, abstractmethod + + +class ObjectDetector(ABC): + @abstractmethod + def detect(self, tensor_input, threshold=0.4): + pass diff --git a/frigate/edgetpu.py b/frigate/edgetpu.py deleted file mode 100644 index 1992c6b35..000000000 --- a/frigate/edgetpu.py +++ /dev/null @@ -1,284 +0,0 @@ -import datetime -import logging -import multiprocessing as mp -import os -import queue -import signal -import threading -from abc import ABC, abstractmethod -from typing import Dict - -import numpy as np -import tflite_runtime.interpreter as tflite -from setproctitle import setproctitle -from tflite_runtime.interpreter import load_delegate - -from frigate.util import EventsPerSecond, SharedMemoryFrameManager, listen - -logger = logging.getLogger(__name__) - - -def load_labels(path, encoding="utf-8"): - """Loads labels from file (with or without index numbers). - Args: - path: path to label file. - encoding: label file encoding. - Returns: - Dictionary mapping indices to labels. - """ - with open(path, "r", encoding=encoding) as f: - lines = f.readlines() - if not lines: - return {} - - if lines[0].split(" ", maxsplit=1)[0].isdigit(): - pairs = [line.split(" ", maxsplit=1) for line in lines] - return {int(index): label.strip() for index, label in pairs} - else: - return {index: line.strip() for index, line in enumerate(lines)} - - -class ObjectDetector(ABC): - @abstractmethod - def detect(self, tensor_input, threshold=0.4): - pass - - -class LocalObjectDetector(ObjectDetector): - def __init__(self, tf_device=None, model_path=None, num_threads=3, labels=None): - self.fps = EventsPerSecond() - if labels is None: - self.labels = {} - else: - self.labels = load_labels(labels) - - device_config = {"device": "usb"} - if not tf_device is None: - device_config = {"device": tf_device} - - edge_tpu_delegate = None - - if tf_device != "cpu": - try: - logger.info(f"Attempting to load TPU as {device_config['device']}") - edge_tpu_delegate = load_delegate("libedgetpu.so.1.0", device_config) - logger.info("TPU found") - self.interpreter = tflite.Interpreter( - model_path=model_path or "/edgetpu_model.tflite", - experimental_delegates=[edge_tpu_delegate], - ) - except ValueError: - logger.error( - "No EdgeTPU was detected. If you do not have a Coral device yet, you must configure CPU detectors." - ) - raise - else: - logger.warning( - "CPU detectors are not recommended and should only be used for testing or for trial purposes." - ) - self.interpreter = tflite.Interpreter( - model_path=model_path or "/cpu_model.tflite", num_threads=num_threads - ) - - self.interpreter.allocate_tensors() - - self.tensor_input_details = self.interpreter.get_input_details() - self.tensor_output_details = self.interpreter.get_output_details() - - def detect(self, tensor_input, threshold=0.4): - detections = [] - - raw_detections = self.detect_raw(tensor_input) - - for d in raw_detections: - if d[1] < threshold: - break - detections.append( - (self.labels[int(d[0])], float(d[1]), (d[2], d[3], d[4], d[5])) - ) - self.fps.update() - return detections - - def detect_raw(self, tensor_input): - self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input) - self.interpreter.invoke() - - boxes = self.interpreter.tensor(self.tensor_output_details[0]["index"])()[0] - class_ids = self.interpreter.tensor(self.tensor_output_details[1]["index"])()[0] - scores = self.interpreter.tensor(self.tensor_output_details[2]["index"])()[0] - count = int( - self.interpreter.tensor(self.tensor_output_details[3]["index"])()[0] - ) - - detections = np.zeros((20, 6), np.float32) - - for i in range(count): - if scores[i] < 0.4 or i == 20: - break - detections[i] = [ - class_ids[i], - float(scores[i]), - boxes[i][0], - boxes[i][1], - boxes[i][2], - boxes[i][3], - ] - - return detections - - -def run_detector( - name: str, - detection_queue: mp.Queue, - out_events: Dict[str, mp.Event], - avg_speed, - start, - model_path, - model_shape, - tf_device, - num_threads, -): - threading.current_thread().name = f"detector:{name}" - logger = logging.getLogger(f"detector.{name}") - logger.info(f"Starting detection process: {os.getpid()}") - setproctitle(f"frigate.detector.{name}") - listen() - - stop_event = mp.Event() - - def receiveSignal(signalNumber, frame): - stop_event.set() - - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - frame_manager = SharedMemoryFrameManager() - object_detector = LocalObjectDetector( - tf_device=tf_device, model_path=model_path, num_threads=num_threads - ) - - outputs = {} - for name in out_events.keys(): - out_shm = mp.shared_memory.SharedMemory(name=f"out-{name}", create=False) - out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) - outputs[name] = {"shm": out_shm, "np": out_np} - - while not stop_event.is_set(): - try: - connection_id = detection_queue.get(timeout=5) - except queue.Empty: - continue - input_frame = frame_manager.get( - connection_id, (1, model_shape[0], model_shape[1], 3) - ) - - if input_frame is None: - continue - - # detect and send the output - start.value = datetime.datetime.now().timestamp() - detections = object_detector.detect_raw(input_frame) - duration = datetime.datetime.now().timestamp() - start.value - outputs[connection_id]["np"][:] = detections[:] - out_events[connection_id].set() - start.value = 0.0 - - avg_speed.value = (avg_speed.value * 9 + duration) / 10 - - -class EdgeTPUProcess: - def __init__( - self, - name, - detection_queue, - out_events, - model_path, - model_shape, - tf_device=None, - num_threads=3, - ): - self.name = name - self.out_events = out_events - self.detection_queue = detection_queue - self.avg_inference_speed = mp.Value("d", 0.01) - self.detection_start = mp.Value("d", 0.0) - self.detect_process = None - self.model_path = model_path - self.model_shape = model_shape - self.tf_device = tf_device - self.num_threads = num_threads - self.start_or_restart() - - def stop(self): - self.detect_process.terminate() - logging.info("Waiting for detection process to exit gracefully...") - self.detect_process.join(timeout=30) - if self.detect_process.exitcode is None: - logging.info("Detection process didnt exit. Force killing...") - self.detect_process.kill() - self.detect_process.join() - - def start_or_restart(self): - self.detection_start.value = 0.0 - if (not self.detect_process is None) and self.detect_process.is_alive(): - self.stop() - self.detect_process = mp.Process( - target=run_detector, - name=f"detector:{self.name}", - args=( - self.name, - self.detection_queue, - self.out_events, - self.avg_inference_speed, - self.detection_start, - self.model_path, - self.model_shape, - self.tf_device, - self.num_threads, - ), - ) - self.detect_process.daemon = True - self.detect_process.start() - - -class RemoteObjectDetector: - def __init__(self, name, labels, detection_queue, event, model_shape): - self.labels = labels - self.name = name - self.fps = EventsPerSecond() - self.detection_queue = detection_queue - self.event = event - self.shm = mp.shared_memory.SharedMemory(name=self.name, create=False) - self.np_shm = np.ndarray( - (1, model_shape[0], model_shape[1], 3), dtype=np.uint8, buffer=self.shm.buf - ) - self.out_shm = mp.shared_memory.SharedMemory( - name=f"out-{self.name}", create=False - ) - self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf) - - def detect(self, tensor_input, threshold=0.4): - detections = [] - - # copy input to shared memory - self.np_shm[:] = tensor_input[:] - self.event.clear() - self.detection_queue.put(self.name) - result = self.event.wait(timeout=10.0) - - # if it timed out - if result is None: - return detections - - for d in self.out_np_shm: - if d[1] < threshold: - break - detections.append( - (self.labels[int(d[0])], float(d[1]), (d[2], d[3], d[4], d[5])) - ) - self.fps.update() - return detections - - def cleanup(self): - self.shm.unlink() - self.out_shm.unlink() diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 2597893d7..bfbc0e414 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -18,7 +18,6 @@ import numpy as np from frigate.config import CameraConfig, SnapshotsConfig, RecordConfig, FrigateConfig from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR -from frigate.edgetpu import load_labels from frigate.util import ( SharedMemoryFrameManager, calculate_region, diff --git a/frigate/process_clip.py b/frigate/process_clip_del.py similarity index 99% rename from frigate/process_clip.py rename to frigate/process_clip_del.py index 50ef4d860..edc381b87 100644 --- a/frigate/process_clip.py +++ b/frigate/process_clip_del.py @@ -12,6 +12,7 @@ import cv2 import numpy as np from frigate.config import FRIGATE_CONFIG_SCHEMA, FrigateConfig +#TODO: refactor from frigate.edgetpu import LocalObjectDetector from frigate.motion import MotionDetector from frigate.object_processing import CameraState diff --git a/frigate/util.py b/frigate/util.py index 24a75b775..aa7abd004 100755 --- a/frigate/util.py +++ b/frigate/util.py @@ -602,6 +602,25 @@ def add_mask(mask, mask_img): cv2.fillPoly(mask_img, pts=[contour], color=(0)) +def load_labels(path, encoding="utf-8"): + """Loads labels from file (with or without index numbers). + Args: + path: path to label file. + encoding: label file encoding. + Returns: + Dictionary mapping indices to labels. + """ + with open(path, "r", encoding=encoding) as f: + lines = f.readlines() + if not lines: + return {} + + if lines[0].split(" ", maxsplit=1)[0].isdigit(): + pairs = [line.split(" ", maxsplit=1) for line in lines] + return {int(index): label.strip() for index, label in pairs} + else: + return {index: line.strip() for index, line in enumerate(lines)} + class FrameManager(ABC): @abstractmethod def create(self, name, size) -> AnyStr: diff --git a/frigate/video.py b/frigate/video.py index 30bbe1c73..c4d18cf5a 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -15,7 +15,7 @@ from cv2 import cv2 import numpy as np from frigate.config import CameraConfig -from frigate.edgetpu import RemoteObjectDetector +from frigate.detection import RemoteObjectDetector from frigate.log import LogPipe from frigate.motion import MotionDetector from frigate.objects import ObjectTracker