mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-05-01 19:17:41 +03:00
Add basic config for teachable machine models
This commit is contained in:
parent
c67b25e3da
commit
b65e8506b5
172
frigate/data_processing/real_time/teachable_machine.py
Normal file
172
frigate/data_processing/real_time/teachable_machine.py
Normal file
@ -0,0 +1,172 @@
|
||||
"""Real time processor that works with teachable machine tflite models."""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
from frigate.comms.event_metadata_updater import (
|
||||
EventMetadataPublisher,
|
||||
EventMetadataTypeEnum,
|
||||
)
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.config.classification import TeachableMachineConfig
|
||||
from frigate.util.builtin import load_labels
|
||||
from frigate.util.object import calculate_region
|
||||
|
||||
from ..types import DataProcessorMetrics
|
||||
from .api import RealTimeProcessorApi
|
||||
|
||||
try:
|
||||
from tflite_runtime.interpreter import Interpreter
|
||||
except ModuleNotFoundError:
|
||||
from tensorflow.lite.python.interpreter import Interpreter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TeachableMachineStateProcessor(RealTimeProcessorApi):
|
||||
def __init__(
|
||||
self,
|
||||
config: FrigateConfig,
|
||||
model_config: TeachableMachineConfig,
|
||||
metrics: DataProcessorMetrics,
|
||||
):
|
||||
super().__init__(config, metrics)
|
||||
self.model_config = model_config
|
||||
self.interpreter: Interpreter = None
|
||||
self.tensor_input_details: dict[str, Any] = None
|
||||
self.tensor_output_details: dict[str, Any] = None
|
||||
self.labelmap: dict[int, str] = {}
|
||||
self.__build_detector()
|
||||
|
||||
def __build_detector(self) -> None:
|
||||
self.interpreter = Interpreter(
|
||||
model_path=self.model_config.model_path,
|
||||
num_threads=2,
|
||||
)
|
||||
self.interpreter.allocate_tensors()
|
||||
self.tensor_input_details = self.interpreter.get_input_details()
|
||||
self.tensor_output_details = self.interpreter.get_output_details()
|
||||
self.labelmap = load_labels(self.model_config.labelmap_path, prefill=0)
|
||||
|
||||
def process_frame(self, obj_data, frame):
|
||||
x, y, x2, y2 = calculate_region(
|
||||
frame.shape,
|
||||
obj_data["box"][0],
|
||||
obj_data["box"][1],
|
||||
obj_data["box"][2],
|
||||
obj_data["box"][3],
|
||||
224,
|
||||
1.0,
|
||||
)
|
||||
|
||||
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
|
||||
input = rgb[
|
||||
y:y2,
|
||||
x:x2,
|
||||
]
|
||||
|
||||
if input.shape != (224, 224):
|
||||
input = cv2.resize(input, (224, 224))
|
||||
|
||||
input = np.expand_dims(input, axis=0)
|
||||
self.interpreter.set_tensor(self.tensor_input_details[0]["index"], input)
|
||||
self.interpreter.invoke()
|
||||
res: np.ndarray = self.interpreter.get_tensor(
|
||||
self.tensor_output_details[0]["index"]
|
||||
)[0]
|
||||
probs = res / res.sum(axis=0)
|
||||
best_id = np.argmax(probs)
|
||||
score = round(probs[best_id], 2)
|
||||
|
||||
print(f"got ID of {best_id} with score {score}")
|
||||
|
||||
def handle_request(self, topic, request_data):
|
||||
return None
|
||||
|
||||
def expire_object(self, object_id, camera):
|
||||
pass
|
||||
|
||||
|
||||
class TeachableMachineObjectProcessor(RealTimeProcessorApi):
|
||||
def __init__(
|
||||
self,
|
||||
config: FrigateConfig,
|
||||
model_config: TeachableMachineConfig,
|
||||
sub_label_publisher: EventMetadataPublisher,
|
||||
metrics: DataProcessorMetrics,
|
||||
):
|
||||
super().__init__(config, metrics)
|
||||
self.model_config = model_config
|
||||
self.interpreter: Interpreter = None
|
||||
self.sub_label_publisher = sub_label_publisher
|
||||
self.tensor_input_details: dict[str, Any] = None
|
||||
self.tensor_output_details: dict[str, Any] = None
|
||||
self.detected_objects: dict[str, float] = {}
|
||||
self.labelmap: dict[int, str] = {}
|
||||
self.__build_detector()
|
||||
|
||||
def __build_detector(self) -> None:
|
||||
self.interpreter = Interpreter(
|
||||
model_path=self.model_config.model_path,
|
||||
num_threads=2,
|
||||
)
|
||||
self.interpreter.allocate_tensors()
|
||||
self.tensor_input_details = self.interpreter.get_input_details()
|
||||
self.tensor_output_details = self.interpreter.get_output_details()
|
||||
self.labelmap = load_labels(self.model_config.labelmap_path, prefill=0)
|
||||
|
||||
def process_frame(self, obj_data, frame):
|
||||
if obj_data["label"] != "object":
|
||||
return
|
||||
|
||||
x, y, x2, y2 = calculate_region(
|
||||
frame.shape,
|
||||
obj_data["box"][0],
|
||||
obj_data["box"][1],
|
||||
obj_data["box"][2],
|
||||
obj_data["box"][3],
|
||||
224,
|
||||
1.0,
|
||||
)
|
||||
|
||||
rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2RGB_I420)
|
||||
input = rgb[
|
||||
y:y2,
|
||||
x:x2,
|
||||
]
|
||||
|
||||
if input.shape != (224, 224):
|
||||
input = cv2.resize(input, (224, 224))
|
||||
|
||||
input = np.expand_dims(input, axis=0)
|
||||
self.interpreter.set_tensor(self.tensor_input_details[0]["index"], input)
|
||||
self.interpreter.invoke()
|
||||
res: np.ndarray = self.interpreter.get_tensor(
|
||||
self.tensor_output_details[0]["index"]
|
||||
)[0]
|
||||
probs = res / res.sum(axis=0)
|
||||
best_id = np.argmax(probs)
|
||||
|
||||
score = round(probs[best_id], 2)
|
||||
|
||||
previous_score = self.detected_objects.get(obj_data["id"], 0.0)
|
||||
|
||||
if score <= previous_score:
|
||||
logger.debug(f"Score {score} is worse than previous score {previous_score}")
|
||||
return
|
||||
|
||||
self.sub_label_publisher.publish(
|
||||
EventMetadataTypeEnum.sub_label,
|
||||
(obj_data["id"], self.labelmap[best_id], score),
|
||||
)
|
||||
self.detected_objects[obj_data["id"]] = score
|
||||
|
||||
def handle_request(self, topic, request_data):
|
||||
return None
|
||||
|
||||
def expire_object(self, object_id, camera):
|
||||
if object_id in self.detected_objects:
|
||||
self.detected_objects.pop(object_id)
|
||||
@ -172,7 +172,7 @@ class EmbeddingMaintainer(threading.Thread):
|
||||
self._process_requests()
|
||||
self._process_updates()
|
||||
self._process_recordings_updates()
|
||||
self._process_dedicated_lpr()
|
||||
self._process_frame_updates()
|
||||
self._expire_dedicated_lpr()
|
||||
self._process_finalized()
|
||||
self._process_event_metadata()
|
||||
@ -449,7 +449,7 @@ class EmbeddingMaintainer(threading.Thread):
|
||||
event_id, RegenerateDescriptionEnum(source)
|
||||
)
|
||||
|
||||
def _process_dedicated_lpr(self) -> None:
|
||||
def _process_frame_updates(self) -> None:
|
||||
"""Process event updates"""
|
||||
(topic, data) = self.detection_subscriber.check_for_update()
|
||||
|
||||
@ -458,16 +458,17 @@ class EmbeddingMaintainer(threading.Thread):
|
||||
|
||||
camera, frame_name, _, _, motion_boxes, _ = data
|
||||
|
||||
if not camera or not self.config.lpr.enabled or len(motion_boxes) == 0:
|
||||
if not camera or len(motion_boxes) == 0:
|
||||
return
|
||||
|
||||
camera_config = self.config.cameras[camera]
|
||||
|
||||
custom_classification_enabled = True
|
||||
if (
|
||||
camera_config.type != CameraTypeEnum.lpr
|
||||
or "license_plate" in camera_config.objects.track
|
||||
):
|
||||
# we're not a dedicated lpr camera or we are one but we're using frigate+
|
||||
) and not custom_classification_enabled:
|
||||
# no active features that use this data
|
||||
return
|
||||
|
||||
try:
|
||||
|
||||
Loading…
Reference in New Issue
Block a user