mirror of https://github.com/blakeblackshear/frigate.git

commit 112820826f (parent a292f272e9)

    integrate yolov5 and yolov5_pytorch
@ -11,10 +11,11 @@ services:
    volumes:
      - /etc/localtime:/etc/localtime:ro
      - .:/lab/frigate:cached
-     - ./config/config.yml:/config/config.yml:ro
+     - ./config/config.yml:/config/config.yml:rw
      - ./debug:/media/frigate
      - /dev/bus/usb:/dev/bus/usb
      - /dev/dri:/dev/dri # for intel hwaccel, needs to be updated for your hardware
      - ./frigate:/opt/frigate/frigate
      #- /dev/bus/usb:/dev/bus/usb
      #- /dev/dri:/dev/dri # for intel hwaccel, needs to be updated for your hardware
    ports:
      - "1935:1935"
      - "5000:5000"

@ -26,3 +26,40 @@ Models for both CPU and EdgeTPU (Coral) are bundled in the image. You can use yo

- Labels: `/labelmap.txt`

You also need to update the [model config](/configuration/advanced#model) if they differ from the defaults.

You can also try improving the speed using a YOLOv3-tiny model, quantized to work on the Edge TPU.

A compiled model is available [here](https://github.com/guichristmann/edge-tpu-tiny-yolo/tree/master/models).

Add it as a volume mount in your docker-compose file:

```yaml
volumes:
  - /path/to/quant_coco-tiny-v3-relu_edgetpu.tflite:/edgetpu_model.tflite
```

Then set the configuration for the model in config.yml:

```yaml
model:
  # Required: height of the trained model
  height: 416
  # Required: width of the trained model
  width: 416
  # Required: type of model (ssd, yolov3, yolov5, or yolov5_pytorch)
  type: 'yolov3'
  # Required: path of the label map
  labelmap_path: '/labelmap.txt'
  # Optional (but required for yolov3): anchors, comma separated
  anchors: '10,14, 23,27, 37,58, 81,82, 135,169, 344,319'
```
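For reference, the detector turns the comma-separated `anchors` string into (width, height) pairs; a minimal sketch of that parsing (it mirrors the `reshape(-1, 2)` added to `LocalObjectDetector` below):

```python
import numpy as np

anchors_str = '10,14, 23,27, 37,58, 81,82, 135,169, 344,319'
anchors = np.array([float(x) for x in anchors_str.split(',')]).reshape(-1, 2)
print(anchors.shape)  # (6, 2): six (width, height) anchor pairs for the two output scales
```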

### Customizing the Labelmap

The labelmap can be customized to your needs. A common reason to do this is to combine multiple object types that are easily confused when you don't need to be as granular, such as car/truck. You must retain the same number of labels, but you can change the names. To change it:

- Download the [COCO labelmap](https://dl.google.com/coral/canned_models/coco_labels.txt)
- Modify the label names as desired. For example, change `7 truck` to `7 car`
- Mount the new file at `/labelmap.txt` in the container with an additional volume, as shown below

```
-v ./config/labelmap.txt:/labelmap.txt
```
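Or, as a docker-compose volume (assuming the same relative path):

```yaml
volumes:
  - ./config/labelmap.txt:/labelmap.txt
```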
@ -2,6 +2,7 @@ import json
import logging
import multiprocessing as mp
import os
+import pprint
import signal
import sys
import threading
@ -158,8 +159,7 @@ class FrigateApp:
        self.mqtt_relay.start()

    def start_detectors(self):
-       model_path = self.config.model.path
-       model_shape = (self.config.model.height, self.config.model.width)

        for name in self.config.cameras.keys():
            self.detection_out_events[name] = mp.Event()

@ -188,8 +188,7 @@ class FrigateApp:
                name,
                self.detection_queue,
                self.detection_out_events,
-               model_path,
-               model_shape,
+               self.config.model,
                "cpu",
                detector.num_threads,
            )
@ -198,8 +197,7 @@ class FrigateApp:
                name,
                self.detection_queue,
                self.detection_out_events,
-               model_path,
-               model_shape,
+               self.config.model,
                detector.device,
                detector.num_threads,
            )
@ -310,6 +308,7 @@ class FrigateApp:
        try:
            try:
                self.init_config()
+               pprint.pprint(self.config)
            except Exception as e:
                print("*************************************************************")
                print("*************************************************************")

@ -653,6 +653,8 @@ class DatabaseConfig(FrigateBaseModel):

class ModelConfig(FrigateBaseModel):
    path: Optional[str] = Field(title="Custom Object detection model path.")
+   type: str = Field(default="ssd", title="Model type")
+   anchors: Optional[str] = Field(default="", title="Optional but required for yolov3")
    labelmap_path: Optional[str] = Field(title="Label map for custom object detector.")
    width: int = Field(default=320, title="Object detection model input width.")
    height: int = Field(default=320, title="Object detection model input height.")

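A quick sketch of what these fields accept (pydantic validates and fills defaults; the values here are illustrative, not from the commit):

```python
# Hypothetical instantiation; in Frigate the model: section of config.yml
# is parsed into this object.
cfg = ModelConfig(type="yolov3", path="/edgetpu_model.tflite", width=416, height=416,
                  labelmap_path="/labelmap.txt",
                  anchors="10,14, 23,27, 37,58, 81,82, 135,169, 344,319")
print(cfg.type, (cfg.height, cfg.width))  # yolov3 (416, 416)
```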
@ -14,10 +14,33 @@ from setproctitle import setproctitle
from tflite_runtime.interpreter import load_delegate

from frigate.util import EventsPerSecond, SharedMemoryFrameManager, listen, load_labels
+from frigate.yolov5.edgetpumodel import EdgeTPUModel

logger = logging.getLogger(__name__)


# Note: this local definition shadows the load_labels imported from frigate.util above.
def load_labels(path, encoding='utf-8'):
    """Loads labels from file (with or without index numbers).

    Args:
        path: path to label file.
        encoding: label file encoding.
    Returns:
        Dictionary mapping indices to labels.
    """
    logger.warning(f"Loaded labels from {path}")
    with open(path, 'r', encoding=encoding) as f:
        lines = f.readlines()

    if not lines:
        return {}

    if lines[0].split(' ', maxsplit=1)[0].isdigit():
        pairs = [line.split(' ', maxsplit=1) for line in lines]
        return {int(index): label.strip() for index, label in pairs}
    else:
        return {index: line.strip() for index, line in enumerate(lines)}
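A quick usage sketch (the bundled Coral COCO labelmap is indexed, so the first branch applies; the output name is illustrative):

```python
labels = load_labels('/labelmap.txt')
print(labels.get(0))  # e.g. 'person' with the bundled COCO labelmap
```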

class ObjectDetector(ABC):
    @abstractmethod
    def detect(self, tensor_input, threshold=0.4):
@ -25,13 +48,22 @@ class ObjectDetector(ABC):


class LocalObjectDetector(ObjectDetector):
-   def __init__(self, tf_device=None, model_path=None, num_threads=3, labels=None):
+   def __init__(self, model_config, tf_device=None, num_threads=3):
        self.fps = EventsPerSecond()
-       if labels is None:
-           self.labels = {}
-       else:
-           self.labels = load_labels(labels)
+       if model_config.labelmap_path:
+           self.labels = load_labels(model_config.labelmap_path)
+       self.model_config = model_config
+
+       if self.model_config.type == 'yolov5':
+           model = EdgeTPUModel(model_config.path, None)
+           input_size = model.get_image_size()
+           x = (255 * np.random.random((3, *input_size))).astype(np.uint8)
+           model.forward(x)
+           self.yolov5Model = model
+       if self.model_config.type == 'yolov5_pytorch':
+           from frigate.yolov5_pytorch import ObjectDetection as Yolov5ObjectDetector
+           self.yolov5ObjectDetector = Yolov5ObjectDetector()

        device_config = {"device": "usb"}
        if not tf_device is None:
            device_config = {"device": tf_device}
@ -44,7 +76,7 @@ class LocalObjectDetector(ObjectDetector):
            edge_tpu_delegate = load_delegate("libedgetpu.so.1.0", device_config)
            logger.info("TPU found")
            self.interpreter = tflite.Interpreter(
-               model_path=model_path or "/edgetpu_model.tflite",
+               model_path=model_config.path or "/edgetpu_model.tflite",
                experimental_delegates=[edge_tpu_delegate],
            )
        except ValueError:
@ -57,7 +89,7 @@ class LocalObjectDetector(ObjectDetector):
                "CPU detectors are not recommended and should only be used for testing or for trial purposes."
            )
            self.interpreter = tflite.Interpreter(
-               model_path=model_path or "/cpu_model.tflite", num_threads=num_threads
+               model_path=model_config.path or "/cpu_model.tflite", num_threads=num_threads
            )

        self.interpreter.allocate_tensors()
@ -65,6 +97,11 @@ class LocalObjectDetector(ObjectDetector):
        self.tensor_input_details = self.interpreter.get_input_details()
        self.tensor_output_details = self.interpreter.get_output_details()

+       if model_config.anchors != "":
+           anchors = [float(x) for x in model_config.anchors.split(',')]
+           self.anchors = np.array(anchors).reshape(-1, 2)

    def detect(self, tensor_input, threshold=0.4):
        detections = []

@ -79,7 +116,104 @@ class LocalObjectDetector(ObjectDetector):
        self.fps.update()
        return detections

    def sigmoid(self, x):
        return 1. / (1 + np.exp(-x))

    def detect_raw(self, tensor_input):
        # Dispatch on the configured model type.
        if self.model_config.type == "ssd":
            raw_detections = self.detect_ssd(tensor_input)
        elif self.model_config.type == "yolov3":
            raw_detections = self.detect_yolov3(tensor_input)
        elif self.model_config.type == "yolov5":
            raw_detections = self.detect_yolov5(tensor_input)
        elif self.model_config.type == "yolov5_pytorch":
            raw_detections = self.detect_yolov5_pytorch(tensor_input)
        else:
            logger.error(f"Unsupported model type {self.model_config.type}")
            raw_detections = []
        return raw_detections
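Each branch fills a fixed (20, 6) float32 array; judging by the yolov5/yolov3 branches below, rows are (class id, score, y_min, x_min, y_max, x_max). A hedged consumer sketch (names hypothetical):

```python
raw = detector.detect_raw(tensor_input)  # hypothetical detector and input tensor
for class_id, score, y_min, x_min, y_max, x_max in raw:
    if score == 0.0:  # trailing zero-padded rows
        continue
    print(int(class_id), round(float(score), 2), (x_min, y_min, x_max, y_max))
```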

    def get_interpreter_details(self):
        # Get input and output tensor details
        input_details = self.interpreter.get_input_details()
        output_details = self.interpreter.get_output_details()
        input_shape = input_details[0]["shape"]
        return input_details, output_details, input_shape

    # from util.py in https://github.com/guichristmann/edge-tpu-tiny-yolo
    def featuresToBoxes(self, outputs, anchors, n_classes, net_input_shape):
        grid_shape = outputs.shape[1:3]
        n_anchors = len(anchors)

        # Numpy juggling to get the boxes in a reasonable amount of time
        grid_y = np.tile(np.arange(grid_shape[0]).reshape(-1, 1), grid_shape[0]).reshape(1, grid_shape[0], grid_shape[0], 1).astype(np.float32)
        grid_x = grid_y.copy().T.reshape(1, grid_shape[0], grid_shape[1], 1).astype(np.float32)
        outputs = outputs.reshape(1, grid_shape[0], grid_shape[1], n_anchors, -1)
        _anchors = anchors.reshape(1, 1, 3, 2).astype(np.float32)

        # Get box parameters from network output and apply transformations
        bx = (self.sigmoid(outputs[..., 0]) + grid_x) / grid_shape[0]
        by = (self.sigmoid(outputs[..., 1]) + grid_y) / grid_shape[1]
        # Should these be inverted?
        bw = np.multiply(_anchors[..., 0] / net_input_shape[1], np.exp(outputs[..., 2]))
        bh = np.multiply(_anchors[..., 1] / net_input_shape[2], np.exp(outputs[..., 3]))

        # Get the scores
        scores = self.sigmoid(np.expand_dims(outputs[..., 4], -1)) * \
                 self.sigmoid(outputs[..., 5:])
        scores = scores.reshape(-1, n_classes)

        # TODO: some of these are probably not needed but I don't understand numpy magic well enough
        bx = bx.flatten()
        by = (by.flatten()) * 1
        bw = bw.flatten()
        bh = bh.flatten() * 1
        half_bw = bw / 2.
        half_bh = bh / 2.

        tl_x = np.multiply(bx - half_bw, 1)
        tl_y = np.multiply(by - half_bh, 1)
        br_x = np.multiply(bx + half_bw, 1)
        br_y = np.multiply(by + half_bh, 1)

        # Get indices of boxes with score higher than threshold
        indices = np.argwhere(scores >= 0.5)
        selected_boxes = []
        selected_scores = []
        for i in indices:
            i = tuple(i)
            selected_boxes.append(((tl_x[i[0]], tl_y[i[0]]), (br_x[i[0]], br_y[i[0]])))
            selected_scores.append(scores[i])

        selected_boxes = np.array(selected_boxes)
        selected_scores = np.array(selected_scores)
        selected_classes = indices[:, 1]

        return selected_boxes, selected_scores, selected_classes

    def detect_yolov5(self, tensor_input):
        tensor_input = np.squeeze(tensor_input, axis=0)
        results = self.yolov5Model.forward(tensor_input)
        print(self.yolov5Model.get_last_inference_time())
        det = results[0]

        # Frigate expects rows of (class id, score, y_min, x_min, y_max, x_max),
        # so the xyxy coordinates are reordered below.
        detections = np.zeros((20, 6), np.float32)
        i = 0
        for *xyxy, conf, cls in reversed(det):
            if i >= 20:  # cap at the 20 rows Frigate allocates
                break
            detections[i] = [
                int(cls) + 1,
                float(conf),
                xyxy[1],
                xyxy[0],
                xyxy[3],
                xyxy[2],
            ]
            i += 1

        return detections

    def detect_ssd(self, tensor_input):
        self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input)
        self.interpreter.invoke()

@ -106,6 +240,69 @@ class LocalObjectDetector(ObjectDetector):

        return detections

    def detect_yolov5_pytorch(self, tensor_input):
        tensor_input = np.squeeze(tensor_input, axis=0)
        results = self.yolov5ObjectDetector.score_frame(tensor_input)
        labels, cord = results
        n = len(labels)
        detections = np.zeros((20, 6), np.float32)
        if n > 0:
            print(f"Total Targets: {n}")
            print(f"Labels: {set([self.yolov5ObjectDetector.class_to_label(label) for label in labels])}")
            for i in range(n):
                if i < 20:
                    row = cord[i]
                    score = float(row[4])
                    if score < 0.4:
                        break
                    x1, y1, x2, y2 = row[0], row[1], row[2], row[3]
                    label = self.yolov5ObjectDetector.class_to_label(labels[i])
                    # coordinates are normalized xyxy; reorder to (y, x) pairs for Frigate
                    #detections[i] = [labels[i]+1, score, x1, y1, x2, y2]
                    detections[i] = [labels[i] + 1, score, y1, x1, y2, x2]
                    print(detections[i])

        return detections

    def detect_yolov3(self, tensor_input):
        input_details, output_details, net_input_shape = \
            self.get_interpreter_details()

        self.interpreter.set_tensor(self.tensor_input_details[0]['index'], tensor_input)
        self.interpreter.invoke()

        # for yolo, it's a little different: two output grids
        out1 = self.interpreter.get_tensor(self.tensor_output_details[0]['index'])
        out2 = self.interpreter.get_tensor(self.tensor_output_details[1]['index'])

        # Dequantize output (tpu only); quantization tuples are (scale, zero_point)
        o1_scale, o1_zero = self.tensor_output_details[0]['quantization']
        out1 = (out1.astype(np.float32) - o1_zero) * o1_scale
        o2_scale, o2_zero = self.tensor_output_details[1]['quantization']
        out2 = (out2.astype(np.float32) - o2_zero) * o2_scale

        num_classes = len(self.labels)
        _boxes1, _scores1, _classes1 = self.featuresToBoxes(out1, self.anchors[[3, 4, 5]], num_classes, net_input_shape)
        _boxes2, _scores2, _classes2 = self.featuresToBoxes(out2, self.anchors[[1, 2, 3]], num_classes, net_input_shape)

        if _boxes1.shape[0] == 0:
            _boxes1 = np.empty([0, 2, 2])
            _scores1 = np.empty([0,])
            _classes1 = np.empty([0,])
        if _boxes2.shape[0] == 0:
            _boxes2 = np.empty([0, 2, 2])
            _scores2 = np.empty([0,])
            _classes2 = np.empty([0,])
        boxes = np.append(_boxes1, _boxes2, axis=0)
        scores = np.append(_scores1, _scores2, axis=0)
        label_codes = np.append(_classes1, _classes2, axis=0)

        detections = np.zeros((20, 6), np.float32)
        for i, score in enumerate(scores):
            if i < 20:
                detections[i] = [label_codes[i], score, boxes[i][0][1], boxes[i][0][0], boxes[i][1][1], boxes[i][1][0]]

        return detections

def run_detector(
    name: str,
@ -113,8 +310,7 @@ def run_detector(
    out_events: Dict[str, mp.Event],
    avg_speed,
    start,
-   model_path,
-   model_shape,
+   model_config,
    tf_device,
    num_threads,
):
@ -134,7 +330,7 @@ def run_detector(

    frame_manager = SharedMemoryFrameManager()
    object_detector = LocalObjectDetector(
-       tf_device=tf_device, model_path=model_path, num_threads=num_threads
+       model_config, tf_device=tf_device, num_threads=num_threads
    )

    outputs = {}
@ -149,7 +345,7 @@ def run_detector(
        except queue.Empty:
            continue
        input_frame = frame_manager.get(
-           connection_id, (1, model_shape[0], model_shape[1], 3)
+           connection_id, (1, model_config.height, model_config.width, 3)
        )

        if input_frame is None:
@ -172,8 +368,7 @@ class EdgeTPUProcess:
        name,
        detection_queue,
        out_events,
-       model_path,
-       model_shape,
+       model_config,
        tf_device=None,
        num_threads=3,
    ):
@ -183,10 +378,11 @@ class EdgeTPUProcess:
        self.avg_inference_speed = mp.Value("d", 0.01)
        self.detection_start = mp.Value("d", 0.0)
        self.detect_process = None
-       self.model_path = model_path
-       self.model_shape = model_shape
+       self.model_path = model_config.path
+       self.model_shape = (model_config.height, model_config.width)
        self.tf_device = tf_device
        self.num_threads = num_threads
+       self.model_config = model_config
        self.start_or_restart()

    def stop(self):
@ -211,8 +407,7 @@ class EdgeTPUProcess:
        self.out_events,
        self.avg_inference_speed,
        self.detection_start,
-       self.model_path,
-       self.model_shape,
+       self.model_config,
        self.tf_device,
        self.num_threads,
    ),

frigate/yolov5/edgetpumodel.py (new file, 318 lines)
@ -0,0 +1,318 @@
import time
import os
import sys
import logging

import yaml
import numpy as np
import pycoral.utils.edgetpu as etpu
from pycoral.adapters import common
from frigate.yolov5.nms import non_max_suppression
import cv2
import json
import tflite_runtime.interpreter as tflite
from frigate.yolov5.utils import plot_one_box, Colors, get_image_tensor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("EdgeTPUModel")


class EdgeTPUModel:

    def __init__(self, model_file, names_file, conf_thresh=0.25, iou_thresh=0.45, desktop=True, filter_classes=None,
                 agnostic_nms=False, max_det=1000):
        """
        Creates an object for running a Yolov5 model on an EdgeTPU or a desktop CPU

        Inputs:
          - model_file: path to edgetpu-compiled tflite file
          - names_file: yaml names file (yolov5 format)
          - conf_thresh: detection threshold
          - iou_thresh: NMS threshold
          - desktop: option to run the model on a desktop (plain tflite) instead of the EdgeTPU
          - filter_classes: only output certain classes
          - agnostic_nms: use class-agnostic NMS
          - max_det: max number of detections
        """

        model_file = os.path.abspath(model_file)

        if not model_file.endswith('tflite'):
            model_file += ".tflite"

        self.model_file = model_file
        self.conf_thresh = conf_thresh
        self.iou_thresh = iou_thresh
        self.desktop = desktop
        self.filter_classes = filter_classes
        self.agnostic_nms = agnostic_nms
        self.max_det = max_det

        logger.info("Confidence threshold: {}".format(conf_thresh))
        logger.info("IOU threshold: {}".format(iou_thresh))

        self.inference_time = None
        self.nms_time = None
        self.interpreter = None
        self.colors = Colors()  # create instance for 'from utils.plots import colors'

        # COCO class names are hardcoded here instead of loaded via get_names(names_file)
        #self.get_names(names_file)
        self.names = ['person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', 'boat', 'traffic light',
                      'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
                      'elephant', 'bear', 'zebra', 'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee',
                      'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard',
                      'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple',
                      'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch',
                      'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone',
                      'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear',
                      'hair drier', 'toothbrush']
        self.make_interpreter()
        self.get_image_size()

    def get_names(self, path):
        """
        Load a names file

        Inputs:
          - path: path to names file in yaml format
        """

        with open(path, 'r') as f:
            cfg = yaml.load(f, Loader=yaml.SafeLoader)

        names = cfg['names']
        logger.info("Loaded {} classes".format(len(names)))

        self.names = names

    def make_interpreter(self):
        """
        Internal function that loads the tflite file and creates
        the interpreter that deals with the EdgeTPU hardware.
        """
        # Load the model and allocate
        # Choose desktop or EdgeTPU
        if self.desktop:
            self.interpreter = tflite.Interpreter(self.model_file)
        else:
            self.interpreter = etpu.make_interpreter(self.model_file)
        self.interpreter.allocate_tensors()

        self.input_details = self.interpreter.get_input_details()
        self.output_details = self.interpreter.get_output_details()

        logger.debug(self.input_details)
        logger.debug(self.output_details)

        # quantization tuples are (scale, zero_point)
        self.input_zero = self.input_details[0]['quantization'][1]
        self.input_scale = self.input_details[0]['quantization'][0]
        self.output_zero = self.output_details[0]['quantization'][1]
        self.output_scale = self.output_details[0]['quantization'][0]

        # If the model isn't quantized then these should be zero
        # Check against small epsilon to avoid comparing float/int
        if self.input_scale < 1e-9:
            self.input_scale = 1.0

        if self.output_scale < 1e-9:
            self.output_scale = 1.0

        logger.debug("Input scale: {}".format(self.input_scale))
        logger.debug("Input zero: {}".format(self.input_zero))
        logger.debug("Output scale: {}".format(self.output_scale))
        logger.debug("Output zero: {}".format(self.output_zero))

        logger.info("Successfully loaded {}".format(self.model_file))

    def get_image_size(self):
        """
        Returns the expected size of the input image tensor
        """
        if self.interpreter is not None:
            self.input_size = common.input_size(self.interpreter)
            logger.debug("Expecting input shape: {}".format(self.input_size))
            return self.input_size
        else:
            logger.warning("Interpreter is not yet loaded")

    def predict(self, image_path, save_img=True, save_txt=True):
        logger.info("Attempting to load {}".format(image_path))

        full_image, net_image, pad = get_image_tensor(image_path, self.input_size[0])
        pred = self.forward(net_image)
        logger.info("Inference time: {}".format(self.inference_time))

        base, ext = os.path.splitext(image_path)

        output_path = base + "_detect" + ext
        det = self.process_predictions(pred[0], full_image, pad, output_path, save_img=save_img, save_txt=save_txt)

        return det

    def forward(self, x: np.ndarray, with_nms=True) -> np.ndarray:
        """
        Predict function using the EdgeTPU

        Inputs:
          x: (C, H, W) image tensor
          with_nms: apply NMS on output

        Returns:
          prediction array (with or without NMS applied)
        """
        tstart = time.time()
        # Transpose if C, H, W
        if x.shape[0] == 3:
            x = x.transpose((1, 2, 0))

        x = x.astype('float32')

        # Scale input; the conversion is: real = (int_8 - zero) * scale
        x = (x / self.input_scale) + self.input_zero
        x = x[np.newaxis].astype(np.uint8)

        self.interpreter.set_tensor(self.input_details[0]['index'], x)
        self.interpreter.invoke()

        # Scale output
        result = (common.output_tensor(self.interpreter, 0).astype('float32') - self.output_zero) * self.output_scale
        self.inference_time = time.time() - tstart

        if with_nms:
            tstart = time.time()
            nms_result = non_max_suppression(result, self.conf_thresh, self.iou_thresh, self.filter_classes,
                                             self.agnostic_nms, max_det=self.max_det)
            self.nms_time = time.time() - tstart

            return nms_result
        else:
            return result

    def get_last_inference_time(self, with_nms=True):
        """
        Returns a list containing the most recent inference and NMS times (seconds)
        """
        res = [self.inference_time]

        if with_nms:
            res.append(self.nms_time)

        return res

    def get_scaled_coords(self, xyxy, output_image, pad):
        """
        Converts raw prediction bounding box to original
        image coordinates.

        Args:
          xyxy: array of boxes
          output_image: np array
          pad: padding due to image resizing (pad_w, pad_h)
        """
        pad_w, pad_h = pad
        in_h, in_w = self.input_size
        out_h, out_w, _ = output_image.shape

        ratio_w = out_w / (in_w - pad_w)
        ratio_h = out_h / (in_h - pad_h)

        out = []
        for coord in xyxy:
            x1, y1, x2, y2 = coord

            x1 *= in_w * ratio_w
            x2 *= in_w * ratio_w
            y1 *= in_h * ratio_h
            y2 *= in_h * ratio_h

            x1 = max(0, x1)
            x2 = min(out_w, x2)

            y1 = max(0, y1)
            y2 = min(out_h, y2)

            out.append((x1, y1, x2, y2))

        return np.array(out).astype(int)

    def process_predictions2(self, det):
        """
        Summarize predictions without drawing; note that the loop overwrites
        output on each pass, so only the last detection's fields are returned.
        """
        if len(det):
            # Rescale boxes from img_size to im0 size
            # x1, y1, x2, y2 =
            #det[:, :4] = self.get_scaled_coords(det[:, :4], output_image, pad)
            output = {}
            #base, ext = os.path.splitext(output_path)

            s = ""

            # Print results
            for c in np.unique(det[:, -1]):
                n = (det[:, -1] == c).sum()  # detections per class
                s += f"{n} {self.names[int(c)]}{'s' * (n > 1)}, "  # add to string

            if s != "":
                s = s.strip()
                s = s[:-1]

            logger.info("Detected: {}".format(s))

            for *xyxy, conf, cls in reversed(det):
                output = {}
                output['box'] = xyxy
                output['conf'] = conf
                output['cls'] = cls
                output['cls_name'] = self.names[int(cls)]
            return output

    def process_predictions(self, det, output_image=None, pad=(0, 0), output_path="detection.jpg", save_img=False, save_txt=False,
                            hide_labels=False, hide_conf=False):
        """
        Process predictions and optionally output an image with annotations
        """
        if len(det):
            # Rescale boxes from img_size to im0 size
            # x1, y1, x2, y2 =
            det[:, :4] = self.get_scaled_coords(det[:, :4], output_image, pad)
            output = {}
            base, ext = os.path.splitext(output_path)

            s = ""

            # Print results
            for c in np.unique(det[:, -1]):
                n = (det[:, -1] == c).sum()  # detections per class
                s += f"{n} {self.names[int(c)]}{'s' * (n > 1)}, "  # add to string

            if s != "":
                s = s.strip()
                s = s[:-1]

            logger.info("Detected: {}".format(s))

            # Write results
            for *xyxy, conf, cls in reversed(det):
                c = int(cls)  # integer class
                if save_img:  # Add bbox to image
                    label = None if hide_labels else (self.names[c] if hide_conf else f'{self.names[c]} {conf:.2f}')
                    output_image = plot_one_box(xyxy, output_image, label=label, color=self.colors(c, True))
                if save_txt:
                    output[base] = {}
                    output[base]['box'] = xyxy
                    output[base]['conf'] = conf
                    output[base]['cls'] = cls
                    output[base]['cls_name'] = self.names[c]

            if save_txt:
                output_txt = base + ".txt"
                with open(output_txt, 'w') as f:
                    json.dump(output, f, indent=1)
            if save_img:
                cv2.imwrite(output_path, output_image)

        return det
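As used by `LocalObjectDetector` above, a minimal smoke test of this wrapper might look like the following (the model path is a placeholder, and desktop mode is assumed so no Coral is needed):

```python
import numpy as np
from frigate.yolov5.edgetpumodel import EdgeTPUModel

model = EdgeTPUModel("/edgetpu_model.tflite", None, desktop=True)  # hypothetical path
input_size = model.get_image_size()
frame = (255 * np.random.random((3, *input_size))).astype(np.uint8)  # C,H,W as forward() expects
dets = model.forward(frame)  # list with one (n, 6) array per image: x1, y1, x2, y2, conf, cls
print(model.get_last_inference_time())
```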
frigate/yolov5/nms.py (new file, 142 lines)
@ -0,0 +1,142 @@
import numpy as np
import time


def xywh2xyxy(x):
    # Convert nx4 boxes from [x, y, w, h] to [x1, y1, x2, y2] where xy1=top-left, xy2=bottom-right
    y = np.copy(x)
    y[:, 0] = x[:, 0] - x[:, 2] / 2  # top left x
    y[:, 1] = x[:, 1] - x[:, 3] / 2  # top left y
    y[:, 2] = x[:, 0] + x[:, 2] / 2  # bottom right x
    y[:, 3] = x[:, 1] + x[:, 3] / 2  # bottom right y
    return y


def nms(dets, scores, thresh):
    '''
    dets is a numpy array: (num_dets, 4)
    scores is a numpy array: (num_dets,)
    '''

    x1 = dets[:, 0]
    y1 = dets[:, 1]
    x2 = dets[:, 2]
    y2 = dets[:, 3]

    areas = (x2 - x1 + 1e-9) * (y2 - y1 + 1e-9)
    order = scores.argsort()[::-1]  # process highest-scoring boxes first

    keep = []
    while order.size > 0:
        i = order[0]  # pick the maximum-score box
        other_box_ids = order[1:]
        keep.append(i)

        xx1 = np.maximum(x1[i], x1[other_box_ids])
        yy1 = np.maximum(y1[i], y1[other_box_ids])
        xx2 = np.minimum(x2[i], x2[other_box_ids])
        yy2 = np.minimum(y2[i], y2[other_box_ids])

        # print(list(zip(xx1, yy1, xx2, yy2)))

        w = np.maximum(0.0, xx2 - xx1 + 1e-9)  # intersection width
        h = np.maximum(0.0, yy2 - yy1 + 1e-9)  # intersection height
        inter = w * h

        ovr = inter / (areas[i] + areas[other_box_ids] - inter)

        inds = np.where(ovr <= thresh)[0]
        order = order[inds + 1]

    return np.array(keep)


def non_max_suppression(prediction, conf_thres=0.25, iou_thres=0.45, classes=None, agnostic=False, multi_label=False,
                        labels=(), max_det=300):
    nc = prediction.shape[2] - 5  # number of classes
    xc = prediction[..., 4] > conf_thres  # candidates

    # Checks
    assert 0 <= conf_thres <= 1, f'Invalid Confidence threshold {conf_thres}, valid values are between 0.0 and 1.0'
    assert 0 <= iou_thres <= 1, f'Invalid IoU {iou_thres}, valid values are between 0.0 and 1.0'

    # Settings
    min_wh, max_wh = 2, 4096  # (pixels) minimum and maximum box width and height
    max_nms = 30000  # maximum number of boxes fed into nms()
    time_limit = 10.0  # seconds to quit after
    redundant = True  # require redundant detections
    multi_label &= nc > 1  # multiple labels per box (adds 0.5ms/img)
    merge = False  # use merge-NMS

    t = time.time()
    output = [np.zeros((0, 6))] * prediction.shape[0]
    for xi, x in enumerate(prediction):  # image index, image inference
        # Apply constraints
        # x[((x[..., 2:4] < min_wh) | (x[..., 2:4] > max_wh)).any(1), 4] = 0  # width-height
        x = x[xc[xi]]  # confidence

        # Cat apriori labels if autolabelling
        if labels and len(labels[xi]):
            l = labels[xi]
            v = np.zeros((len(l), nc + 5))
            v[:, :4] = l[:, 1:5]  # box
            v[:, 4] = 1.0  # conf
            v[range(len(l)), l[:, 0].astype(int) + 5] = 1.0  # cls
            x = np.concatenate((x, v), 0)

        # If none remain process next image
        if not x.shape[0]:
            continue

        # Compute conf
        x[:, 5:] *= x[:, 4:5]  # conf = obj_conf * cls_conf

        # Box (center x, center y, width, height) to (x1, y1, x2, y2)
        box = xywh2xyxy(x[:, :4])

        # Detections matrix nx6 (xyxy, conf, cls)
        if multi_label:
            i, j = (x[:, 5:] > conf_thres).nonzero()
            x = np.concatenate((box[i], x[i, j + 5, None], j[:, None].astype(float)), axis=1)
        else:  # best class only
            conf = np.amax(x[:, 5:], axis=1, keepdims=True)
            j = np.argmax(x[:, 5:], axis=1).reshape(conf.shape)
            x = np.concatenate((box, conf, j.astype(float)), axis=1)[conf.flatten() > conf_thres]

        # Filter by class
        if classes is not None:
            x = x[(x[:, 5:6] == np.array(classes)).any(1)]

        # Apply finite constraint
        # if not np.isfinite(x).all():
        #     x = x[np.isfinite(x).all(1)]

        # Check shape
        n = x.shape[0]  # number of boxes
        if not n:  # no boxes
            continue
        elif n > max_nms:  # excess boxes
            x = x[x[:, 4].argsort()[::-1][:max_nms]]  # sort by confidence, descending

        # Batched NMS
        c = x[:, 5:6] * (0 if agnostic else max_wh)  # classes
        boxes, scores = x[:, :4] + c, x[:, 4]  # boxes (offset by class), scores

        i = nms(boxes, scores, iou_thres)  # NMS

        if i.shape[0] > max_det:  # limit detections
            i = i[:max_det]
        if merge and (1 < n < 3E3):  # Merge NMS (boxes merged using weighted mean)
            # update boxes as boxes(i,4) = weights(i,n) * boxes(n,4)
            # note: box_iou is not defined in this module, so merge must stay False
            iou = box_iou(boxes[i], boxes) > iou_thres  # iou matrix
            weights = iou * scores[None]  # box weights
            x[i, :4] = np.dot(weights, x[:, :4]).astype(float) / weights.sum(1, keepdims=True)  # merged boxes
            if redundant:
                i = i[iou.sum(1) > 1]  # require redundancy

        output[xi] = x[i]
        if (time.time() - t) > time_limit:
            print(f'WARNING: NMS time limit {time_limit}s exceeded')
            break  # time limit exceeded

    return output
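A hedged smoke test of this NumPy port, with random logits standing in for a dequantized (batch, boxes, 5 + classes) YOLO output:

```python
import numpy as np
from frigate.yolov5.nms import non_max_suppression

pred = np.random.rand(1, 100, 85).astype(np.float32)  # fake (1, 100, 5 + 80 classes)
pred[..., :4] *= 416                                   # xywh in pixel space
out = non_max_suppression(pred, conf_thres=0.25, iou_thres=0.45)
print(out[0].shape)  # (num_kept, 6): x1, y1, x2, y2, conf, cls
```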
frigate/yolov5/utils.py (new file, 120 lines)
@ -0,0 +1,120 @@
import os
import sys
import argparse
import logging
import time
from pathlib import Path

import numpy as np
import cv2


class Colors:
    # Ultralytics color palette https://ultralytics.com/
    def __init__(self):
        # hex = matplotlib.colors.TABLEAU_COLORS.values()
        hex = ('FF3838', 'FF9D97', 'FF701F', 'FFB21D', 'CFD231', '48F90A', '92CC17', '3DDB86', '1A9334', '00D4BB',
               '2C99A8', '00C2FF', '344593', '6473FF', '0018EC', '8438FF', '520085', 'CB38FF', 'FF95C8', 'FF37C7')
        self.palette = [self.hex2rgb('#' + c) for c in hex]
        self.n = len(self.palette)

    def __call__(self, i, bgr=False):
        c = self.palette[int(i) % self.n]
        return (c[2], c[1], c[0]) if bgr else c

    @staticmethod
    def hex2rgb(h):  # rgb order (PIL)
        return tuple(int(h[1 + i:1 + i + 2], 16) for i in (0, 2, 4))


def plot_one_box(box, im, color=(128, 128, 128), txt_color=(255, 255, 255), label=None, line_width=3):
    # Plots one xyxy box on image im with label
    assert im.data.contiguous, 'Image not contiguous. Apply np.ascontiguousarray(im) to plot_one_box() input image.'
    lw = line_width or max(int(min(im.shape[:2]) / 200), 2)  # line width

    c1, c2 = (int(box[0]), int(box[1])), (int(box[2]), int(box[3]))

    cv2.rectangle(im, c1, c2, color, thickness=lw, lineType=cv2.LINE_AA)
    if label:
        tf = max(lw - 1, 1)  # font thickness
        txt_width, txt_height = cv2.getTextSize(label, 0, fontScale=lw / 3, thickness=tf)[0]
        c2 = c1[0] + txt_width, c1[1] - txt_height - 3
        cv2.rectangle(im, c1, c2, color, -1, cv2.LINE_AA)  # filled
        cv2.putText(im, label, (c1[0], c1[1] - 2), 0, lw / 3, txt_color, thickness=tf, lineType=cv2.LINE_AA)
    return im
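For instance (a hypothetical blank frame; the palette cycles per class id):

```python
im = np.zeros((480, 640, 3), dtype=np.uint8)
colors = Colors()
im = plot_one_box((100, 100, 300, 260), im, color=colors(2, bgr=True), label='car 0.87')
cv2.imwrite('annotated.jpg', im)  # writes the annotated frame
```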

def resize_and_pad(image, desired_size):
    old_size = image.shape[:2]
    ratio = float(desired_size / max(old_size))
    new_size = tuple([int(x * ratio) for x in old_size])

    # new_size is in (height, width) order; cv2.resize takes (width, height)
    image = cv2.resize(image, (new_size[1], new_size[0]))

    delta_w = desired_size - new_size[1]
    delta_h = desired_size - new_size[0]

    pad = (delta_w, delta_h)

    color = [100, 100, 100]
    new_im = cv2.copyMakeBorder(image, 0, delta_h, 0, delta_w, cv2.BORDER_CONSTANT,
                                value=color)

    return new_im, pad
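For example, a 640x480 frame squared to 416 pads the bottom only:

```python
img = np.zeros((480, 640, 3), dtype=np.uint8)
padded, (pad_w, pad_h) = resize_and_pad(img, 416)
print(padded.shape, pad_w, pad_h)  # (416, 416, 3) 0 104
```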

def get_image_tensor(img, max_size, debug=False):
    """
    Reshapes an input image into a square with sides max_size
    """
    if type(img) is str:
        img = cv2.imread(img)

    resized, pad = resize_and_pad(img, max_size)
    resized = resized.astype(np.float32)

    if debug:
        cv2.imwrite("intermediate.png", resized)

    # Normalise!
    resized /= 255.0

    return img, resized, pad


def xyxy2xywh(x):
    # Convert nx4 boxes from [x1, y1, x2, y2] to [x, y, w, h] where xy1=top-left, xy2=bottom-right
    y = np.copy(x)
    y[:, 0] = (x[:, 0] + x[:, 2]) / 2  # x center
    y[:, 1] = (x[:, 1] + x[:, 3]) / 2  # y center
    y[:, 2] = x[:, 2] - x[:, 0]  # width
    y[:, 3] = x[:, 3] - x[:, 1]  # height
    return y


def coco80_to_coco91_class():  # converts 80-index (val2014) to 91-index (paper)
    # https://tech.amikelive.com/node-718/what-object-categories-labels-are-in-coco-dataset/
    # a = np.loadtxt('data/coco.names', dtype='str', delimiter='\n')
    # b = np.loadtxt('data/coco_paper.names', dtype='str', delimiter='\n')
    # x1 = [list(a[i] == b).index(True) + 1 for i in range(80)]  # darknet to coco
    # x2 = [list(b[i] == a).index(True) if any(b[i] == a) else None for i in range(91)]  # coco to darknet
    x = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 27, 28, 31, 32, 33, 34,
         35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63,
         64, 65, 67, 70, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 84, 85, 86, 87, 88, 89, 90]
    return x


def save_one_json(predn, jdict, path, class_map):
    # Save one JSON result {"image_id": 42, "category_id": 18, "bbox": [258.15, 41.29, 348.26, 243.78], "score": 0.236}
    image_id = int(path.stem) if path.stem.isnumeric() else path.stem

    box = xyxy2xywh(predn[:, :4])  # xywh
    box[:, :2] -= box[:, 2:] / 2  # xy center to top-left corner

    for p, b in zip(predn.tolist(), box.tolist()):
        jdict.append({'image_id': image_id,
                      'category_id': class_map[int(p[5])],
                      'bbox': [round(x, 3) for x in b],
                      'score': round(p[4], 5)})
frigate/yolov5_pytorch.py (new file, 111 lines)
@ -0,0 +1,111 @@
import torch
import numpy as np
import cv2  # needed by plot_boxes() and __call__ below
from time import time
import sys


class ObjectDetection:
    """
    The class performs generic object detection on a video file.
    It uses a pretrained YOLOv5 model to make inferences and OpenCV to manage frames.
    Included Features:
    1. Reading and writing of video files using OpenCV
    2. Using a pretrained model to make inferences on frames.
    3. Using the inferences to plot boxes on objects along with labels.
    Upcoming Features:
    """
    def __init__(self):
        self.model = self.load_model()
        self.model.conf = 0.4  # set inference confidence threshold to 0.4
        self.model.iou = 0.3  # set inference IOU threshold to 0.3
        #self.model.classes = [0]  # set model to only detect "Person" class
        #self.model.classes = self.model.names
        self.classes = self.model.names
        self.found_labels = set()
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

    def load_model(self):
        """
        Function loads the YOLOv5 model from PyTorch Hub.
        """
        #model = torch.hub.load('/media/frigate/yolov5', 'custom', path='/media/frigate/yolov5/yolov5l.pt', source='local')
        model = torch.hub.load('/media/frigate/yolov5', 'custom', path='/media/frigate/yolov5/yolov5s.pt', source='local')
        #model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
        #model = torch.hub.load('ultralytics/yolov3', 'yolov3', pretrained=True)
        return model

    def class_to_label(self, x):
        """
        For a given label value, return the corresponding string label.
        :param x: numeric label
        :return: corresponding string label
        """
        return self.classes[int(x)]

    def score_frame(self, frame):
        """
        Scores a single frame and returns the results.
        :param frame: frame to run inference on.
        :return: labels and normalized xyxy coordinates of objects found.
        """
        self.model.to(self.device)
        results = self.model(frame)
        labels, cord = results.xyxyn[0][:, -1].to('cpu').numpy(), results.xyxyn[0][:, :-1].to('cpu').numpy()
        return labels, cord

    def plot_boxes(self, results, frame):
        """
        Plots boxes and labels on a frame.
        :param results: inferences made by the model
        :param frame: frame on which to make the plots
        :return: new frame with boxes and labels plotted.
        """
        labels, cord = results
        n = len(labels)
        if n > 0:
            print(f"Total Targets: {n}")
            print(f"Labels: {set([self.class_to_label(label) for label in labels])}")
        x_shape, y_shape = frame.shape[1], frame.shape[0]
        for i in range(n):
            self.found_labels.add(self.class_to_label(labels[i]))
            row = cord[i]
            # coordinates are normalized, so scale back to pixel space
            x1, y1, x2, y2 = int(row[0] * x_shape), int(row[1] * y_shape), int(row[2] * x_shape), int(row[3] * y_shape)
            bgr = (0, 0, 255)
            cv2.rectangle(frame, (x1, y1), (x2, y2), bgr, 1)
            label = f"{int(row[4] * 100)}"
            cv2.putText(frame, self.class_to_label(labels[i]), (x1, y1), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 1)
            cv2.putText(frame, f"Total Targets: {n}", (30, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        return frame

    def __call__(self):
        player = self.get_video_from_file()  # create streaming service for application
        # note: get_video_from_file() and self.out_file are not defined in this module;
        # this entry point assumes a subclass or caller provides them.
        assert player.isOpened()
        x_shape = int(player.get(cv2.CAP_PROP_FRAME_WIDTH))
        y_shape = int(player.get(cv2.CAP_PROP_FRAME_HEIGHT))
        four_cc = cv2.VideoWriter_fourcc(*"MJPG")
        out = cv2.VideoWriter(self.out_file, four_cc, 20, (x_shape, y_shape))
        fc = 0
        fps = 0
        tfc = int(player.get(cv2.CAP_PROP_FRAME_COUNT))
        tfcc = 0
        while True:
            fc += 1
            start_time = time()
            ret, frame = player.read()
            if not ret:
                break
            results = self.score_frame(frame)
            frame = self.plot_boxes(results, frame)
            end_time = time()
            fps += 1 / np.round(end_time - start_time, 3)
            if fc == 10:
                fps = int(fps / 10)
                tfcc += fc
                fc = 0
                per_com = int(tfcc / tfc * 100)
                print(f"Frames Per Second : {fps} || Percentage Parsed : {per_com}")
            out.write(frame)
        print(f"Found labels: {self.found_labels}")
        player.release()
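A hedged single-frame usage sketch of this wrapper, mirroring how `detect_yolov5_pytorch` in edgetpu.py calls it (it requires the local yolov5 checkout that load_model() points at):

```python
import numpy as np
from frigate.yolov5_pytorch import ObjectDetection

detector = ObjectDetection()
frame = np.zeros((720, 1280, 3), dtype=np.uint8)  # hypothetical blank frame
labels, cord = detector.score_frame(frame)
print(len(labels), "detections")
```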