Add Ultralytics YOLO pose detector and update documentation.

This commit is contained in:
jaykin-jmsc-aiml 2025-09-24 16:03:52 +05:30
parent 7e2f5a3017
commit e57ed4e8a2
8 changed files with 297 additions and 9 deletions

View File

@ -1,9 +1,10 @@
#!/command/with-contenv bash #!/command/with-contenv bash
# shellcheck shell=bash # shellcheck shell=bash
# Start the fake Frigate service # Start the Frigate service (with optional fake mode for legacy behaviour)
set -o errexit -o nounset -o pipefail set -o errexit -o nounset -o pipefail
if [ "${DEV_AUTOSTART_FRIGATE:-1}" != "1" ]; then
# Tell S6-Overlay not to restart this service # Tell S6-Overlay not to restart this service
s6-svc -O . s6-svc -O .
@ -11,3 +12,28 @@ while true; do
echo "[INFO] The fake Frigate service is running..." echo "[INFO] The fake Frigate service is running..."
sleep 5s sleep 5s
done done
fi
# Tell S6-Overlay not to restart this service
s6-svc -O .
if [ -e /usr/local/bin/opt_in_out ]; then
/usr/local/bin/opt_in_out --opt_out > /dev/null 2>&1
fi
set_libva_version() {
local ffmpeg_path
ffmpeg_path=$(python3 /usr/local/ffmpeg/get_ffmpeg_path.py)
LIBAVFORMAT_VERSION_MAJOR=$("$ffmpeg_path" -version | grep -Po "libavformat\W+\K\d+")
export LIBAVFORMAT_VERSION_MAJOR
}
echo "[INFO] Preparing Frigate..."
set_libva_version
echo "[INFO] Starting Frigate..."
cd /workspace/frigate || echo "[ERROR] Failed to change working directory to /workspace/frigate"
exec 2>&1
exec python3 -u -m frigate

View File

@ -24,6 +24,7 @@ peewee == 3.17.*
peewee_migrate == 1.13.* peewee_migrate == 1.13.*
psutil == 7.1.* psutil == 7.1.*
pydantic == 2.10.* pydantic == 2.10.*
ultralytics == 8.3.*
git+https://github.com/fbcotter/py3nvml#egg=py3nvml git+https://github.com/fbcotter/py3nvml#egg=py3nvml
pytz == 2025.* pytz == 2025.*
pyzmq == 26.2.* pyzmq == 26.2.*

View File

@ -1,2 +1,3 @@
scikit-build == 0.18.* scikit-build == 0.18.*
nvidia-pyindex nvidia-pyindex
ultralytics == 8.3.*

View File

