diff --git a/frigate/app.py b/frigate/app.py index 051c845e1..46ff9dc41 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -16,7 +16,7 @@ from pydantic import ValidationError from frigate.config import DetectorTypeEnum, FrigateConfig from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR -from frigate.edgetpu import EdgeTPUProcess +from frigate.detection import DetectionProcess from frigate.events import EventCleanup, EventProcessor from frigate.http import create_app from frigate.log import log_process, root_configurer @@ -39,7 +39,7 @@ class FrigateApp: self.base_config: FrigateConfig = None self.config: FrigateConfig = None self.detection_queue = mp.Queue() - self.detectors: Dict[str, EdgeTPUProcess] = {} + self.detectors: Dict[str, DetectionProcess] = {} self.detection_out_events: Dict[str, mp.Event] = {} self.detection_shms: List[mp.shared_memory.SharedMemory] = [] self.log_queue = mp.Queue() @@ -181,27 +181,15 @@ class FrigateApp: self.detection_shms.append(shm_in) self.detection_shms.append(shm_out) - for name, detector in self.config.detectors.items(): - if detector.type == DetectorTypeEnum.cpu: - self.detectors[name] = EdgeTPUProcess( - name, - self.detection_queue, - self.detection_out_events, - model_path, - model_shape, - "cpu", - detector.num_threads, - ) - if detector.type == DetectorTypeEnum.edgetpu: - self.detectors[name] = EdgeTPUProcess( - name, - self.detection_queue, - self.detection_out_events, - model_path, - model_shape, - detector.device, - detector.num_threads, - ) + for name, detector_config in self.config.detectors.items(): + self.detectors[name] = DetectionProcess( + name, + self.detection_queue, + self.detection_out_events, + model_path, + model_shape, + detector_config, + ) def start_detected_frames_processor(self): self.detected_frames_processor = TrackedObjectProcessor( diff --git a/frigate/config.py b/frigate/config.py index 59a7ef8c1..f2ceece65 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -866,7 +866,7 @@ class FrigateConfig(FrigateBaseModel): ) @property - def runtime_config(self): + def runtime_config(self) -> FrigateConfig: """Merge camera config with globals.""" config = self.copy(deep=True) diff --git a/frigate/detection/__init__.py b/frigate/detection/__init__.py new file mode 100644 index 000000000..d0ff60440 --- /dev/null +++ b/frigate/detection/__init__.py @@ -0,0 +1,114 @@ +import logging +import numpy as np +import multiprocessing as mp +from frigate.util import EventsPerSecond +from frigate.config import DetectorConfig, DetectorTypeEnum + +logger = logging.getLogger(__name__) + + +class DetectionProcess: + def __init__( + self, + name, + detection_queue, + out_events, + model_path, + model_shape, + detector_config: DetectorConfig, + ): + self.name = name + self.out_events = out_events + self.detection_queue = detection_queue + self.avg_inference_speed = mp.Value("d", 0.01) + self.detection_start = mp.Value("d", 0.0) + self.detect_process = None + self.model_path = model_path + self.model_shape = model_shape + self.detector_config = detector_config + + self.detector_target = None + if ( + detector_config.type == DetectorTypeEnum.cpu + or detector_config.type == DetectorTypeEnum.edgetpu + ): + from .edgetpu import run_detector as edgetpu_detector + + self.detector_target = edgetpu_detector + + assert self.detector_target, "Invalid detector configuration" + + self.start_or_restart() + + def stop(self): + self.detect_process.terminate() + logging.info("Waiting for detection process to exit gracefully...") + self.detect_process.join(timeout=30) + if self.detect_process.exitcode is None: + logging.info("Detection process didnt exit. Force killing...") + self.detect_process.kill() + self.detect_process.join() + + def start_or_restart(self): + self.detection_start.value = 0.0 + if (not self.detect_process is None) and self.detect_process.is_alive(): + self.stop() + self.detect_process = mp.Process( + target=self.detector_target, + name=f"detector:{self.name}", + args=( + self.name, + self.detection_queue, + self.out_events, + self.avg_inference_speed, + self.detection_start, + self.model_path, + self.model_shape, + self.detector_config, + ), + ) + self.detect_process.daemon = True + self.detect_process.start() + + +class RemoteObjectDetector: + def __init__(self, name, labels, detection_queue, event, model_shape): + self.labels = labels + self.name = name + self.fps = EventsPerSecond() + self.detection_queue = detection_queue + self.event = event + self.shm = mp.shared_memory.SharedMemory(name=self.name, create=False) + self.np_shm = np.ndarray( + (1, model_shape[0], model_shape[1], 3), dtype=np.uint8, buffer=self.shm.buf + ) + self.out_shm = mp.shared_memory.SharedMemory( + name=f"out-{self.name}", create=False + ) + self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf) + + def detect(self, tensor_input, threshold=0.4): + detections = [] + + # copy input to shared memory + self.np_shm[:] = tensor_input[:] + self.event.clear() + self.detection_queue.put(self.name) + result = self.event.wait(timeout=10.0) + + # if it timed out + if result is None: + return detections + + for d in self.out_np_shm: + if d[1] < threshold: + break + detections.append( + (self.labels[int(d[0])], float(d[1]), (d[2], d[3], d[4], d[5])) + ) + self.fps.update() + return detections + + def cleanup(self): + self.shm.unlink() + self.out_shm.unlink() diff --git a/frigate/detection/edgetpu.py b/frigate/detection/edgetpu.py new file mode 100644 index 000000000..4919ca805 --- /dev/null +++ b/frigate/detection/edgetpu.py @@ -0,0 +1,167 @@ +import datetime +import logging +import multiprocessing as mp +import os +import queue +import signal +import threading +from frigate.config import DetectorConfig +from typing import Dict + +import numpy as np + +# import tflite_runtime.interpreter as tflite +from setproctitle import setproctitle + +# from tflite_runtime.interpreter import load_delegate + +from frigate.util import EventsPerSecond, SharedMemoryFrameManager, listen +from .object_detector import ObjectDetector + +logger = logging.getLogger(__name__) + + +class LocalObjectDetector(ObjectDetector): + def __init__(self, tf_device=None, model_path=None, num_threads=3): + self.fps = EventsPerSecond() + # TODO: process_clip + # if labels is None: + # self.labels = {} + # else: + # self.labels = load_labels(labels) + + device_config = {"device": "usb"} + if not tf_device is None: + device_config = {"device": tf_device} + + edge_tpu_delegate = None + + # if tf_device != "cpu": + # try: + # logger.info(f"Attempting to load TPU as {device_config['device']}") + # edge_tpu_delegate = load_delegate("libedgetpu.so.1.0", device_config) + # logger.info("TPU found") + # self.interpreter = tflite.Interpreter( + # model_path=model_path or "/edgetpu_model.tflite", + # experimental_delegates=[edge_tpu_delegate], + # ) + # except ValueError: + # logger.error( + # "No EdgeTPU was detected. If you do not have a Coral device yet, you must configure CPU detectors." + # ) + # raise + # else: + # logger.warning( + # "CPU detectors are not recommended and should only be used for testing or for trial purposes." + # ) + # self.interpreter = tflite.Interpreter( + # model_path=model_path or "/cpu_model.tflite", num_threads=num_threads + # ) + + # self.interpreter.allocate_tensors() + + # self.tensor_input_details = self.interpreter.get_input_details() + # self.tensor_output_details = self.interpreter.get_output_details() + + def detect(self, tensor_input, threshold=0.4): + # TODO: called from process_clip + detections = [] + assert False, "implement detect() for process_clip.py" + + # raw_detections = self.detect_raw(tensor_input) + + # for d in raw_detections: + # if d[1] < threshold: + # break + # detections.append( + # (self.labels[int(d[0])], float(d[1]), (d[2], d[3], d[4], d[5])) + # ) + # self.fps.update() + return detections + + def detect_raw(self, tensor_input): + # self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input) + # self.interpreter.invoke() + + # boxes = self.interpreter.tensor(self.tensor_output_details[0]["index"])()[0] + # class_ids = self.interpreter.tensor(self.tensor_output_details[1]["index"])()[0] + # scores = self.interpreter.tensor(self.tensor_output_details[2]["index"])()[0] + # count = int( + # self.interpreter.tensor(self.tensor_output_details[3]["index"])()[0] + # ) + + detections = np.zeros((20, 6), np.float32) + + # for i in range(count): + # if scores[i] < 0.4 or i == 20: + # break + # detections[i] = [ + # class_ids[i], + # float(scores[i]), + # boxes[i][0], + # boxes[i][1], + # boxes[i][2], + # boxes[i][3], + # ] + + return detections + + +def run_detector( + name: str, + detection_queue: mp.Queue, + out_events: Dict[str, mp.Event], + avg_speed, + start, + model_path, + model_shape, + detector_config: DetectorConfig, +): + threading.current_thread().name = f"detector:{name}" + logger = logging.getLogger(f"detector.{name}") + logger.info(f"Starting detection process: {os.getpid()}") + setproctitle(f"frigate.detector.{name}") + listen() + + stop_event = mp.Event() + + def receiveSignal(signalNumber, frame): + stop_event.set() + + signal.signal(signal.SIGTERM, receiveSignal) + signal.signal(signal.SIGINT, receiveSignal) + + frame_manager = SharedMemoryFrameManager() + object_detector = LocalObjectDetector( + tf_device=detector_config.device, + model_path=model_path, + num_threads=detector_config.num_threads, + ) + + outputs = {} + for name in out_events.keys(): + out_shm = mp.shared_memory.SharedMemory(name=f"out-{name}", create=False) + out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) + outputs[name] = {"shm": out_shm, "np": out_np} + + while not stop_event.is_set(): + try: + connection_id = detection_queue.get(timeout=5) + except queue.Empty: + continue + input_frame = frame_manager.get( + connection_id, (1, model_shape[0], model_shape[1], 3) + ) + + if input_frame is None: + continue + + # detect and send the output + start.value = datetime.datetime.now().timestamp() + detections = object_detector.detect_raw(input_frame) + duration = datetime.datetime.now().timestamp() - start.value + outputs[connection_id]["np"][:] = detections[:] + out_events[connection_id].set() + start.value = 0.0 + + avg_speed.value = (avg_speed.value * 9 + duration) / 10 diff --git a/frigate/detection/object_detector.py b/frigate/detection/object_detector.py new file mode 100644 index 000000000..d12569b6b --- /dev/null +++ b/frigate/detection/object_detector.py @@ -0,0 +1,7 @@ +from abc import ABC, abstractmethod + + +class ObjectDetector(ABC): + @abstractmethod + def detect(self, tensor_input, threshold=0.4): + pass diff --git a/frigate/util.py b/frigate/util.py index 85835e933..5d0608c2e 100755 --- a/frigate/util.py +++ b/frigate/util.py @@ -620,6 +620,25 @@ def load_labels(path, encoding="utf-8"): else: return {index: line.strip() for index, line in enumerate(lines)} +def load_labels(path, encoding="utf-8"): + """Loads labels from file (with or without index numbers). + Args: + path: path to label file. + encoding: label file encoding. + Returns: + Dictionary mapping indices to labels. + """ + with open(path, "r", encoding=encoding) as f: + lines = f.readlines() + if not lines: + return {} + + if lines[0].split(" ", maxsplit=1)[0].isdigit(): + pairs = [line.split(" ", maxsplit=1) for line in lines] + return {int(index): label.strip() for index, label in pairs} + else: + return {index: line.strip() for index, line in enumerate(lines)} + class FrameManager(ABC): @abstractmethod def create(self, name, size) -> AnyStr: diff --git a/frigate/video.py b/frigate/video.py index 19770dccc..d7c3d2b56 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -15,7 +15,7 @@ from cv2 import cv2, reduce from setproctitle import setproctitle from frigate.config import CameraConfig, DetectConfig -from frigate.edgetpu import RemoteObjectDetector +from frigate.detection import RemoteObjectDetector from frigate.log import LogPipe from frigate.motion import MotionDetector from frigate.objects import ObjectTracker