From 848c2da5df519b8e2fc89630c7320e35b94cc08c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Andreas=20Franz=C3=A9n?=
Date: Wed, 13 Apr 2022 20:18:14 +0200
Subject: [PATCH] Remove the yolov5 PyTorch detector and replace EdgeTPUModel
 with a trimmed Yolov5EdgeTPUModel

---
 frigate/config.py                    |   2 +-
 frigate/edgetpu.py                   |  80 ++-----
 frigate/yolov5/edgetpumodel.py       | 318 ---------------------------
 frigate/yolov5/yolov5edgetpumodel.py | 163 ++++++++++++++
 frigate/yolov5_pytorch.py            | 111 ----------
 5 files changed, 180 insertions(+), 494 deletions(-)
 delete mode 100644 frigate/yolov5/edgetpumodel.py
 create mode 100644 frigate/yolov5/yolov5edgetpumodel.py
 delete mode 100644 frigate/yolov5_pytorch.py

diff --git a/frigate/config.py b/frigate/config.py
index 5fada12e6..c91bafc86 100644
--- a/frigate/config.py
+++ b/frigate/config.py
@@ -653,7 +653,7 @@ class DatabaseConfig(FrigateBaseModel):
 class ModelConfig(FrigateBaseModel):
     path: Optional[str] = Field(title="Custom Object detection model path.")
-    type: str = Field(default="ssd", title="Model type")
+    type: str = Field(default="ssd", title="Model type: ssd, yolov3 or yolov5")
     anchors: Optional[str] = Field(default="", title="Optional but required for yolo3")
     labelmap_path: Optional[str] = Field(title="Label map for custom object detector.")
     width: int = Field(default=320, title="Object detection model input width.")
diff --git a/frigate/edgetpu.py b/frigate/edgetpu.py
index a6b9b8484..943e04c18 100644
--- a/frigate/edgetpu.py
+++ b/frigate/edgetpu.py
@@ -14,33 +14,11 @@ from setproctitle import setproctitle
 from tflite_runtime.interpreter import load_delegate
 
 from frigate.util import EventsPerSecond, SharedMemoryFrameManager, listen, load_labels
-from frigate.yolov5.edgetpumodel import EdgeTPUModel
+from frigate.yolov5.yolov5edgetpumodel import Yolov5EdgeTPUModel
 
 logger = logging.getLogger(__name__)
 
 
-def load_labels(path, encoding='utf-8'):
-    """Loads labels from file (with or without index numbers).
-    Args:
-        path: path to label file.
-        encoding: label file encoding.
-    Returns:
-        Dictionary mapping indices to labels.
-    """
-    logger.warn(f"Loaded labels from {path}")
-    with open(path, 'r', encoding=encoding) as f:
-        lines = f.readlines()
-
-    if not lines:
-        return {}
-
-    if lines[0].split(' ', maxsplit=1)[0].isdigit():
-        pairs = [line.split(' ', maxsplit=1) for line in lines]
-        return {int(index): label.strip() for index, label in pairs}
-    else:
-        return {index: line.strip() for index, line in enumerate(lines)}
-
-
 class ObjectDetector(ABC):
     @abstractmethod
     def detect(self, tensor_input, threshold=0.4):
@@ -54,16 +32,6 @@ class LocalObjectDetector(ObjectDetector):
         self.labels = load_labels(model_config.labelmap_path)
         self.model_config = model_config
 
-        if self.model_config.type == 'yolov5':
-            model = EdgeTPUModel(model_config.path, None)
-            input_size = model.get_image_size()
-            x = (255 * np.random.random((3, *input_size))).astype(np.uint8)
-            model.forward(x)
-            self.yolov5Model = model
-        if self.model_config.type == 'yolov5_pytorch':
-            from frigate.yolov5_pytorch import ObjectDetection as Yolov5ObjectDetector
-            self.yolov5ObjectDetector = Yolov5ObjectDetector()
-
         device_config = {"device": "usb"}
         if not tf_device is None:
             device_config = {"device": tf_device}
@@ -97,6 +65,16 @@ class LocalObjectDetector(ObjectDetector):
         self.tensor_input_details = self.interpreter.get_input_details()
         self.tensor_output_details = self.interpreter.get_output_details()
 
+        if self.model_config.type == 'yolov5':
+            # Use the plain CPU tflite interpreter only when explicitly requested;
+            # any other device value (including the "usb" default) targets the EdgeTPU.
+            cpu = tf_device == "cpu"
+            model = Yolov5EdgeTPUModel(model_config.path, cpu)
+            input_size = model.get_image_size()  # we should probably use model_config.(height,width)
+            # Warm the interpreter up with one random CHW frame.
+            x = (255 * np.random.random((3, *input_size))).astype(np.uint8)
+            model.forward(x)
+            self.yolov5Model = model
+
         if model_config.anchors != "":
             anchors = [float(x) for x in model_config.anchors.split(',')]
 
@@ -119,6 +97,7 @@ class LocalObjectDetector(ObjectDetector):
     def sigmoid(self, x):
        return 1. 
/ (1 + np.exp(-x)) + def detect_raw(self, tensor_input): if self.model_config.type == "ssd": raw_detections = self.detect_ssd(tensor_input) @@ -126,8 +105,6 @@ class LocalObjectDetector(ObjectDetector): raw_detections = self.detect_yolov3(tensor_input) elif self.model_config.type == "yolov5": raw_detections = self.detect_yolov5(tensor_input) - elif self.model_config.type == "yolov5_pytorch": - raw_detections = self.detect_yolov5_pytorch(tensor_input) else: logger.error(f"Unsupported model type {self.model_config.type}") raw_detections = [] @@ -195,14 +172,12 @@ class LocalObjectDetector(ObjectDetector): def detect_yolov5(self, tensor_input): tensor_input = np.squeeze(tensor_input, axis=0) results = self.yolov5Model.forward(tensor_input) - print(self.yolov5Model.get_last_inference_time()) det = results[0] - detections = np.zeros((20, 6), np.float32) i = 0 for *xyxy, conf, cls in reversed(det): detections[i] = [ - int(cls)+1, + int(cls), float(conf), xyxy[1], xyxy[0], @@ -240,30 +215,6 @@ class LocalObjectDetector(ObjectDetector): return detections - def detect_yolov5_pytorch(self, tensor_input): - tensor_input = np.squeeze(tensor_input, axis=0) - results = self.yolov5ObjectDetector.score_frame(tensor_input) - labels, cord = results - n = len(labels) - detections = np.zeros((20, 6), np.float32) - if n > 0: - print(f"Total Targets: {n}") - print(f"Labels: {set([self.yolov5ObjectDetector.class_to_label(label) for label in labels])}") - for i in range(n): - if i < 20: - row = cord[i] - score = float(row[4]) - if score < 0.4: - break - x1, y1, x2, y2 = row[0], row[1], row[2], row[3] - label = self.yolov5ObjectDetector.class_to_label(labels[i]) - #detections[i] = [labels[i]+1, score, x1, y1, x2, y2] - detections[i] = [labels[i]+1, score, y1, x1, y2, x2] - print(detections[i]) - - return detections - - def detect_yolov3(self, tensor_input): input_details, output_details, net_input_shape = \ self.get_interpreter_details() @@ -282,8 +233,8 @@ class LocalObjectDetector(ObjectDetector): out2 = (out2.astype(np.float32) - o2_zero) * o2_scale num_classes = len(self.labels) - _boxes1, _scores1, _classes1 = self.featuresToBoxes(out1, self.anchors[[3, 4, 5]], len(self.labels), net_input_shape) - _boxes2, _scores2, _classes2 = self.featuresToBoxes(out2, self.anchors[[1, 2, 3]], len(self.labels), net_input_shape) + _boxes1, _scores1, _classes1 = self.featuresToBoxes(out1, self.anchors[[3, 4, 5]], num_classes, net_input_shape) + _boxes2, _scores2, _classes2 = self.featuresToBoxes(out2, self.anchors[[1, 2, 3]], num_classes, net_input_shape) if _boxes1.shape[0] == 0: _boxes1 = np.empty([0, 2, 2]) @@ -304,6 +255,7 @@ class LocalObjectDetector(ObjectDetector): return detections + def run_detector( name: str, detection_queue: mp.Queue, diff --git a/frigate/yolov5/edgetpumodel.py b/frigate/yolov5/edgetpumodel.py deleted file mode 100644 index c6c4966d2..000000000 --- a/frigate/yolov5/edgetpumodel.py +++ /dev/null @@ -1,318 +0,0 @@ -import time -import os -import sys -import logging - -import yaml -import numpy as np -import pycoral.utils.edgetpu as etpu -from pycoral.adapters import common -from frigate.yolov5.nms import non_max_suppression -import cv2 -import json -import tflite_runtime.interpreter as tflite -from frigate.yolov5.utils import plot_one_box, Colors, get_image_tensor - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger("EdgeTPUModel") - - -class EdgeTPUModel: - - def __init__(self, model_file, names_file, conf_thresh=0.25, iou_thresh=0.45, desktop=True, filter_classes=None, - 
agnostic_nms=False, max_det=1000): - """ - Creates an object for running a Yolov5 model on an EdgeTPU or a Desktop - - Inputs: - - model_file: path to edgetpu-compiled tflite file - - names_file: yaml names file (yolov5 format) - - conf_thresh: detection threshold - - iou_thresh: NMS threshold - - desktop: option to run model on a desktop - - filter_classes: only output certain classes - - agnostic_nms: use class-agnostic NMS - - max_det: max number of detections - """ - - model_file = os.path.abspath(model_file) - - if not model_file.endswith('tflite'): - model_file += ".tflite" - - self.model_file = model_file - self.conf_thresh = conf_thresh - self.iou_thresh = iou_thresh - self.desktop = desktop - self.filter_classes = filter_classes - self.agnostic_nms = agnostic_nms - self.max_det = 1000 - - logger.info("Confidence threshold: {}".format(conf_thresh)) - logger.info("IOU threshold: {}".format(iou_thresh)) - - self.inference_time = None - self.nms_time = None - self.interpreter = None - self.colors = Colors() # create instance for 'from utils.plots import colors' - - #self.get_names(names_file) - self.names = ['person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck', 'boat', 'traffic light', - 'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow', - 'elephant', 'bear', 'zebra', 'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee', - 'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', - 'tennis racket', 'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple', - 'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch', - 'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone', - 'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', - 'hair drier', 'toothbrush'] - self.make_interpreter() - self.get_image_size() - - def get_names(self, path): - """ - Load a names file - - Inputs: - - path: path to names file in yaml format - """ - - with open(path, 'r') as f: - cfg = yaml.load(f, Loader=yaml.SafeLoader) - - names = cfg['names'] - logger.info("Loaded {} classes".format(len(names))) - - self.names = names - - def make_interpreter(self): - """ - Internal function that loads the tflite file and creates - the interpreter that deals with the EdgeTPU hardware. 
- """ - # Load the model and allocate - # Choose desktop or EdgTPU - if self.desktop: - self.interpreter = tflite.Interpreter(self.model_file) - else: - self.interpreter = etpu.make_interpreter(self.model_file) - self.interpreter.allocate_tensors() - - self.input_details = self.interpreter.get_input_details() - self.output_details = self.interpreter.get_output_details() - - logger.debug(self.input_details) - logger.debug(self.output_details) - - self.input_zero = self.input_details[0]['quantization'][1] - self.input_scale = self.input_details[0]['quantization'][0] - self.output_zero = self.output_details[0]['quantization'][1] - self.output_scale = self.output_details[0]['quantization'][0] - - # If the model isn't quantized then these should be zero - # Check against small epsilon to avoid comparing float/int - if self.input_scale < 1e-9: - self.input_scale = 1.0 - - if self.output_scale < 1e-9: - self.output_scale = 1.0 - - logger.debug("Input scale: {}".format(self.input_scale)) - logger.debug("Input zero: {}".format(self.input_zero)) - logger.debug("Output scale: {}".format(self.output_scale)) - logger.debug("Output zero: {}".format(self.output_zero)) - - logger.info("Successfully loaded {}".format(self.model_file)) - - def get_image_size(self): - """ - Returns the expected size of the input image tensor - """ - if self.interpreter is not None: - self.input_size = common.input_size(self.interpreter) - logger.debug("Expecting input shape: {}".format(self.input_size)) - return self.input_size - else: - logger.warn("Interpreter is not yet loaded") - - def predict(self, image_path, save_img=True, save_txt=True): - logger.info("Attempting to load {}".format(image_path)) - - full_image, net_image, pad = get_image_tensor(image_path, self.input_size[0]) - pred = self.forward(net_image) - logger.info("Inference time: {}".format(self.inference_time)) - - base, ext = os.path.splitext(image_path) - - output_path = base + "_detect" + ext - det = self.process_predictions(pred[0], full_image, pad, output_path, save_img=save_img, save_txt=save_txt) - - return det - - def forward(self, x: np.ndarray, with_nms=True) -> np.ndarray: - """ - Predict function using the EdgeTPU - - Inputs: - x: (C, H, W) image tensor - with_nms: apply NMS on output - - Returns: - prediction array (with or without NMS applied) - - """ - tstart = time.time() - # Transpose if C, H, W - if x.shape[0] == 3: - x = x.transpose((1, 2, 0)) - - x = x.astype('float32') - - # Scale input, conversion is: real = (int_8 - zero)*scale - x = (x / self.input_scale) + self.input_zero - x = x[np.newaxis].astype(np.uint8) - - self.interpreter.set_tensor(self.input_details[0]['index'], x) - self.interpreter.invoke() - - # Scale output - result = (common.output_tensor(self.interpreter, 0).astype('float32') - self.output_zero) * self.output_scale - self.inference_time = time.time() - tstart - - if with_nms: - - tstart = time.time() - nms_result = non_max_suppression(result, self.conf_thresh, self.iou_thresh, self.filter_classes, - self.agnostic_nms, max_det=self.max_det) - self.nms_time = time.time() - tstart - - return nms_result - - else: - return result - - def get_last_inference_time(self, with_nms=True): - """ - Returns a tuple containing most recent inference and NMS time - """ - res = [self.inference_time] - - if with_nms: - res.append(self.nms_time) - - return res - - def get_scaled_coords(self, xyxy, output_image, pad): - """ - Converts raw prediction bounding box to orginal - image coordinates. 
- - Args: - xyxy: array of boxes - output_image: np array - pad: padding due to image resizing (pad_w, pad_h) - """ - pad_w, pad_h = pad - in_h, in_w = self.input_size - out_h, out_w, _ = output_image.shape - - ratio_w = out_w / (in_w - pad_w) - ratio_h = out_h / (in_h - pad_h) - - out = [] - for coord in xyxy: - x1, y1, x2, y2 = coord - - x1 *= in_w * ratio_w - x2 *= in_w * ratio_w - y1 *= in_h * ratio_h - y2 *= in_h * ratio_h - - x1 = max(0, x1) - x2 = min(out_w, x2) - - y1 = max(0, y1) - y2 = min(out_h, y2) - - out.append((x1, y1, x2, y2)) - - return np.array(out).astype(int) - - def process_predictions2(self, det): - """ - Process predictions and optionally output an image with annotations - """ - if len(det): - # Rescale boxes from img_size to im0 size - # x1, y1, x2, y2= - #det[:, :4] = self.get_scaled_coords(det[:, :4], output_image, pad) - output = {} - #base, ext = os.path.splitext(output_path) - - s = "" - - # Print results - for c in np.unique(det[:, -1]): - n = (det[:, -1] == c).sum() # detections per class - s += f"{n} {self.names[int(c)]}{'s' * (n > 1)}, " # add to string - - if s != "": - s = s.strip() - s = s[:-1] - - logger.info("Detected: {}".format(s)) - - for *xyxy, conf, cls in reversed(det): - output = {} - output['box'] = xyxy - output['conf'] = conf - output['cls'] = cls - output['cls_name'] = self.names[c] - return output - - def process_predictions(self, det, output_image=None, pad=(0, 0), output_path="detection.jpg", save_img=False, save_txt=False, - hide_labels=False, hide_conf=False): - """ - Process predictions and optionally output an image with annotations - """ - if len(det): - # Rescale boxes from img_size to im0 size - # x1, y1, x2, y2= - det[:, :4] = self.get_scaled_coords(det[:, :4], output_image, pad) - output = {} - base, ext = os.path.splitext(output_path) - - s = "" - - # Print results - for c in np.unique(det[:, -1]): - n = (det[:, -1] == c).sum() # detections per class - s += f"{n} {self.names[int(c)]}{'s' * (n > 1)}, " # add to string - - if s != "": - s = s.strip() - s = s[:-1] - - logger.info("Detected: {}".format(s)) - - # Write results - for *xyxy, conf, cls in reversed(det): - if save_img: # Add bbox to image - c = int(cls) # integer class - label = None if hide_labels else (self.names[c] if hide_conf else f'{self.names[c]} {conf:.2f}') - output_image = plot_one_box(xyxy, output_image, label=label, color=self.colors(c, True)) - if save_txt: - output[base] = {} - output[base]['box'] = xyxy - output[base]['conf'] = conf - output[base]['cls'] = cls - output[base]['cls_name'] = self.names[c] - - if save_txt: - output_txt = base + "txt" - with open(output_txt, 'w') as f: - json.dump(output, f, indent=1) - if save_img: - cv2.imwrite(output_path, output_image) - - return det \ No newline at end of file diff --git a/frigate/yolov5/yolov5edgetpumodel.py b/frigate/yolov5/yolov5edgetpumodel.py new file mode 100644 index 000000000..3267f84d0 --- /dev/null +++ b/frigate/yolov5/yolov5edgetpumodel.py @@ -0,0 +1,163 @@ +import time +import os +import sys +import logging + +import yaml +import numpy as np +import pycoral.utils.edgetpu as etpu +from pycoral.adapters import common +from frigate.yolov5.nms import non_max_suppression +import cv2 +import json +import tflite_runtime.interpreter as tflite +from frigate.yolov5.utils import plot_one_box, Colors, get_image_tensor + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("EdgeTPUModel") + + +class Yolov5EdgeTPUModel: + + def __init__(self, model_file, desktop=True, conf_thresh=0.25, 
iou_thresh=0.45, filter_classes=None,
+                 agnostic_nms=False, max_det=1000):
+        """
+        Creates an object for running a Yolov5 model on an EdgeTPU or a Desktop
+
+        Inputs:
+          - model_file: path to edgetpu-compiled tflite file
+          - conf_thresh: detection threshold
+          - iou_thresh: NMS threshold
+          - desktop: option to run model on a desktop
+          - filter_classes: only output certain classes
+          - agnostic_nms: use class-agnostic NMS
+          - max_det: max number of detections
+        """
+
+        model_file = os.path.abspath(model_file)
+
+        if not model_file.endswith('tflite'):
+            model_file += ".tflite"
+
+        self.model_file = model_file
+        self.conf_thresh = conf_thresh
+        self.iou_thresh = iou_thresh
+        self.desktop = desktop
+        self.filter_classes = filter_classes
+        self.agnostic_nms = agnostic_nms
+        self.max_det = max_det
+
+        logger.info("Confidence threshold: {}".format(conf_thresh))
+        logger.info("IOU threshold: {}".format(iou_thresh))
+
+        self.inference_time = None
+        self.nms_time = None
+        self.interpreter = None
+        self.colors = Colors()  # create instance for 'from utils.plots import colors'
+
+        self.make_interpreter()
+        self.get_image_size()
+
+    def make_interpreter(self):
+        """
+        Internal function that loads the tflite file and creates
+        the interpreter that deals with the EdgeTPU hardware.
+        """
+        # Load the model and allocate
+        # Choose desktop or EdgeTPU
+        if self.desktop:
+            self.interpreter = tflite.Interpreter(self.model_file)
+        else:
+            self.interpreter = etpu.make_interpreter(self.model_file)
+        self.interpreter.allocate_tensors()
+
+        self.input_details = self.interpreter.get_input_details()
+        self.output_details = self.interpreter.get_output_details()
+
+        logger.debug(self.input_details)
+        logger.debug(self.output_details)
+
+        self.input_zero = self.input_details[0]['quantization'][1]
+        self.input_scale = self.input_details[0]['quantization'][0]
+        self.output_zero = self.output_details[0]['quantization'][1]
+        self.output_scale = self.output_details[0]['quantization'][0]
+
+        # If the model isn't quantized then these should be zero
+        # Check against small epsilon to avoid comparing float/int
+        if self.input_scale < 1e-9:
+            self.input_scale = 1.0
+
+        if self.output_scale < 1e-9:
+            self.output_scale = 1.0
+
+        logger.debug("Input scale: {}".format(self.input_scale))
+        logger.debug("Input zero: {}".format(self.input_zero))
+        logger.debug("Output scale: {}".format(self.output_scale))
+        logger.debug("Output zero: {}".format(self.output_zero))
+
+        logger.info("Successfully loaded {}".format(self.model_file))
+
+    def get_image_size(self):
+        """
+        Returns the expected size of the input image tensor
+        """
+        if self.interpreter is not None:
+            self.input_size = common.input_size(self.interpreter)
+            logger.debug("Expecting input shape: {}".format(self.input_size))
+            return self.input_size
+        else:
+            logger.warning("Interpreter is not yet loaded")
+
+    def forward(self, x: np.ndarray, with_nms=True) -> np.ndarray:
+        """
+        Predict function using the EdgeTPU
+
+        Inputs:
+            x: (C, H, W) image tensor
+            with_nms: apply NMS on output
+
+        Returns:
+            prediction array (with or without NMS applied)
+
+        """
+        tstart = time.time()
+        # Transpose if C, H, W
+        if x.shape[0] == 3:
+            x = x.transpose((1, 2, 0))
+
+        x = x.astype('float32')
+
+        # Scale input, conversion is: real = (int_8 - zero)*scale
+        x = (x / self.input_scale) + self.input_zero
+        x = x[np.newaxis].astype(np.uint8)
+
+        self.interpreter.set_tensor(self.input_details[0]['index'], x)
+        self.interpreter.invoke()
+
+        # Scale output
+        result = (common.output_tensor(self.interpreter, 0).astype('float32') - self.output_zero) * self.output_scale
+        self.inference_time = time.time() - tstart
+
+        if with_nms:
+
+            tstart = time.time()
+            nms_result = non_max_suppression(result, self.conf_thresh, self.iou_thresh, self.filter_classes,
+                                             self.agnostic_nms, max_det=self.max_det)
+            self.nms_time = time.time() - tstart
+
+            return nms_result
+
+        else:
+            return result
+
+    def get_last_inference_time(self, with_nms=True):
+        """
+        Returns a list containing the most recent inference and NMS times
+        """
+        res = [self.inference_time]
+
+        if with_nms:
+            res.append(self.nms_time)
+
+        return res
diff --git a/frigate/yolov5_pytorch.py b/frigate/yolov5_pytorch.py
deleted file mode 100644
index d0e9c7fac..000000000
--- a/frigate/yolov5_pytorch.py
+++ /dev/null
@@ -1,111 +0,0 @@
-import torch
-import numpy as np
-#import cv2
-from time import time
-import sys
-
-
-class ObjectDetection:
-    """
-    The class performs generic object detection on a video file.
-    It uses yolo5 pretrained model to make inferences and opencv2 to manage frames.
-    Included Features:
-    1. Reading and writing of video file using Opencv2
-    2. Using pretrained model to make inferences on frames.
-    3. Use the inferences to plot boxes on objects along with labels.
-    Upcoming Features:
-    """
-    def __init__(self):
-        self.model = self.load_model()
-        self.model.conf = 0.4  # set inference threshold at 0.3
-        self.model.iou = 0.3  # set inference IOU threshold at 0.3
-        #self.model.classes = [0]  # set model to only detect "Person" class
-        #self.model.classes = self.model.names
-        self.classes = self.model.names
-        self.found_lables = set()  # set
-        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
-
-    def load_model(self):
-        """
-        Function loads the yolo5 model from PyTorch Hub.
-        """
-        #model = torch.hub.load('/media/frigate/yolov5', 'custom', path='/media/frigate/yolov5/yolov5l.pt', source='local')
-        model = torch.hub.load('/media/frigate/yolov5', 'custom', path='/media/frigate/yolov5/yolov5s.pt', source='local')
-        #model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
-        #model = torch.hub.load('ultralytics/yolov3', 'yolov3', pretrained=True)
-        return model
-
-    def class_to_label(self, x):
-        """
-        For a given label value, return corresponding string label.
-        :param x: numeric label
-        :return: corresponding string label
-        """
-        return self.classes[int(x)]
-
-    def score_frame(self, frame):
-        """
-        function scores each frame of the video and returns results.
-        :param frame: frame to be infered.
-        :return: labels and coordinates of objects found.
-        """
-        self.model.to(self.device)
-        results = self.model(frame)
-        labels, cord = results.xyxyn[0][:, -1].to('cpu').numpy(), results.xyxyn[0][:, :-1].to('cpu').numpy()
-        return labels, cord
-
-    def plot_boxes(self, results, frame):
-        """
-        plots boxes and labels on frame.
-        :param results: inferences made by model
-        :param frame: frame on which to make the plots
-        :return: new frame with boxes and labels plotted.
- """ - labels, cord = results - n = len(labels) - if n > 0: - print(f"Total Targets: {n}") - print(f"Labels: {set([self.class_to_label(label) for label in labels])}") - x_shape, y_shape = frame.shape[1], frame.shape[0] - for i in range(n): - self.found_lables.add(self.class_to_label(labels[i])) - row = cord[i] - x1, y1, x2, y2 = int(row[0]*x_shape), int(row[1]*y_shape), int(row[2]*x_shape), int(row[3]*y_shape) - bgr = (0, 0, 255) - cv2.rectangle(frame, (x1, y1), (x2, y2), bgr, 1) - label = f"{int(row[4]*100)}" - cv2.putText(frame, self.class_to_label(labels[i]), (x1, y1), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 1) - cv2.putText(frame, f"Total Targets: {n}", (30, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) - - return frame - - def __call__(self): - player = self.get_video_from_file() # create streaming service for application - assert player.isOpened() - x_shape = int(player.get(cv2.CAP_PROP_FRAME_WIDTH)) - y_shape = int(player.get(cv2.CAP_PROP_FRAME_HEIGHT)) - four_cc = cv2.VideoWriter_fourcc(*"MJPG") - out = cv2.VideoWriter(self.out_file, four_cc, 20, (x_shape, y_shape)) - fc = 0 - fps = 0 - tfc = int(player.get(cv2.CAP_PROP_FRAME_COUNT)) - tfcc = 0 - while True: - fc += 1 - start_time = time() - ret, frame = player.read() - if not ret: - break - results = self.score_frame(frame) - frame = self.plot_boxes(results, frame) - end_time = time() - fps += 1/np.round(end_time - start_time, 3) - if fc == 10: - fps = int(fps / 10) - tfcc += fc - fc = 0 - per_com = int(tfcc / tfc * 100) - print(f"Frames Per Second : {fps} || Percentage Parsed : {per_com}") - out.write(frame) - print(f"Found labels: {self.found_lables}") - player.release()
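
For anyone who wants to exercise the new class outside Frigate, here is a minimal sketch mirroring the warm-up block added to LocalObjectDetector.__init__. The model filename is a placeholder, and desktop=True picks the plain tflite interpreter rather than the EdgeTPU delegate:

    import numpy as np

    from frigate.yolov5.yolov5edgetpumodel import Yolov5EdgeTPUModel

    # Placeholder path: any yolov5 tflite export (edgetpu-compiled when desktop=False).
    model = Yolov5EdgeTPUModel("yolov5s-int8.tflite", desktop=True)

    # Build one random CHW frame sized to the model input (the patch uses the
    # same trick to warm the interpreter up).
    input_size = model.get_image_size()
    x = (255 * np.random.random((3, *input_size))).astype(np.uint8)

    # forward() applies NMS by default and returns one array per image;
    # each row is yolov5-style [x1, y1, x2, y2, conf, cls].
    det = model.forward(x)[0]
    for *xyxy, conf, cls in reversed(det):
        print(int(cls), float(conf), xyxy)

    print(model.get_last_inference_time())  # [inference_seconds, nms_seconds]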
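The scaling inside forward() is the standard tflite affine quantization, real = (int8 - zero) * scale, applied in reverse on the input and forward on the raw output tensor. A self-contained round trip with illustrative, made-up quantization parameters (real models report (scale, zero_point) via interpreter.get_input_details()[0]['quantization']):

    import numpy as np

    # Made-up values for illustration only.
    scale, zero = 1.0 / 255.0, 0

    real = np.array([0.0, 0.5, 1.0], dtype=np.float32)

    # Input side, as in forward(): float -> quantized domain before set_tensor().
    q = ((real / scale) + zero).astype(np.uint8)

    # Output side: quantized -> float, applied to the output tensor.
    recovered = (q.astype(np.float32) - zero) * scale
    print(q)          # [  0 127 255]
    print(recovered)  # [0.         0.49803922 1.        ]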
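Finally, detect_yolov5 repacks the NMS output into Frigate's fixed (20, 6) raw-detection array, one row per box as [class_id, score, y_min, x_min, y_max, x_max]. A sketch with a single invented row, showing the x/y swap:

    import numpy as np

    # One invented yolov5 NMS row: [x1, y1, x2, y2, conf, cls], normalized coords.
    det = np.array([[0.10, 0.20, 0.40, 0.60, 0.87, 0.0]], dtype=np.float32)

    detections = np.zeros((20, 6), np.float32)  # Frigate keeps at most 20 rows
    for i, (*xyxy, conf, cls) in enumerate(reversed(det)):
        if i >= 20:
            break
        # Note the swap: Frigate expects (y_min, x_min, y_max, x_max).
        detections[i] = [int(cls), float(conf), xyxy[1], xyxy[0], xyxy[3], xyxy[2]]

    print(detections[0])  # [0.   0.87 0.2  0.1  0.6  0.4 ]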