@ -58,7 +58,7 @@ This does not affect using hardware for accelerating other tasks such as [semant
# Officially Supported Detectors # Officially Supported Detectors
Frigate provides the following builtin detector types: `cpu`, `edgetpu`, `hailo8l`, `memryx`, `onnx`, `openvino`, `rknn`, and `tensorrt`. By default, Frigate will use a single CPU detector. Other detectors may require additional configuration as described below. When using multiple detectors they will run in dedicated processes, but pull from a common queue of detection requests from across all cameras. Frigate provides the following builtin detector types: `cpu`, `edgetpu`, `hailo8l`, `memryx`, `onnx`, `openvino`, `rknn`, `tensorrt`, and `ultralytics_pose`. By default, Frigate will use a single CPU detector. Other detectors may require additional configuration as described below. When using multiple detectors they will run in dedicated processes, but pull from a common queue of detection requests from across all cameras.
## Edge TPU Detector ## Edge TPU Detector
@ -141,6 +141,27 @@ detectors:
--- ---
## Ultralytics Pose Detector
The Ultralytics Pose detector runs a YOLO pose model and depends on the `ultralytics` package. To configure an Ultralytics Pose detector, set the `"type"` attribute to `"ultralytics_pose"`.
:::note
The `model_type` must be set to `yolo-pose` for this detector to work correctly.
:::
### Configuration
```yaml
detectors:
pose:
type: ultralytics_pose
device: cpu # or cuda for nvidia gpus
half_precision: False # optional, runs the model in half precision (GPU only)
max_detections: 20 # optional, maximum number of detections to return
```
---
## Hailo-8 ## Hailo-8
This detector is available for use with both Hailo-8 and Hailo-8L AI Acceleration Modules. The integration automatically detects your hardware architecture via the Hailo CLI and selects the appropriate default model if no custom model is specified. This detector is available for use with both Hailo-8 and Hailo-8L AI Acceleration Modules. The integration automatically detects your hardware architecture via the Hailo CLI and selects the appropriate default model if no custom model is specified.

View File

@ -42,6 +42,7 @@ class ModelTypeEnum(str, Enum):
yolox = "yolox" yolox = "yolox"
yolonas = "yolonas" yolonas = "yolonas"
yologeneric = "yolo-generic" yologeneric = "yolo-generic"
yolo_pose = "yolo-pose"
class ModelConfig(BaseModel): class ModelConfig(BaseModel):

View File

@ -0,0 +1,165 @@
"""Ultralytics YOLO pose detector integration."""
import logging
from typing import Any
import numpy as np
from pydantic import Field
from typing_extensions import Literal
from frigate.detectors.detection_api import DetectionApi
from frigate.detectors.detector_config import (
BaseDetectorConfig,
ModelTypeEnum,
)
logger = logging.getLogger(__name__)
DETECTOR_KEY = "ultralytics_pose"
class UltralyticsPoseDetectorConfig(BaseDetectorConfig):
"""Detector configuration for Ultralytics YOLO pose models."""
type: Literal[DETECTOR_KEY]
device: str = Field(default="cpu", title="Torch device to run inference on")
half_precision: bool = Field(
default=False, title="Run the model in half precision (GPU only)"
)
max_detections: int = Field(
default=20, title="Maximum number of detections to return"
)
class UltralyticsPoseDetector(DetectionApi):
"""Detector implementation that wraps Ultralytics YOLO pose models."""
type_key = DETECTOR_KEY
supported_models = [ModelTypeEnum.yolo_pose]
def __init__(self, detector_config: UltralyticsPoseDetectorConfig):
super().__init__(detector_config)
try:
from ultralytics import YOLO
except ImportError as exc: # pragma: no cover - dependency guard
raise RuntimeError(
"The 'ultralytics' package is required for the ultralytics_pose detector"
) from exc
if detector_config.model is None or not detector_config.model.path:
raise ValueError(
"Ultralytics pose detector requires 'model.path' to be configured."
)
self.device = detector_config.device
self.max_detections = detector_config.max_detections
self.labelmap = detector_config.model.merged_labelmap if detector_config.model else {}
logger.info(
"Loading Ultralytics YOLO pose model from %s on device %s",
detector_config.model.path,
self.device,
)
self.model = YOLO(detector_config.model.path)
if self.device:
self.model.to(self.device)
# Configure inference defaults
self.model.overrides.setdefault("task", "pose")
self.model.overrides["conf"] = self.thresh
self.model.overrides["verbose"] = False
self.model.overrides["max_det"] = self.max_detections
# Enable half precision when explicitly requested and supported
self._use_half = bool(detector_config.half_precision)
if self._use_half and self.device and self.device != "cpu":
try:
self.model.model.half()
except AttributeError:
logger.warning(
"Unable to enable half precision for Ultralytics model; proceeding in fp32."
)
self._use_half = False
elif self._use_half:
logger.warning(
"Half precision requested but unsupported on CPU; proceeding in fp32."
)
self._use_half = False
def detect_raw(self, tensor_input: np.ndarray) -> np.ndarray:
frame = tensor_input[0]
if frame.dtype != np.uint8:
frame = frame.astype(np.uint8)
# Ensure contiguous memory for torch consumption
frame = np.ascontiguousarray(frame)
try:
results = self.model.predict(
frame,
imgsz=(self.height, self.width),
device=self.device,
conf=self.thresh,
verbose=False,
max_det=self.max_detections,
)
except Exception as exc: # pragma: no cover - runtime safeguard
logger.exception("Ultralytics pose inference failed: %s", exc)
raise
detections = np.zeros((self.max_detections, 6), dtype=np.float32)
if not results:
return detections
result = results[0]
boxes = getattr(result, "boxes", None)
if boxes is None or boxes.xyxy is None:
return detections
xyxy = self._to_numpy(boxes.xyxy)
confidences = self._to_numpy(boxes.conf)
classes = self._to_numpy(boxes.cls)
num_detections = min(len(xyxy), self.max_detections)
log_entries = []
for idx in range(num_detections):
x_min, y_min, x_max, y_max = xyxy[idx]
detections[idx] = (
float(classes[idx]),
float(confidences[idx]),
float(max(0.0, min(1.0, y_min / self.height))),
float(max(0.0, min(1.0, x_min / self.width))),
float(max(0.0, min(1.0, y_max / self.height))),
float(max(0.0, min(1.0, x_max / self.width))),
)
if logger.isEnabledFor(logging.DEBUG):
class_id = int(classes[idx]) if idx < len(classes) else None
label = self.labelmap.get(class_id, f"unknown_{class_id}")
log_entries.append(
f"{label} (class={class_id}) conf={float(confidences[idx]):.3f} "
f"bbox=({x_min:.1f},{y_min:.1f},{x_max:.1f},{y_max:.1f})"
)
if log_entries:
logger.debug(
"Ultralytics pose detections: %s",
"; ".join(log_entries),
)
return detections
@staticmethod
def _to_numpy(value: Any) -> np.ndarray:
if hasattr(value, "detach"):
value = value.detach()
if hasattr(value, "cpu"):
value = value.cpu()
if hasattr(value, "numpy"):
return value.numpy()
return np.asarray(value)

View File

@ -380,18 +380,43 @@ class RemoteObjectDetector:
# copy input to shared memory # copy input to shared memory
self.np_shm[:] = tensor_input[:] self.np_shm[:] = tensor_input[:]
self.detection_queue.put(self.name) self.detection_queue.put(self.name)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Detector %s queued frame for inference", self.name)
result = self.detector_subscriber.check_for_update() result = self.detector_subscriber.check_for_update()
# if it timed out # if it timed out
if result is None: if result is None:
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Detector %s timed out waiting for inference result", self.name)
return detections return detections
for d in self.out_np_shm: for d in self.out_np_shm:
if d[1] < threshold: if d[1] < threshold:
break if logger.isEnabledFor(logging.DEBUG) and d[1] != 0.0:
detections.append( logger.debug(
(self.labels[int(d[0])], float(d[1]), (d[2], d[3], d[4], d[5])) "Detector %s stopping at class=%s conf=%.3f below threshold %.3f",
self.name,
int(d[0]),
float(d[1]),
threshold,
) )
break
class_id = int(d[0])
label = self.labels.get(class_id, f"unknown_{class_id}")
confidence = float(d[1])
box = (d[2], d[3], d[4], d[5])
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
"Detector %s detection label=%s class=%s conf=%.3f box=%s",
self.name,
label,
class_id,
confidence,
box,
)
detections.append((label, confidence, box))
self.fps.update() self.fps.update()
return detections return detections

