add detector model config models

This commit is contained in:
Dennis George 2022-12-08 22:05:54 -06:00
parent 3f524da4ca
commit e45f2f2d32
13 changed files with 180 additions and 114 deletions

View File

@ -3,7 +3,7 @@ from statistics import mean
import multiprocessing as mp import multiprocessing as mp
import numpy as np import numpy as np
import datetime import datetime
from frigate.config import DetectorTypeEnum from frigate.detectors import DetectorTypeEnum
from frigate.object_detection import ( from frigate.object_detection import (
LocalObjectDetector, LocalObjectDetector,
ObjectDetectProcess, ObjectDetectProcess,

View File

@ -9,7 +9,7 @@ from typing import Dict, List, Optional, Tuple, Union
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
import yaml import yaml
from pydantic import BaseModel, Extra, Field, validator from pydantic import BaseModel, Extra, Field, validator, parse_obj_as
from pydantic.fields import PrivateAttr from pydantic.fields import PrivateAttr
from frigate.const import ( from frigate.const import (
@ -24,7 +24,6 @@ from frigate.util import (
get_ffmpeg_arg_list, get_ffmpeg_arg_list,
escape_special_characters, escape_special_characters,
load_config_with_no_duplicates, load_config_with_no_duplicates,
load_labels,
) )
from frigate.ffmpeg_presets import ( from frigate.ffmpeg_presets import (
parse_preset_hardware_acceleration, parse_preset_hardware_acceleration,
@ -33,7 +32,8 @@ from frigate.ffmpeg_presets import (
parse_preset_output_rtmp, parse_preset_output_rtmp,
) )
from frigate.version import VERSION from frigate.version import VERSION
from frigate.detectors import DetectorTypeEnum from frigate.detectors.config import ModelConfig, DetectorConfig
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -53,12 +53,6 @@ class FrigateBaseModel(BaseModel):
extra = Extra.forbid extra = Extra.forbid
class DetectorConfig(FrigateBaseModel):
type: DetectorTypeEnum = Field(default=DetectorTypeEnum.cpu, title="Detector Type")
device: str = Field(default="usb", title="Device Type")
num_threads: int = Field(default=3, title="Number of detection threads")
class UIConfig(FrigateBaseModel): class UIConfig(FrigateBaseModel):
use_experimental: bool = Field(default=False, title="Experimental UI") use_experimental: bool = Field(default=False, title="Experimental UI")
@ -720,57 +714,6 @@ class DatabaseConfig(FrigateBaseModel):
) )
class PixelFormatEnum(str, Enum):
rgb = "rgb"
bgr = "bgr"
yuv = "yuv"
class InputTensorEnum(str, Enum):
nchw = "nchw"
nhwc = "nhwc"
class ModelConfig(FrigateBaseModel):
path: Optional[str] = Field(title="Custom Object detection model path.")
labelmap_path: Optional[str] = Field(title="Label map for custom object detector.")
width: int = Field(default=320, title="Object detection model input width.")
height: int = Field(default=320, title="Object detection model input height.")
labelmap: Dict[int, str] = Field(
default_factory=dict, title="Labelmap customization."
)
input_tensor: InputTensorEnum = Field(
default=InputTensorEnum.nhwc, title="Model Input Tensor Shape"
)
input_pixel_format: PixelFormatEnum = Field(
default=PixelFormatEnum.rgb, title="Model Input Pixel Color Format"
)
_merged_labelmap: Optional[Dict[int, str]] = PrivateAttr()
_colormap: Dict[int, Tuple[int, int, int]] = PrivateAttr()
@property
def merged_labelmap(self) -> Dict[int, str]:
return self._merged_labelmap
@property
def colormap(self) -> Dict[int, Tuple[int, int, int]]:
return self._colormap
def __init__(self, **config):
super().__init__(**config)
self._merged_labelmap = {
**load_labels(config.get("labelmap_path", "/labelmap.txt")),
**config.get("labelmap", {}),
}
cmap = plt.cm.get_cmap("tab10", len(self._merged_labelmap.keys()))
self._colormap = {}
for key, val in self._merged_labelmap.items():
self._colormap[val] = tuple(int(round(255 * c)) for c in cmap(key)[:3])
class LogLevelEnum(str, Enum): class LogLevelEnum(str, Enum):
debug = "debug" debug = "debug"
info = "info" info = "info"
@ -882,10 +825,10 @@ class FrigateConfig(FrigateBaseModel):
) )
ui: UIConfig = Field(default_factory=UIConfig, title="UI configuration.") ui: UIConfig = Field(default_factory=UIConfig, title="UI configuration.")
model: ModelConfig = Field( model: ModelConfig = Field(
default_factory=ModelConfig, title="Detection model configuration." default_factory=ModelConfig, title="Default detection model configuration."
) )
detectors: Dict[str, DetectorConfig] = Field( detectors: Dict[str, DetectorConfig] = Field(
default={name: DetectorConfig(**d) for name, d in DEFAULT_DETECTORS.items()}, default=DEFAULT_DETECTORS,
title="Detector hardware configuration.", title="Detector hardware configuration.",
) )
logger: LoggerConfig = Field( logger: LoggerConfig = Field(
@ -1027,6 +970,13 @@ class FrigateConfig(FrigateBaseModel):
# generate the ffmpeg commands # generate the ffmpeg commands
camera_config.create_ffmpeg_cmds() camera_config.create_ffmpeg_cmds()
config.cameras[name] = camera_config config.cameras[name] = camera_config
for key, detector in config.detectors.items():
detector_config: DetectorConfig = parse_obj_as(DetectorConfig, detector)
if detector_config.model is None:
detector_config.model = config.model
config.detectors[key] = detector_config
return config return config
@validator("cameras") @validator("cameras")

View File

@ -2,34 +2,29 @@ import logging
from enum import Enum from enum import Enum
from .detection_api import DetectionApi from .detection_api import DetectionApi
from .detector_type import DetectorTypeEnum
from .cpu_tfl import CpuTfl from .cpu_tfl import CpuTfl
from .edgetpu_tfl import EdgeTpuTfl from .edgetpu_tfl import EdgeTpuTfl
from .openvino import OvDetector from .openvino import OvDetector
from .config import ModelConfig, DetectorConfig
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class DetectorTypeEnum(str, Enum): def create_detector(detector_config: DetectorConfig):
edgetpu = "edgetpu"
openvino = "openvino"
cpu = "cpu"
tensorrt = "tensorrt"
def create_detector(det_type: DetectorTypeEnum, **kwargs):
_api_types = { _api_types = {
DetectorTypeEnum.cpu: CpuTfl, DetectorTypeEnum.cpu: CpuTfl,
DetectorTypeEnum.edgetpu: EdgeTpuTfl, DetectorTypeEnum.edgetpu: EdgeTpuTfl,
DetectorTypeEnum.openvino: OvDetector, DetectorTypeEnum.openvino: OvDetector,
} }
if det_type == DetectorTypeEnum.cpu: if detector_config.type == DetectorTypeEnum.cpu:
logger.warning( logger.warning(
"CPU detectors are not recommended and should only be used for testing or for trial purposes." "CPU detectors are not recommended and should only be used for testing or for trial purposes."
) )
api = _api_types.get(det_type) api = _api_types.get(detector_config.type)
if not api: if not api:
raise ValueError(det_type) raise ValueError(detector_config.type)
return api(**kwargs) return api(detector_config)

View File

@ -0,0 +1,97 @@
import logging
from typing import Dict, List, Optional, Tuple, Union, Literal
from typing_extensions import Annotated
import matplotlib.pyplot as plt
from pydantic import BaseModel, Extra, Field, validator, parse_obj_as
from pydantic.fields import PrivateAttr
from frigate.enums import InputTensorEnum, PixelFormatEnum
from frigate.util import load_labels
from .detector_type import DetectorTypeEnum
logger = logging.getLogger(__name__)
class ModelConfig(BaseModel):
path: Optional[str] = Field(title="Custom Object detection model path.")
labelmap_path: Optional[str] = Field(title="Label map for custom object detector.")
width: int = Field(default=320, title="Object detection model input width.")
height: int = Field(default=320, title="Object detection model input height.")
labelmap: Dict[int, str] = Field(
default_factory=dict, title="Labelmap customization."
)
input_tensor: InputTensorEnum = Field(
default=InputTensorEnum.nhwc, title="Model Input Tensor Shape"
)
input_pixel_format: PixelFormatEnum = Field(
default=PixelFormatEnum.rgb, title="Model Input Pixel Color Format"
)
_merged_labelmap: Optional[Dict[int, str]] = PrivateAttr()
_colormap: Dict[int, Tuple[int, int, int]] = PrivateAttr()
@property
def merged_labelmap(self) -> Dict[int, str]:
return self._merged_labelmap
@property
def colormap(self) -> Dict[int, Tuple[int, int, int]]:
return self._colormap
def __init__(self, **config):
super().__init__(**config)
self._merged_labelmap = {
**load_labels(config.get("labelmap_path", "/labelmap.txt")),
**config.get("labelmap", {}),
}
cmap = plt.cm.get_cmap("tab10", len(self._merged_labelmap.keys()))
self._colormap = {}
for key, val in self._merged_labelmap.items():
self._colormap[val] = tuple(int(round(255 * c)) for c in cmap(key)[:3])
class Config:
extra = Extra.forbid
class BaseDetectorConfig(BaseModel):
type: DetectorTypeEnum = Field(default=DetectorTypeEnum.cpu, title="Detector Type")
model: ModelConfig = Field(
default_factory=ModelConfig, title="Detector specific model configuration."
)
class Config:
extra = Extra.forbid
arbitrary_types_allowed = True
class CpuDetectorConfig(BaseDetectorConfig):
type: Literal[DetectorTypeEnum.cpu] = Field(
default=DetectorTypeEnum.cpu, title="Detector Type"
)
num_threads: int = Field(default=3, title="Number of detection threads")
class EdgeTpuDetectorConfig(BaseDetectorConfig):
type: Literal[DetectorTypeEnum.edgetpu] = Field(
default=DetectorTypeEnum.edgetpu, title="Detector Type"
)
device: str = Field(default="usb", title="Device Type")
class OpenVinoDetectorConfig(BaseDetectorConfig):
type: Literal[DetectorTypeEnum.openvino] = Field(
default=DetectorTypeEnum.openvino, title="Detector Type"
)
device: str = Field(default="usb", title="Device Type")
DetectorConfig = Annotated[
Union[CpuDetectorConfig, EdgeTpuDetectorConfig, OpenVinoDetectorConfig],
Field(discriminator="type"),
]
DEFAULT_DETECTORS = parse_obj_as(Dict[str, DetectorConfig], {"cpu": {"type": "cpu"}})

View File

@ -2,6 +2,7 @@ import logging
import numpy as np import numpy as np
from .detection_api import DetectionApi from .detection_api import DetectionApi
from .config import CpuDetectorConfig
import tflite_runtime.interpreter as tflite import tflite_runtime.interpreter as tflite
@ -9,9 +10,10 @@ logger = logging.getLogger(__name__)
class CpuTfl(DetectionApi): class CpuTfl(DetectionApi):
def __init__(self, det_device=None, model_config=None, num_threads=3, **kwargs): def __init__(self, detector_config: CpuDetectorConfig):
self.interpreter = tflite.Interpreter( self.interpreter = tflite.Interpreter(
model_path=model_config.path or "/cpu_model.tflite", num_threads=num_threads model_path=detector_config.model.path or "/cpu_model.tflite",
num_threads=detector_config.num_threads,
) )
self.interpreter.allocate_tensors() self.interpreter.allocate_tensors()

View File

@ -0,0 +1,8 @@
from enum import Enum
class DetectorTypeEnum(str, Enum):
edgetpu = "edgetpu"
openvino = "openvino"
cpu = "cpu"
tensorrt = "tensorrt"

View File

@ -2,6 +2,7 @@ import logging
import numpy as np import numpy as np
from .detection_api import DetectionApi from .detection_api import DetectionApi
from .config import EdgeTpuDetectorConfig
import tflite_runtime.interpreter as tflite import tflite_runtime.interpreter as tflite
from tflite_runtime.interpreter import load_delegate from tflite_runtime.interpreter import load_delegate
@ -10,10 +11,10 @@ logger = logging.getLogger(__name__)
class EdgeTpuTfl(DetectionApi): class EdgeTpuTfl(DetectionApi):
def __init__(self, det_device=None, model_config=None, **kwargs): def __init__(self, detector_config: EdgeTpuDetectorConfig):
device_config = {"device": "usb"} device_config = {"device": "usb"}
if not det_device is None: if not detector_config.device is None:
device_config = {"device": det_device} device_config = {"device": detector_config.device}
edge_tpu_delegate = None edge_tpu_delegate = None
@ -22,7 +23,7 @@ class EdgeTpuTfl(DetectionApi):
edge_tpu_delegate = load_delegate("libedgetpu.so.1.0", device_config) edge_tpu_delegate = load_delegate("libedgetpu.so.1.0", device_config)
logger.info("TPU found") logger.info("TPU found")
self.interpreter = tflite.Interpreter( self.interpreter = tflite.Interpreter(
model_path=model_config.path or "/edgetpu_model.tflite", model_path=detector_config.model.path or "/edgetpu_model.tflite",
experimental_delegates=[edge_tpu_delegate], experimental_delegates=[edge_tpu_delegate],
) )
except ValueError: except ValueError:

View File

@ -3,18 +3,19 @@ import numpy as np
import openvino.runtime as ov import openvino.runtime as ov
from .detection_api import DetectionApi from .detection_api import DetectionApi
from .config import OpenVinoDetectorConfig
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class OvDetector(DetectionApi): class OvDetector(DetectionApi):
def __init__(self, det_device=None, model_config=None, num_threads=1, **kwargs): def __init__(self, detector_config: OpenVinoDetectorConfig):
self.ov_core = ov.Core() self.ov_core = ov.Core()
self.ov_model = self.ov_core.read_model(model_config.path) self.ov_model = self.ov_core.read_model(detector_config.model.path)
self.interpreter = self.ov_core.compile_model( self.interpreter = self.ov_core.compile_model(
model=self.ov_model, device_name=det_device model=self.ov_model, device_name=detector_config.device
) )
logger.info(f"Model Input Shape: {self.interpreter.input(0).shape}") logger.info(f"Model Input Shape: {self.interpreter.input(0).shape}")
self.output_indexes = 0 self.output_indexes = 0

12
frigate/enums.py Normal file
View File

@ -0,0 +1,12 @@
from enum import Enum
class PixelFormatEnum(str, Enum):
rgb = "rgb"
bgr = "bgr"
yuv = "yuv"
class InputTensorEnum(str, Enum):
nchw = "nchw"
nhwc = "nhwc"

View File

@ -10,7 +10,7 @@ from abc import ABC, abstractmethod
import numpy as np import numpy as np
from setproctitle import setproctitle from setproctitle import setproctitle
from frigate.config import InputTensorEnum from frigate.enums import InputTensorEnum
from frigate.detectors import create_detector, DetectorTypeEnum from frigate.detectors import create_detector, DetectorTypeEnum
from frigate.util import EventsPerSecond, SharedMemoryFrameManager, listen, load_labels from frigate.util import EventsPerSecond, SharedMemoryFrameManager, listen, load_labels
@ -35,12 +35,11 @@ def tensor_transform(desired_shape):
class LocalObjectDetector(ObjectDetector): class LocalObjectDetector(ObjectDetector):
def __init__( def __init__(
self, self,
det_type=DetectorTypeEnum.cpu, detector_config=None,
det_device=None,
model_config=None,
num_threads=3,
labels=None, labels=None,
): ):
model_config = detector_config.model
self.fps = EventsPerSecond() self.fps = EventsPerSecond()
if labels is None: if labels is None:
self.labels = {} self.labels = {}
@ -52,12 +51,7 @@ class LocalObjectDetector(ObjectDetector):
else: else:
self.input_transform = None self.input_transform = None
self.detect_api = create_detector( self.detect_api = create_detector(detector_config)
det_type,
det_device=det_device,
model_config=model_config,
num_threads=num_threads,
)
def detect(self, tensor_input, threshold=0.4): def detect(self, tensor_input, threshold=0.4):
detections = [] detections = []

View File

@ -5,8 +5,8 @@ from pydantic import ValidationError
from frigate.config import ( from frigate.config import (
BirdseyeModeEnum, BirdseyeModeEnum,
FrigateConfig, FrigateConfig,
DetectorTypeEnum,
) )
from frigate.detectors import DetectorTypeEnum
from frigate.util import load_config_with_no_duplicates from frigate.util import load_config_with_no_duplicates

View File

@ -2,7 +2,13 @@ import unittest
from unittest.mock import patch from unittest.mock import patch
import numpy as np import numpy as np
from frigate.config import DetectorTypeEnum, InputTensorEnum, ModelConfig from frigate.enums import InputTensorEnum
from frigate.detectors import DetectorTypeEnum
from frigate.detectors.config import (
CpuDetectorConfig,
EdgeTpuDetectorConfig,
ModelConfig,
)
import frigate.object_detection import frigate.object_detection
@ -12,33 +18,31 @@ class TestLocalObjectDetector(unittest.TestCase):
def test_localdetectorprocess_given_type_cpu_should_call_cputfl_init( def test_localdetectorprocess_given_type_cpu_should_call_cputfl_init(
self, mock_cputfl, mock_edgetputfl self, mock_cputfl, mock_edgetputfl
): ):
test_cfg = ModelConfig() test_cfg = CpuDetectorConfig(num_threads=6)
test_cfg.path = "/test/modelpath" test_cfg.model.path = "/test/modelpath"
test_obj = frigate.object_detection.LocalObjectDetector( test_obj = frigate.object_detection.LocalObjectDetector(
det_type=DetectorTypeEnum.cpu, model_config=test_cfg, num_threads=6 detector_config=test_cfg
) )
assert test_obj is not None assert test_obj is not None
mock_edgetputfl.assert_not_called() mock_edgetputfl.assert_not_called()
mock_cputfl.assert_called_once_with(model_config=test_cfg, num_threads=6) mock_cputfl.assert_called_once_with(detector_config=test_cfg)
@patch("frigate.detectors.edgetpu_tfl.EdgeTpuTfl") @patch("frigate.detectors.edgetpu_tfl.EdgeTpuTfl")
@patch("frigate.detectors.cpu_tfl.CpuTfl") @patch("frigate.detectors.cpu_tfl.CpuTfl")
def test_localdetectorprocess_given_type_edgtpu_should_call_edgtpu_init( def test_localdetectorprocess_given_type_edgtpu_should_call_edgtpu_init(
self, mock_cputfl, mock_edgetputfl self, mock_cputfl, mock_edgetputfl
): ):
test_cfg = ModelConfig() test_cfg = EdgeTpuDetectorConfig(device="usb")
test_cfg.path = "/test/modelpath" test_cfg.model.path = "/test/modelpath"
test_obj = frigate.object_detection.LocalObjectDetector( test_obj = frigate.object_detection.LocalObjectDetector(
det_type=DetectorTypeEnum.edgetpu, detector_config=test_cfg
det_device="usb",
model_config=test_cfg,
) )
assert test_obj is not None assert test_obj is not None
mock_cputfl.assert_not_called() mock_cputfl.assert_not_called()
mock_edgetputfl.assert_called_once_with(det_device="usb", model_config=test_cfg) mock_edgetputfl.assert_called_once_with(detector_config=test_cfg)
@patch("frigate.detectors.cpu_tfl.CpuTfl") @patch("frigate.detectors.cpu_tfl.CpuTfl")
def test_detect_raw_given_tensor_input_should_return_api_detect_raw_result( def test_detect_raw_given_tensor_input_should_return_api_detect_raw_result(
@ -47,7 +51,7 @@ class TestLocalObjectDetector(unittest.TestCase):
TEST_DATA = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] TEST_DATA = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
TEST_DETECT_RESULT = np.ndarray([1, 2, 4, 8, 16, 32]) TEST_DETECT_RESULT = np.ndarray([1, 2, 4, 8, 16, 32])
test_obj_detect = frigate.object_detection.LocalObjectDetector( test_obj_detect = frigate.object_detection.LocalObjectDetector(
det_device=DetectorTypeEnum.cpu detector_config=CpuDetectorConfig()
) )
mock_det_api = mock_cputfl.return_value mock_det_api = mock_cputfl.return_value
@ -65,11 +69,11 @@ class TestLocalObjectDetector(unittest.TestCase):
TEST_DATA = np.zeros((1, 32, 32, 3), np.uint8) TEST_DATA = np.zeros((1, 32, 32, 3), np.uint8)
TEST_DETECT_RESULT = np.ndarray([1, 2, 4, 8, 16, 32]) TEST_DETECT_RESULT = np.ndarray([1, 2, 4, 8, 16, 32])
test_cfg = ModelConfig() test_cfg = CpuDetectorConfig()
test_cfg.input_tensor = InputTensorEnum.nchw test_cfg.model.input_tensor = InputTensorEnum.nchw
test_obj_detect = frigate.object_detection.LocalObjectDetector( test_obj_detect = frigate.object_detection.LocalObjectDetector(
det_device=DetectorTypeEnum.cpu, model_config=test_cfg detector_config=test_cfg
) )
mock_det_api = mock_cputfl.return_value mock_det_api = mock_cputfl.return_value
@ -109,9 +113,10 @@ class TestLocalObjectDetector(unittest.TestCase):
"label-5", "label-5",
] ]
test_cfg = CpuDetectorConfig()
test_cfg.model = ModelConfig()
test_obj_detect = frigate.object_detection.LocalObjectDetector( test_obj_detect = frigate.object_detection.LocalObjectDetector(
det_device=DetectorTypeEnum.cpu, detector_config=test_cfg,
model_config=ModelConfig(),
labels=TEST_LABEL_FILE, labels=TEST_LABEL_FILE,
) )

View File

@ -14,8 +14,9 @@ import numpy as np
import cv2 import cv2
from setproctitle import setproctitle from setproctitle import setproctitle
from frigate.config import CameraConfig, DetectConfig, PixelFormatEnum from frigate.config import CameraConfig, DetectConfig
from frigate.const import CACHE_DIR from frigate.const import CACHE_DIR
from frigate.enums import PixelFormatEnum
from frigate.object_detection import RemoteObjectDetector from frigate.object_detection import RemoteObjectDetector
from frigate.log import LogPipe from frigate.log import LogPipe
from frigate.motion import MotionDetector from frigate.motion import MotionDetector