View File

@ -221,6 +221,12 @@ def is_object_filtered(obj, objects_to_track, object_filters):
object_ratio = obj[4] object_ratio = obj[4]
if object_name not in objects_to_track: if object_name not in objects_to_track:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
"Filtered detection '%s' - not in track list %s",
object_name,
list(objects_to_track),
)
return True return True
if object_name in object_filters: if object_name in object_filters:
@ -229,23 +235,58 @@ def is_object_filtered(obj, objects_to_track, object_filters):
# if the min area is larger than the # if the min area is larger than the
# detected object, don't add it to detected objects # detected object, don't add it to detected objects
if obj_settings.min_area > object_area: if obj_settings.min_area > object_area:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
"Filtered detection '%s' - area %.2f below min_area %.2f",
object_name,
object_area,
obj_settings.min_area,
)
return True return True
# if the detected object is larger than the # if the detected object is larger than the
# max area, don't add it to detected objects # max area, don't add it to detected objects
if obj_settings.max_area < object_area: if obj_settings.max_area < object_area:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
"Filtered detection '%s' - area %.2f above max_area %.2f",
object_name,
object_area,
obj_settings.max_area,
)
return True return True
# if the score is lower than the min_score, skip # if the score is lower than the min_score, skip
if obj_settings.min_score > object_score: if obj_settings.min_score > object_score:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
"Filtered detection '%s' - score %.3f below min_score %.3f",
object_name,
object_score,
obj_settings.min_score,
)
return True return True
# if the object is not proportionally wide enough # if the object is not proportionally wide enough
if obj_settings.min_ratio > object_ratio: if obj_settings.min_ratio > object_ratio:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
"Filtered detection '%s' - ratio %.3f below min_ratio %.3f",
object_name,
object_ratio,
obj_settings.min_ratio,
)
return True return True
# if the object is proportionally too wide # if the object is proportionally too wide
if obj_settings.max_ratio < object_ratio: if obj_settings.max_ratio < object_ratio:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
"Filtered detection '%s' - ratio %.3f above max_ratio %.3f",
object_name,
object_ratio,
obj_settings.max_ratio,
)
return True return True
if obj_settings.mask is not None: if obj_settings.mask is not None:
@ -262,6 +303,13 @@ def is_object_filtered(obj, objects_to_track, object_filters):
# if the object is in a masked location, don't add it to detected objects # if the object is in a masked location, don't add it to detected objects
if obj_settings.mask[y_location][x_location] == 0: if obj_settings.mask[y_location][x_location] == 0:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
"Filtered detection '%s' - masked at (%d,%d)",
object_name,
x_location,
y_location,
)
return True return True
return False return False