ZhaiSoul 2025-03-13 00:31:15 +08:00
commit 4adf6ee22f
29 changed files with 1307 additions and 968 deletions

View File

@ -10,7 +10,7 @@ slowapi == 0.1.*
imutils == 0.5.*
joserfc == 1.0.*
pathvalidate == 3.2.*
markupsafe == 2.1.*
markupsafe == 3.0.*
python-multipart == 0.0.12
# General
mypy == 1.6.1

View File

@ -97,15 +97,35 @@ python3 -c 'import secrets; print(secrets.token_hex(64))'
### Header mapping
If you have disabled Frigate's authentication and your proxy supports passing a header with the authenticated username, you can use the `header_map` config to specify the header name so it is passed to Frigate. For example, the following will map the `X-Forwarded-User` value. Header names are not case sensitive.
If you have disabled Frigate's authentication and your proxy supports passing a header with authenticated usernames and/or roles, you can use the `header_map` config to specify the header name so it is passed to Frigate. For example, the following will map the `X-Forwarded-User` and `X-Forwarded-Role` values. Header names are not case sensitive.
```yaml
proxy:
...
header_map:
user: x-forwarded-user
role: x-forwarded-role
```
Frigate supports both `admin` and `viewer` roles (see below). When using port `8971`, Frigate validates these headers and subsequent requests use the headers `remote-user` and `remote-role` for authorization.
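As a minimal sketch of that mapping (illustrative only, not Frigate's implementation; the header names come from the example config above and the fallback values are assumptions):

```python
HEADER_MAP = {"user": "x-forwarded-user", "role": "x-forwarded-role"}

def map_proxy_headers(headers: dict[str, str]) -> dict[str, str]:
    # header names are case insensitive, so normalize before lookup
    lowered = {k.lower(): v for k, v in headers.items()}
    return {
        "remote-user": lowered.get(HEADER_MAP["user"], "anonymous"),  # fallback is an assumption
        "remote-role": lowered.get(HEADER_MAP["role"], "viewer"),     # fallback is an assumption
    }

print(map_proxy_headers({"X-Forwarded-User": "alice", "X-Forwarded-Role": "admin"}))
# {'remote-user': 'alice', 'remote-role': 'admin'}
```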
#### Port Considerations
**Authenticated Port (8971)**
- Header mapping is **fully supported**.
- The `remote-role` header determines the user's privileges:
- **admin** → Full access (user management, configuration changes).
- **viewer** → Read-only access.
- Ensure your **proxy sends both user and role headers** for proper role enforcement.
**Unauthenticated Port (5000)**
- Headers are **ignored** for role enforcement.
- All requests are treated as **anonymous**.
- The `remote-role` value is **overridden**, granting **admin-level access**.
- This design ensures **unauthenticated internal use** within a trusted network.
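The behavior of the two ports can be summarized in a short sketch (illustrative only; the fallback for a missing or unrecognized role is an assumption):

```python
VALID_ROLES = {"admin", "viewer"}

def effective_role(port: int, headers: dict[str, str]) -> str:
    if port == 5000:
        # unauthenticated internal port: headers are ignored, anonymous admin access
        return "admin"
    # authenticated port 8971: the validated remote-role header decides access
    role = headers.get("remote-role", "viewer")  # "viewer" fallback is an assumption
    return role if role in VALID_ROLES else "viewer"

print(effective_role(8971, {"remote-role": "viewer"}))  # -> viewer
print(effective_role(5000, {"remote-role": "viewer"}))  # -> admin
```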
Note that only the following headers are permitted by default:
```
@ -126,8 +146,6 @@ X-authentik-uid
If you would like to add more options, you can overwrite the default file with a docker bind mount at `/usr/local/nginx/conf/proxy_trusted_headers.conf`. Reference the source code for the default file formatting.
Future versions of Frigate may leverage group and role headers for authorization in Frigate as well.
### Login page redirection
Frigate gracefully performs login page redirection that should work with most authentication proxies. If your reverse proxy returns a `Location` header on `401`, `302`, or `307` unauthorized responses, Frigate's frontend will automatically detect it and redirect to that URL.
@ -135,3 +153,31 @@ Frigate gracefully performs login page redirection that should work with most au
### Custom logout url
If your reverse proxy has a dedicated logout url, you can specify using the `logout_url` config option. This will update the link for the `Logout` link in the UI.
## User Roles
Frigate supports user roles to control access to certain features in the UI and API, such as managing users or modifying configuration settings. Roles are assigned to users in the database or through proxy headers and are enforced when accessing the UI or API through the authenticated port (`8971`).
### Supported Roles
- **admin**: Full access to all features, including user management and configuration.
- **viewer**: Read-only access to the UI and API, including viewing cameras, review items, and historical footage. Configuration editor and settings in the UI are inaccessible.
### Role Enforcement
When using the authenticated port (`8971`), roles are validated via the JWT token or proxy headers (e.g., `remote-role`).
On the internal **unauthenticated** port (`5000`), roles are **not enforced**. All requests are treated as **anonymous**, granting access equivalent to the **admin** role without restrictions.
To use role-based access control, you must connect to Frigate via the **authenticated port (`8971`)** directly or through a reverse proxy.
### Role Visibility in the UI
- When logged in via port `8971`, your **username and role** are displayed in the **account menu** (bottom corner).
- When using port `5000`, the UI will always display "anonymous" for the username and "admin" for the role.
### Managing User Roles
1. Log in as an **admin** user via port `8971`.
2. Navigate to **Settings > Users**.
3. Edit a user's role by selecting **admin** or **viewer**.

View File

@ -12,7 +12,7 @@ Frigate supports multiple different detectors that work on different types of ha
**Most Hardware**
- [Coral EdgeTPU](#edge-tpu-detector): The Google Coral EdgeTPU is available in USB and m.2 format allowing for a wide range of compatibility with devices.
- [Hailo](#hailo-8l): The Hailo8 AI Acceleration module is available in m.2 format with a HAT for RPi devices, offering a wide range of compatibility with devices.
- [Hailo](#hailo-8): The Hailo8 and Hailo8L AI Acceleration modules are available in m.2 format with a HAT for RPi devices, offering a wide range of compatibility with devices.
**AMD**
@ -129,15 +129,58 @@ detectors:
type: edgetpu
device: pci
```
---
## Hailo-8l
This detector is available for use with Hailo-8 AI Acceleration Module.
## Hailo-8
See the [installation docs](../frigate/installation.md#hailo-8l) for information on configuring the hailo8.
This detector is available for use with both Hailo-8 and Hailo-8L AI Acceleration Modules. The integration automatically detects your hardware architecture via the Hailo CLI and selects the appropriate default model if no custom model is specified.
See the [installation docs](../frigate/installation.md#hailo-8l) for information on configuring the Hailo hardware.
### Configuration
When configuring the Hailo detector, you have two options to specify the model: a local **path** or a **URL**.
If both are provided, the detector will first check for the model at the given local path. If the file is not found, it will download the model from the specified URL. The model file is cached under `/config/model_cache/hailo`.
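A minimal sketch of that resolution order, assuming the cache location described above (the helper name and signature are illustrative, not the detector's actual code):

```python
import os
import urllib.request

CACHE_DIR = "/config/model_cache/hailo"

def resolve_model(path_or_url: str | None, default_url: str) -> str:
    os.makedirs(CACHE_DIR, exist_ok=True)
    # an existing local .hef file always wins
    if path_or_url and not path_or_url.startswith(("http://", "https://")):
        if os.path.isfile(path_or_url):
            return path_or_url
    # otherwise download the custom URL (if given) or the hardware default, caching the result
    url = path_or_url if path_or_url and path_or_url.startswith(("http://", "https://")) else default_url
    cached = os.path.join(CACHE_DIR, os.path.basename(url))
    if not os.path.isfile(cached):
        urllib.request.urlretrieve(url, cached)
    return cached
```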
#### YOLO
Use this configuration for YOLO-based models. When no custom model path or URL is provided, the detector automatically downloads the default model based on the detected hardware:
- **Hailo-8 hardware:** Uses **YOLOv6n** (default: `yolov6n.hef`)
- **Hailo-8L hardware:** Uses **YOLOv6n** (default: `yolov6n.hef`)
```yaml
detectors:
hailo8l:
type: hailo8l
device: PCIe
model:
width: 320
height: 320
input_tensor: nhwc
input_pixel_format: rgb
input_dtype: int
model_type: yolo-generic
# The detector automatically selects the default model based on your hardware:
# - For Hailo-8 hardware: YOLOv6n (default: yolov6n.hef)
# - For Hailo-8L hardware: YOLOv6n (default: yolov6n.hef)
#
# Optionally, you can specify a local model path to override the default.
# If a local path is provided and the file exists, it will be used instead of downloading.
# Example:
# path: /config/model_cache/hailo/yolov6n.hef
#
# You can also override using a custom URL:
# path: https://hailo-model-zoo.s3.eu-west-2.amazonaws.com/ModelZoo/Compiled/v2.14.0/hailo8/yolov6n.hef
# just make sure to use the right configuration based on the model
```
#### SSD
For SSD-based models, provide either a model path or URL to your compiled SSD model. The integration will first check the local path before downloading if necessary.
```yaml
detectors:
hailo8l:
@ -148,11 +191,50 @@ model:
width: 300
height: 300
input_tensor: nhwc
input_pixel_format: bgr
input_pixel_format: rgb
model_type: ssd
path: /config/model_cache/h8l_cache/ssd_mobilenet_v1.hef
# Specify the local model path (if available) or URL for SSD MobileNet v1.
# Example with a local path:
# path: /config/model_cache/h8l_cache/ssd_mobilenet_v1.hef
#
# Or override using a custom URL:
# path: https://hailo-model-zoo.s3.eu-west-2.amazonaws.com/ModelZoo/Compiled/v2.14.0/hailo8l/ssd_mobilenet_v1.hef
```
#### Custom Models
The Hailo detector supports all YOLO models compiled for Hailo hardware that include post-processing. You can specify a custom URL or a local path to download or use your model directly. If both are provided, the detector checks the local path first.
```yaml
detectors:
hailo8l:
type: hailo8l
device: PCIe
model:
width: 640
height: 640
input_tensor: nhwc
input_pixel_format: rgb
input_dtype: int
model_type: yolo-generic
# Optional: Specify a local model path.
# path: /config/model_cache/hailo/custom_model.hef
#
# Alternatively, or as a fallback, provide a custom URL:
# path: https://custom-model-url.com/path/to/model.hef
```
For additional ready-to-use models, please visit: https://github.com/hailo-ai/hailo_model_zoo
Hailo8 supports all models in the Hailo Model Zoo that include HailoRT post-processing. You're welcome to choose any of these pre-configured models for your implementation.
> **Note:**
> The `path` parameter accepts either a local file path or a URL ending in `.hef`. The detector first checks whether the value is an existing local file; if so, it is used directly. Otherwise, or if a URL was provided, the model is downloaded from the specified URL.
---
## OpenVINO Detector
The OpenVINO detector type runs an OpenVINO IR model on AMD and Intel CPUs, Intel GPUs and Intel VPU hardware. To configure an OpenVINO detector, set the `"type"` attribute to `"openvino"`.

View File

@ -21,23 +21,77 @@ I may earn a small commission for my endorsement, recommendation, testimonial, o
## Server
My current favorite is the Beelink EQ13 because of the efficient N100 CPU and dual NICs that allow you to setup a dedicated private network for your cameras where they can be blocked from accessing the internet. There are many used workstation options on eBay that work very well. Anything with an Intel CPU and capable of running Debian should work fine. As a bonus, you may want to look for devices with a M.2 or PCIe express slot that is compatible with the Google Coral. I may earn a small commission for my endorsement, recommendation, testimonial, or link to any products or services from this website.
My current favorite is the Beelink EQ13 because of the efficient N100 CPU and dual NICs that allow you to set up a dedicated private network for your cameras where they can be blocked from accessing the internet. There are many used workstation options on eBay that work very well. Anything with an Intel CPU and capable of running Debian should work fine. As a bonus, you may want to look for devices with an M.2 or PCIe slot that is compatible with the Hailo8 or Google Coral. I may earn a small commission for my endorsement, recommendation, testimonial, or link to any products or services from this website.
| Name | Coral Inference Speed | Coral Compatibility | Notes |
| ------------------------------------------------------------------------------------------------------------- | --------------------- | ------------------- | ----------------------------------------------------------------------------------------- |
| Beelink EQ13 (<a href="https://amzn.to/4iQaBKu" target="_blank" rel="nofollow noopener sponsored">Amazon</a>) | 5-10ms | USB | Dual gigabit NICs for easy isolated camera network. Easily handles several 1080p cameras. |
| Name | Notes |
| ------------------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------- |
| Beelink EQ13 (<a href="https://amzn.to/4iQaBKu" target="_blank" rel="nofollow noopener sponsored">Amazon</a>) | Dual gigabit NICs for easy isolated camera network. Easily handles several 1080p cameras. |
## Detectors
A detector is a device which is optimized for running inferences efficiently to detect objects. Using a recommended detector means there will be less latency between detections and more detections can be run per second. Frigate is designed around the expectation that a detector is used to achieve very low inference speeds. Offloading TensorFlow to a detector is an order of magnitude faster and will reduce your CPU load dramatically. As of 0.12, Frigate supports a handful of different detector types with varying inference speeds and performance.
A detector is a device which is optimized for running inferences efficiently to detect objects. Using a recommended detector means there will be less latency between detections and more detections can be run per second. Frigate is designed around the expectation that a detector is used to achieve very low inference speeds. Offloading TensorFlow to a detector is an order of magnitude faster and will reduce your CPU load dramatically.
:::info
Frigate supports multiple different detectors that work on different types of hardware:
**Most Hardware**
- [Hailo](#hailo-8): The Hailo8 and Hailo8L AI Acceleration modules are available in m.2 format with a HAT for RPi devices, offering a wide range of compatibility with devices.
- [Supports many model architectures](../../configuration/object_detectors#configuration)
- Runs best with tiny or small size models
- [Google Coral EdgeTPU](#google-coral-tpu): The Google Coral EdgeTPU is available in USB and m.2 format allowing for a wide range of compatibility with devices.
- [Supports primarily ssdlite and mobilenet model architectures](../../configuration/object_detectors#edge-tpu-detector)
**AMD**
- [ROCm](#amd-gpus): ROCm can run on AMD Discrete GPUs to provide efficient object detection
- [Supports limited model architectures](../../configuration/object_detectors#supported-models-1)
- Runs best on discrete AMD GPUs
**Intel**
- [OpenVINO](#openvino): OpenVINO can run on Intel Arc GPUs, Intel integrated GPUs, and Intel CPUs to provide efficient object detection.
- [Supports majority of model architectures](../../configuration/object_detectors#supported-models)
- Runs best with tiny, small, or medium models
**Nvidia**
- [TensorRT](#tensorrt---nvidia-gpu): TensorRT can run on Nvidia GPUs and Jetson devices.
- [Supports majority of model architectures via ONNX](../../configuration/object_detectors#supported-models-2)
- Runs well with any size models including large
**Rockchip**
- [RKNN](#rockchip-platform): RKNN models can run on Rockchip devices with included NPUs to provide efficient object detection.
- [Supports limited model architectures](../../configuration/object_detectors#choosing-a-model)
- Runs best with tiny or small size models
- Runs efficiently on low power hardware
:::
### Hailo-8
Frigate supports both the Hailo-8 and Hailo-8L AI Acceleration Modules on compatible hardware platforms, including the Raspberry Pi 5 with the PCIe HAT from the AI Kit. The Hailo detector integration in Frigate automatically identifies your hardware type and selects the appropriate default model when a custom model isn't provided.
**Default Model Configuration:**
- **Hailo-8L:** Default model is **YOLOv6n**.
- **Hailo-8:** Default model is **YOLOv6n**.
In real-world deployments, even with multiple cameras running concurrently, Frigate has demonstrated consistent performance. Testing on x86 platforms—with dual PCIe lanes—yields further improvements in FPS, throughput, and latency compared to the Raspberry Pi setup.
| Name | Hailo8 Inference Time | Hailo8L Inference Time |
| ---------------- | ---------------------- | ----------------------- |
| ssd mobilenet v1 | ~ 6 ms | ~ 10 ms |
| yolov6n | ~ 7 ms | ~ 11 ms |
### Google Coral TPU
It is strongly recommended to use a Google Coral. A $60 device will outperform a $2000 CPU. Frigate should work with any supported Coral device from https://coral.ai
The USB version is compatible with the widest variety of hardware and does not require a driver on the host machine. However, it does lack the automatic throttling features of the other versions.
The PCIe and M.2 versions require installation of a driver on the host. Follow the instructions for your version from https://coral.ai
Frigate supports both the USB and M.2 versions of the Google Coral.
- The USB version is compatible with the widest variety of hardware and does not require a driver on the host machine. However, it does lack the automatic throttling features of the other versions.
- The PCIe and M.2 versions require installation of a driver on the host. Follow the instructions for your version from https://coral.ai
A single Coral can handle many cameras using the default model and will be sufficient for the majority of users. You can calculate the maximum performance of your Coral based on the inference speed reported by Frigate. With an inference speed of 10, your Coral will top out at `1000/10=100`, or 100 frames per second. If your detection fps is regularly getting close to that, you should first consider tuning motion masks. If those are already properly configured, a second Coral may be needed.
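For example, the arithmetic above as a quick check:

```python
# Theoretical ceiling from the paragraph above: 1000 ms divided by the
# reported inference speed in milliseconds.
inference_speed_ms = 10
max_fps = 1000 / inference_speed_ms
print(max_fps)  # 100.0 frames per second, shared across all cameras on the Coral
```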
@ -92,11 +146,9 @@ Inference speeds will vary greatly depending on the GPU and the model used.
With the [rocm](../configuration/object_detectors.md#amdrocm-gpu-detector) detector Frigate can take advantage of many discrete AMD GPUs.
### Hailo-8l PCIe
Frigate supports the Hailo-8l M.2 card on any hardware but currently it is only tested on the Raspberry Pi5 PCIe hat from the AI kit.
The inference time for the Hailo-8L chip at time of writing is around 17-21 ms for the SSD MobileNet Version 1 model.
| Name | YoloV9 Inference Time | YOLO-NAS Inference Time |
| --------------- | --------------------- | ------------------------- |
| AMD 780M | ~ 14 ms | ~ 60 ms |
## Community Supported Detectors

View File

@ -6,7 +6,7 @@ slug: /
A complete and local NVR designed for Home Assistant with AI object detection. Uses OpenCV and Tensorflow to perform realtime object detection locally for IP cameras.
Use of a [Google Coral Accelerator](https://coral.ai/products/) is optional, but strongly recommended. CPU detection should only be used for testing purposes. The Coral will outperform even the best CPUs and can process 100+ FPS with very little overhead.
Use of a [Recommended Detector](/frigate/hardware#detectors) is optional, but strongly recommended. CPU detection should only be used for testing purposes.
- Tight integration with Home Assistant via a [custom component](https://github.com/blakeblackshear/frigate-hass-integration)
- Designed to minimize resource use and maximize performance by only looking for objects when and where it is necessary

View File

@ -100,9 +100,9 @@ By default, the Raspberry Pi limits the amount of memory available to the GPU. I
Additionally, the USB Coral draws a considerable amount of power. If using any other USB devices such as an SSD, you will experience instability due to the Pi not providing enough power to USB devices. You will need to purchase an external USB hub with its own power supply. Some have reported success with <a href="https://amzn.to/3a2mH0P" target="_blank" rel="nofollow noopener sponsored">this</a> (affiliate link).
### Hailo-8L
### Hailo-8
The Hailo-8L is an M.2 card typically connected to a carrier board for PCIe, which then connects to the Raspberry Pi 5 as part of the AI Kit. However, it can also be used on other boards equipped with an M.2 M key edge connector.
The Hailo-8 and Hailo-8L AI accelerators are available in both M.2 and HAT form factors for the Raspberry Pi. The M.2 version typically connects to a carrier board for PCIe, which then interfaces with the Raspberry Pi 5 as part of the AI Kit. The HAT version can be mounted directly onto compatible Raspberry Pi models. Both form factors have been successfully tested on x86 platforms as well, making them versatile options for various computing environments.
#### Installation

View File

@ -3,6 +3,8 @@
import datetime
import logging
import os
import random
import string
from functools import reduce
from pathlib import Path
from urllib.parse import unquote
@ -43,9 +45,8 @@ from frigate.api.defs.tags import Tags
from frigate.comms.event_metadata_updater import EventMetadataTypeEnum
from frigate.const import CLIPS_DIR
from frigate.embeddings import EmbeddingsContext
from frigate.events.external import ExternalEventProcessor
from frigate.models import Event, ReviewSegment, Timeline
from frigate.object_processing import TrackedObject, TrackedObjectProcessor
from frigate.track.object_processing import TrackedObject
from frigate.util.builtin import get_tz_modifiers
logger = logging.getLogger(__name__)
@ -1202,28 +1203,25 @@ def create_event(
status_code=404,
)
try:
frame_processor: TrackedObjectProcessor = request.app.detected_frames_processor
external_processor: ExternalEventProcessor = request.app.external_processor
now = datetime.datetime.now().timestamp()
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
event_id = f"{now}-{rand_id}"
frame = frame_processor.get_current_frame(camera_name)
event_id = external_processor.create_manual_event(
request.app.event_metadata_updater.publish(
EventMetadataTypeEnum.manual_event_create,
(
now,
camera_name,
label,
body.source_type,
body.sub_label,
body.score,
body.duration,
event_id,
body.include_recording,
body.score,
body.sub_label,
body.duration,
body.source_type,
body.draw,
frame,
)
except Exception as e:
logger.error(e)
return JSONResponse(
content=({"success": False, "message": "An unknown error occurred"}),
status_code=500,
)
),
)
return JSONResponse(
content=(
@ -1245,7 +1243,9 @@ def create_event(
def end_event(request: Request, event_id: str, body: EventsEndBody):
try:
end_time = body.end_time or datetime.datetime.now().timestamp()
request.app.external_processor.finish_manual_event(event_id, end_time)
request.app.event_metadata_updater.publish(
EventMetadataTypeEnum.manual_event_end, (event_id, end_time)
)
except Exception:
return JSONResponse(
content=(

View File

@ -27,7 +27,6 @@ from frigate.comms.event_metadata_updater import (
)
from frigate.config import FrigateConfig
from frigate.embeddings import EmbeddingsContext
from frigate.events.external import ExternalEventProcessor
from frigate.ptz.onvif import OnvifController
from frigate.stats.emitter import StatsEmitter
from frigate.storage import StorageMaintainer
@ -56,7 +55,6 @@ def create_fastapi_app(
detected_frames_processor,
storage_maintainer: StorageMaintainer,
onvif: OnvifController,
external_processor: ExternalEventProcessor,
stats_emitter: StatsEmitter,
event_metadata_updater: EventMetadataPublisher,
):
@ -129,7 +127,6 @@ def create_fastapi_app(
app.onvif = onvif
app.stats_emitter = stats_emitter
app.event_metadata_updater = event_metadata_updater
app.external_processor = external_processor
app.jwt_token = get_jwt_secret() if frigate_config.auth.enabled else None
return app

View File

@ -37,7 +37,7 @@ from frigate.const import (
RECORD_DIR,
)
from frigate.models import Event, Previews, Recordings, Regions, ReviewSegment
from frigate.object_processing import TrackedObjectProcessor
from frigate.track.object_processing import TrackedObjectProcessor
from frigate.util.builtin import get_tz_modifiers
from frigate.util.image import get_image_from_recording
from frigate.util.path import get_event_thumbnail_bytes

View File

@ -43,7 +43,6 @@ from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.embeddings import EmbeddingsContext, manage_embeddings
from frigate.events.audio import AudioProcessor
from frigate.events.cleanup import EventCleanup
from frigate.events.external import ExternalEventProcessor
from frigate.events.maintainer import EventProcessor
from frigate.models import (
Event,
@ -57,7 +56,6 @@ from frigate.models import (
User,
)
from frigate.object_detection import ObjectDetectProcess
from frigate.object_processing import TrackedObjectProcessor
from frigate.output.output import output_frames
from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.ptz.onvif import OnvifController
@ -69,6 +67,7 @@ from frigate.stats.emitter import StatsEmitter
from frigate.stats.util import stats_init
from frigate.storage import StorageMaintainer
from frigate.timeline import TimelineProcessor
from frigate.track.object_processing import TrackedObjectProcessor
from frigate.util.builtin import empty_and_close_queue
from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory
from frigate.util.object import get_camera_regions_grid
@ -318,9 +317,6 @@ class FrigateApp:
# Create a client for other processes to use
self.embeddings = EmbeddingsContext(self.db)
def init_external_event_processor(self) -> None:
self.external_event_processor = ExternalEventProcessor(self.config)
def init_inter_process_communicator(self) -> None:
self.inter_process_communicator = InterProcessCommunicator()
self.inter_config_updater = ConfigPublisher()
@ -657,7 +653,6 @@ class FrigateApp:
self.start_camera_capture_processes()
self.start_audio_processor()
self.start_storage_maintainer()
self.init_external_event_processor()
self.start_stats_emitter()
self.start_timeline_processor()
self.start_event_processor()
@ -676,7 +671,6 @@ class FrigateApp:
self.detected_frames_processor,
self.storage_maintainer,
self.onvif_controller,
self.external_event_processor,
self.stats_emitter,
self.event_metadata_updater,
),
@ -748,7 +742,6 @@ class FrigateApp:
self.review_segment_process.terminate()
self.review_segment_process.join()
self.external_event_processor.stop()
self.dispatcher.stop()
self.ptz_autotracker_thread.join()

frigate/camera/state.py Normal file
View File

@ -0,0 +1,464 @@
"""Maintains state of camera."""
import datetime
import logging
import os
import threading
from collections import defaultdict
from typing import Callable
import cv2
import numpy as np
from frigate.config import (
FrigateConfig,
ZoomingModeEnum,
)
from frigate.const import CLIPS_DIR, THUMB_DIR
from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.track.tracked_object import TrackedObject
from frigate.util.image import (
SharedMemoryFrameManager,
draw_box_with_label,
draw_timestamp,
is_better_thumbnail,
is_label_printable,
)
logger = logging.getLogger(__name__)
class CameraState:
def __init__(
self,
name,
config: FrigateConfig,
frame_manager: SharedMemoryFrameManager,
ptz_autotracker_thread: PtzAutoTrackerThread,
):
self.name = name
self.config = config
self.camera_config = config.cameras[name]
self.frame_manager = frame_manager
self.best_objects: dict[str, TrackedObject] = {}
self.tracked_objects: dict[str, TrackedObject] = {}
self.frame_cache = {}
self.zone_objects = defaultdict(list)
self._current_frame = np.zeros(self.camera_config.frame_shape_yuv, np.uint8)
self.current_frame_lock = threading.Lock()
self.current_frame_time = 0.0
self.motion_boxes = []
self.regions = []
self.previous_frame_id = None
self.callbacks = defaultdict(list)
self.ptz_autotracker_thread = ptz_autotracker_thread
self.prev_enabled = self.camera_config.enabled
def get_current_frame(self, draw_options={}):
with self.current_frame_lock:
frame_copy = np.copy(self._current_frame)
frame_time = self.current_frame_time
tracked_objects = {k: v.to_dict() for k, v in self.tracked_objects.items()}
motion_boxes = self.motion_boxes.copy()
regions = self.regions.copy()
frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_YUV2BGR_I420)
# draw on the frame
if draw_options.get("mask"):
mask_overlay = np.where(self.camera_config.motion.mask == [0])
frame_copy[mask_overlay] = [0, 0, 0]
if draw_options.get("bounding_boxes"):
# draw the bounding boxes on the frame
for obj in tracked_objects.values():
if obj["frame_time"] == frame_time:
if obj["stationary"]:
color = (220, 220, 220)
thickness = 1
else:
thickness = 2
color = self.config.model.colormap[obj["label"]]
else:
thickness = 1
color = (255, 0, 0)
# draw thicker box around ptz autotracked object
if (
self.camera_config.onvif.autotracking.enabled
and self.ptz_autotracker_thread.ptz_autotracker.autotracker_init[
self.name
]
and self.ptz_autotracker_thread.ptz_autotracker.tracked_object[
self.name
]
is not None
and obj["id"]
== self.ptz_autotracker_thread.ptz_autotracker.tracked_object[
self.name
].obj_data["id"]
and obj["frame_time"] == frame_time
):
thickness = 5
color = self.config.model.colormap[obj["label"]]
# debug autotracking zooming - show the zoom factor box
if (
self.camera_config.onvif.autotracking.zooming
!= ZoomingModeEnum.disabled
):
max_target_box = self.ptz_autotracker_thread.ptz_autotracker.tracked_object_metrics[
self.name
]["max_target_box"]
side_length = max_target_box * (
max(
self.camera_config.detect.width,
self.camera_config.detect.height,
)
)
centroid_x = (obj["box"][0] + obj["box"][2]) // 2
centroid_y = (obj["box"][1] + obj["box"][3]) // 2
top_left = (
int(centroid_x - side_length // 2),
int(centroid_y - side_length // 2),
)
bottom_right = (
int(centroid_x + side_length // 2),
int(centroid_y + side_length // 2),
)
cv2.rectangle(
frame_copy,
top_left,
bottom_right,
(255, 255, 0),
2,
)
# draw the bounding boxes on the frame
box = obj["box"]
text = (
obj["label"]
if (
not obj.get("sub_label")
or not is_label_printable(obj["sub_label"][0])
)
else obj["sub_label"][0]
)
draw_box_with_label(
frame_copy,
box[0],
box[1],
box[2],
box[3],
text,
f"{obj['score']:.0%} {int(obj['area'])}"
+ (
f" {float(obj['current_estimated_speed']):.1f}"
if obj["current_estimated_speed"] != 0
else ""
),
thickness=thickness,
color=color,
)
# draw any attributes
for attribute in obj["current_attributes"]:
box = attribute["box"]
draw_box_with_label(
frame_copy,
box[0],
box[1],
box[2],
box[3],
attribute["label"],
f"{attribute['score']:.0%}",
thickness=thickness,
color=color,
)
if draw_options.get("regions"):
for region in regions:
cv2.rectangle(
frame_copy,
(region[0], region[1]),
(region[2], region[3]),
(0, 255, 0),
2,
)
if draw_options.get("zones"):
for name, zone in self.camera_config.zones.items():
thickness = (
8
if any(
name in obj["current_zones"] for obj in tracked_objects.values()
)
else 2
)
cv2.drawContours(frame_copy, [zone.contour], -1, zone.color, thickness)
if draw_options.get("motion_boxes"):
for m_box in motion_boxes:
cv2.rectangle(
frame_copy,
(m_box[0], m_box[1]),
(m_box[2], m_box[3]),
(0, 0, 255),
2,
)
if draw_options.get("timestamp"):
color = self.camera_config.timestamp_style.color
draw_timestamp(
frame_copy,
frame_time,
self.camera_config.timestamp_style.format,
font_effect=self.camera_config.timestamp_style.effect,
font_thickness=self.camera_config.timestamp_style.thickness,
font_color=(color.blue, color.green, color.red),
position=self.camera_config.timestamp_style.position,
)
return frame_copy
def finished(self, obj_id):
del self.tracked_objects[obj_id]
def on(self, event_type: str, callback: Callable[[dict], None]):
self.callbacks[event_type].append(callback)
def update(
self,
frame_name: str,
frame_time: float,
current_detections: dict[str, dict[str, any]],
motion_boxes: list[tuple[int, int, int, int]],
regions: list[tuple[int, int, int, int]],
):
current_frame = self.frame_manager.get(
frame_name, self.camera_config.frame_shape_yuv
)
tracked_objects = self.tracked_objects.copy()
current_ids = set(current_detections.keys())
previous_ids = set(tracked_objects.keys())
removed_ids = previous_ids.difference(current_ids)
new_ids = current_ids.difference(previous_ids)
updated_ids = current_ids.intersection(previous_ids)
for id in new_ids:
new_obj = tracked_objects[id] = TrackedObject(
self.config.model,
self.camera_config,
self.config.ui,
self.frame_cache,
current_detections[id],
)
# call event handlers
for c in self.callbacks["start"]:
c(self.name, new_obj, frame_name)
for id in updated_ids:
updated_obj = tracked_objects[id]
thumb_update, significant_update, autotracker_update = updated_obj.update(
frame_time, current_detections[id], current_frame is not None
)
if autotracker_update or significant_update:
for c in self.callbacks["autotrack"]:
c(self.name, updated_obj, frame_name)
if thumb_update and current_frame is not None:
# ensure this frame is stored in the cache
if (
updated_obj.thumbnail_data["frame_time"] == frame_time
and frame_time not in self.frame_cache
):
self.frame_cache[frame_time] = np.copy(current_frame)
updated_obj.last_updated = frame_time
# if it has been more than 5 seconds since the last thumb update
# and the last update is greater than the last publish or
# the object has changed significantly
if (
frame_time - updated_obj.last_published > 5
and updated_obj.last_updated > updated_obj.last_published
) or significant_update:
# call event handlers
for c in self.callbacks["update"]:
c(self.name, updated_obj, frame_name)
updated_obj.last_published = frame_time
for id in removed_ids:
# publish events to mqtt
removed_obj = tracked_objects[id]
if "end_time" not in removed_obj.obj_data:
removed_obj.obj_data["end_time"] = frame_time
for c in self.callbacks["end"]:
c(self.name, removed_obj, frame_name)
# TODO: can i switch to looking this up and only changing when an event ends?
# maintain best objects
camera_activity: dict[str, list[any]] = {
"enabled": True,
"motion": len(motion_boxes) > 0,
"objects": [],
}
for obj in tracked_objects.values():
object_type = obj.obj_data["label"]
active = obj.is_active()
if not obj.false_positive:
label = object_type
sub_label = None
if obj.obj_data.get("sub_label"):
if (
obj.obj_data.get("sub_label")[0]
in self.config.model.all_attributes
):
label = obj.obj_data["sub_label"][0]
else:
label = f"{object_type}-verified"
sub_label = obj.obj_data["sub_label"][0]
camera_activity["objects"].append(
{
"id": obj.obj_data["id"],
"label": label,
"stationary": not active,
"area": obj.obj_data["area"],
"ratio": obj.obj_data["ratio"],
"score": obj.obj_data["score"],
"sub_label": sub_label,
"current_zones": obj.current_zones,
}
)
# if we don't have access to the current frame or
# if the object's thumbnail is not from the current frame, skip
if (
current_frame is None
or obj.thumbnail_data is None
or obj.false_positive
or obj.thumbnail_data["frame_time"] != frame_time
):
continue
if object_type in self.best_objects:
current_best = self.best_objects[object_type]
now = datetime.datetime.now().timestamp()
# if the object is a higher score than the current best score
# or the current object is older than desired, use the new object
if (
is_better_thumbnail(
object_type,
current_best.thumbnail_data,
obj.thumbnail_data,
self.camera_config.frame_shape,
)
or (now - current_best.thumbnail_data["frame_time"])
> self.camera_config.best_image_timeout
):
self.best_objects[object_type] = obj
for c in self.callbacks["snapshot"]:
c(self.name, self.best_objects[object_type], frame_name)
else:
self.best_objects[object_type] = obj
for c in self.callbacks["snapshot"]:
c(self.name, self.best_objects[object_type], frame_name)
for c in self.callbacks["camera_activity"]:
c(self.name, camera_activity)
# cleanup thumbnail frame cache
current_thumb_frames = {
obj.thumbnail_data["frame_time"]
for obj in tracked_objects.values()
if not obj.false_positive and obj.thumbnail_data is not None
}
current_best_frames = {
obj.thumbnail_data["frame_time"] for obj in self.best_objects.values()
}
thumb_frames_to_delete = [
t
for t in self.frame_cache.keys()
if t not in current_thumb_frames and t not in current_best_frames
]
for t in thumb_frames_to_delete:
del self.frame_cache[t]
with self.current_frame_lock:
self.tracked_objects = tracked_objects
self.motion_boxes = motion_boxes
self.regions = regions
if current_frame is not None:
self.current_frame_time = frame_time
self._current_frame = np.copy(current_frame)
if self.previous_frame_id is not None:
self.frame_manager.close(self.previous_frame_id)
self.previous_frame_id = frame_name
def save_manual_event_image(
self, event_id: str, label: str, draw: dict[str, list[dict]]
) -> None:
img_frame = self.get_current_frame()
# write clean snapshot if enabled
if self.camera_config.snapshots.clean_copy:
ret, png = cv2.imencode(".png", img_frame)
if ret:
with open(
os.path.join(
CLIPS_DIR,
f"{self.camera_config.name}-{event_id}-clean.png",
),
"wb",
) as p:
p.write(png.tobytes())
# write jpg snapshot with optional annotations
if draw.get("boxes") and isinstance(draw.get("boxes"), list):
for box in draw.get("boxes"):
x = int(box["box"][0] * self.camera_config.detect.width)
y = int(box["box"][1] * self.camera_config.detect.height)
width = int(box["box"][2] * self.camera_config.detect.width)
height = int(box["box"][3] * self.camera_config.detect.height)
draw_box_with_label(
img_frame,
x,
y,
x + width,
y + height,
label,
f"{box.get('score', '-')}% {int(width * height)}",
thickness=2,
color=box.get("color", (255, 0, 0)),
)
ret, jpg = cv2.imencode(".jpg", img_frame)
with open(
os.path.join(CLIPS_DIR, f"{self.camera_config.name}-{event_id}.jpg"),
"wb",
) as j:
j.write(jpg.tobytes())
# create thumbnail with max height of 175 and save
width = int(175 * img_frame.shape[1] / img_frame.shape[0])
thumb = cv2.resize(img_frame, dsize=(width, 175), interpolation=cv2.INTER_AREA)
cv2.imwrite(
os.path.join(THUMB_DIR, self.camera_config.name, f"{event_id}.webp"), thumb
)
def shutdown(self) -> None:
for obj in self.tracked_objects.values():
if not obj.obj_data.get("end_time"):
obj.write_thumbnail_to_disk()

View File

@ -10,6 +10,8 @@ logger = logging.getLogger(__name__)
class EventMetadataTypeEnum(str, Enum):
all = ""
manual_event_create = "manual_event_create"
manual_event_end = "manual_event_end"
regenerate_description = "regenerate_description"
sub_label = "sub_label"

View File

@ -67,6 +67,9 @@ class FaceRecognitionConfig(FrigateBaseModel):
save_attempts: bool = Field(
default=True, title="Save images of face detections for training."
)
blur_confidence_filter: bool = Field(
default=True, title="Apply blur quality filter to face confidence."
)
class LicensePlateRecognitionConfig(FrigateBaseModel):

View File

@ -37,3 +37,5 @@ class LoggerConfig(FrigateBaseModel):
for log, level in log_levels.items():
logging.getLogger(log).setLevel(level.value.upper())
return self

View File

@ -192,6 +192,22 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
image, M, (output_width, output_height), flags=cv2.INTER_CUBIC
)
def __get_blur_factor(self, input: np.ndarray) -> float:
"""Calculates the factor for the confidence based on the blur of the image."""
if not self.face_config.blur_confidence_filter:
return 1.0
variance = cv2.Laplacian(input, cv2.CV_64F).var()
if variance < 60: # image is very blurry
return 0.96
elif variance < 70: # image moderately blurry
return 0.98
elif variance < 80: # image is slightly blurry
return 0.99
else:
return 1.0
def __clear_classifier(self) -> None:
self.face_recognizer = None
self.label_map = {}
@ -232,14 +248,21 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
if not self.recognizer:
return None
# face recognition is best run on grayscale images
img = cv2.cvtColor(face_image, cv2.COLOR_BGR2GRAY)
# get blur factor before aligning face
blur_factor = self.__get_blur_factor(img)
logger.debug(f"face detected with bluriness {blur_factor}")
# align face and run recognition
img = self.__align_face(img, img.shape[1], img.shape[0])
index, distance = self.recognizer.predict(img)
if index == -1:
return None
score = 1.0 - (distance / 1000)
score = (1.0 - (distance / 1000)) * blur_factor
return self.label_map[index], round(score, 2)
def __update_metrics(self, duration: float) -> None:

View File

@ -38,6 +38,7 @@ class ModelTypeEnum(str, Enum):
yolov9 = "yolov9"
yolonas = "yolonas"
dfine = "dfine"
yologeneric = "yolo-generic"
class ModelConfig(BaseModel):

frigate/detectors/plugins/hailo8l.py Normal file → Executable file
View File

@ -1,286 +1,450 @@
import logging
import os
import queue
import subprocess
import threading
import urllib.request
from functools import partial
from typing import Dict, List, Optional, Tuple
import cv2
import numpy as np
try:
from hailo_platform import (
HEF,
ConfigureParams,
FormatType,
HailoRTException,
HailoStreamInterface,
InferVStreams,
InputVStreamParams,
OutputVStreamParams,
HailoSchedulingAlgorithm,
VDevice,
)
except ModuleNotFoundError:
pass
from pydantic import BaseModel, Field
from pydantic import Field
from typing_extensions import Literal
from frigate.const import MODEL_CACHE_DIR
from frigate.detectors.detection_api import DetectionApi
from frigate.detectors.detector_config import BaseDetectorConfig
from frigate.detectors.detector_config import (
BaseDetectorConfig,
)
# Set up logging
logger = logging.getLogger(__name__)
# Define the detector key for Hailo
# ----------------- ResponseStore Class ----------------- #
class ResponseStore:
"""
A thread-safe hash-based response store that maps request IDs
to their results. Threads can wait on the condition variable until
their request's result appears.
"""
def __init__(self):
self.responses = {} # Maps request_id -> (original_input, infer_results)
self.lock = threading.Lock()
self.cond = threading.Condition(self.lock)
def put(self, request_id, response):
with self.cond:
self.responses[request_id] = response
self.cond.notify_all()
def get(self, request_id, timeout=None):
with self.cond:
if not self.cond.wait_for(
lambda: request_id in self.responses, timeout=timeout
):
raise TimeoutError(f"Timeout waiting for response {request_id}")
return self.responses.pop(request_id)
# ----------------- Utility Functions ----------------- #
def preprocess_tensor(image: np.ndarray, model_w: int, model_h: int) -> np.ndarray:
"""
Resize an image with unchanged aspect ratio using padding.
Assumes input image shape is (H, W, 3).
"""
if image.ndim == 4 and image.shape[0] == 1:
image = image[0]
h, w = image.shape[:2]
if (w, h) == (320, 320) and (model_w, model_h) == (640, 640):
return cv2.resize(image, (model_w, model_h), interpolation=cv2.INTER_LINEAR)
scale = min(model_w / w, model_h / h)
new_w, new_h = int(w * scale), int(h * scale)
resized_image = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_CUBIC)
padded_image = np.full((model_h, model_w, 3), 114, dtype=image.dtype)
x_offset = (model_w - new_w) // 2
y_offset = (model_h - new_h) // 2
padded_image[y_offset : y_offset + new_h, x_offset : x_offset + new_w] = (
resized_image
)
return padded_image
# ----------------- Global Constants ----------------- #
DETECTOR_KEY = "hailo8l"
ARCH = None
H8_DEFAULT_MODEL = "yolov6n.hef"
H8L_DEFAULT_MODEL = "yolov6n.hef"
H8_DEFAULT_URL = "https://hailo-model-zoo.s3.eu-west-2.amazonaws.com/ModelZoo/Compiled/v2.14.0/hailo8/yolov6n.hef"
H8L_DEFAULT_URL = "https://hailo-model-zoo.s3.eu-west-2.amazonaws.com/ModelZoo/Compiled/v2.14.0/hailo8l/yolov6n.hef"
# Configuration class for model settings
class ModelConfig(BaseModel):
path: str = Field(default=None, title="Model Path") # Path to the HEF file
def detect_hailo_arch():
try:
result = subprocess.run(
["hailortcli", "fw-control", "identify"], capture_output=True, text=True
)
if result.returncode != 0:
logger.error(f"Inference error: {result.stderr}")
return None
for line in result.stdout.split("\n"):
if "Device Architecture" in line:
if "HAILO8L" in line:
return "hailo8l"
elif "HAILO8" in line:
return "hailo8"
logger.error("Inference error: Could not determine Hailo architecture.")
return None
except Exception as e:
logger.error(f"Inference error: {e}")
return None
# Configuration class for Hailo detector
class HailoDetectorConfig(BaseDetectorConfig):
type: Literal[DETECTOR_KEY] # Type of the detector
device: str = Field(default="PCIe", title="Device Type") # Device type (e.g., PCIe)
# ----------------- HailoAsyncInference Class ----------------- #
class HailoAsyncInference:
def __init__(
self,
hef_path: str,
input_queue: queue.Queue,
output_store: ResponseStore,
batch_size: int = 1,
input_type: Optional[str] = None,
output_type: Optional[Dict[str, str]] = None,
send_original_frame: bool = False,
) -> None:
self.input_queue = input_queue
self.output_store = output_store
params = VDevice.create_params()
params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN
self.hef = HEF(hef_path)
self.target = VDevice(params)
self.infer_model = self.target.create_infer_model(hef_path)
self.infer_model.set_batch_size(batch_size)
if input_type is not None:
self._set_input_type(input_type)
if output_type is not None:
self._set_output_type(output_type)
self.output_type = output_type
self.send_original_frame = send_original_frame
def _set_input_type(self, input_type: Optional[str] = None) -> None:
self.infer_model.input().set_format_type(getattr(FormatType, input_type))
def _set_output_type(
self, output_type_dict: Optional[Dict[str, str]] = None
) -> None:
for output_name, output_type in output_type_dict.items():
self.infer_model.output(output_name).set_format_type(
getattr(FormatType, output_type)
)
def callback(
self,
completion_info,
bindings_list: List,
input_batch: List,
request_ids: List[int],
):
if completion_info.exception:
logger.error(f"Inference error: {completion_info.exception}")
else:
for i, bindings in enumerate(bindings_list):
if len(bindings._output_names) == 1:
result = bindings.output().get_buffer()
else:
result = {
name: np.expand_dims(bindings.output(name).get_buffer(), axis=0)
for name in bindings._output_names
}
self.output_store.put(request_ids[i], (input_batch[i], result))
def _create_bindings(self, configured_infer_model) -> object:
if self.output_type is None:
output_buffers = {
output_info.name: np.empty(
self.infer_model.output(output_info.name).shape,
dtype=getattr(
np, str(output_info.format.type).split(".")[1].lower()
),
)
for output_info in self.hef.get_output_vstream_infos()
}
else:
output_buffers = {
name: np.empty(
self.infer_model.output(name).shape,
dtype=getattr(np, self.output_type[name].lower()),
)
for name in self.output_type
}
return configured_infer_model.create_bindings(output_buffers=output_buffers)
def get_input_shape(self) -> Tuple[int, ...]:
return self.hef.get_input_vstream_infos()[0].shape
def run(self) -> None:
with self.infer_model.configure() as configured_infer_model:
while True:
batch_data = self.input_queue.get()
if batch_data is None:
break
request_id, frame_data = batch_data
preprocessed_batch = [frame_data]
request_ids = [request_id]
input_batch = preprocessed_batch # non-send_original_frame mode
bindings_list = []
for frame in preprocessed_batch:
bindings = self._create_bindings(configured_infer_model)
bindings.input().set_buffer(np.array(frame))
bindings_list.append(bindings)
configured_infer_model.wait_for_async_ready(timeout_ms=10000)
job = configured_infer_model.run_async(
bindings_list,
partial(
self.callback,
input_batch=input_batch,
request_ids=request_ids,
bindings_list=bindings_list,
),
)
job.wait(100)
# Hailo detector class implementation
# ----------------- HailoDetector Class ----------------- #
class HailoDetector(DetectionApi):
type_key = DETECTOR_KEY # Set the type key to the Hailo detector key
type_key = DETECTOR_KEY
def __init__(self, detector_config: HailoDetectorConfig):
# Initialize device type and model path from the configuration
self.h8l_device_type = detector_config.device
self.h8l_model_path = detector_config.model.path
self.h8l_model_height = detector_config.model.height
self.h8l_model_width = detector_config.model.width
self.h8l_model_type = detector_config.model.model_type
self.h8l_tensor_format = detector_config.model.input_tensor
self.h8l_pixel_format = detector_config.model.input_pixel_format
self.model_url = "https://hailo-model-zoo.s3.eu-west-2.amazonaws.com/ModelZoo/Compiled/v2.11.0/hailo8l/ssd_mobilenet_v1.hef"
self.cache_dir = os.path.join(MODEL_CACHE_DIR, "h8l_cache")
self.expected_model_filename = "ssd_mobilenet_v1.hef"
output_type = "FLOAT32"
def __init__(self, detector_config: "HailoDetectorConfig"):
global ARCH
ARCH = detect_hailo_arch()
self.cache_dir = MODEL_CACHE_DIR
self.device_type = detector_config.device
self.model_height = (
detector_config.model.height
if hasattr(detector_config.model, "height")
else None
)
self.model_width = (
detector_config.model.width
if hasattr(detector_config.model, "width")
else None
)
self.model_type = (
detector_config.model.model_type
if hasattr(detector_config.model, "model_type")
else None
)
self.tensor_format = (
detector_config.model.input_tensor
if hasattr(detector_config.model, "input_tensor")
else None
)
self.pixel_format = (
detector_config.model.input_pixel_format
if hasattr(detector_config.model, "input_pixel_format")
else None
)
self.input_dtype = (
detector_config.model.input_dtype
if hasattr(detector_config.model, "input_dtype")
else None
)
self.output_type = "FLOAT32"
self.set_path_and_url(detector_config.model.path)
self.working_model_path = self.check_and_prepare()
self.batch_size = 1
self.input_queue = queue.Queue()
self.response_store = ResponseStore()
self.request_counter = 0
self.request_counter_lock = threading.Lock()
logger.info(f"Initializing Hailo device as {self.h8l_device_type}")
self.check_and_prepare_model()
try:
# Validate device type
if self.h8l_device_type not in ["PCIe", "M.2"]:
raise ValueError(f"Unsupported device type: {self.h8l_device_type}")
# Initialize the Hailo device
self.target = VDevice()
# Load the HEF (Hailo's binary format for neural networks)
self.hef = HEF(self.h8l_model_path)
# Create configuration parameters from the HEF
self.configure_params = ConfigureParams.create_from_hef(
hef=self.hef, interface=HailoStreamInterface.PCIe
logger.debug(f"[INIT] Loading HEF model from {self.working_model_path}")
self.inference_engine = HailoAsyncInference(
self.working_model_path,
self.input_queue,
self.response_store,
self.batch_size,
)
# Configure the device with the HEF
self.network_groups = self.target.configure(self.hef, self.configure_params)
self.network_group = self.network_groups[0]
self.network_group_params = self.network_group.create_params()
# Create input and output virtual stream parameters
self.input_vstream_params = InputVStreamParams.make(
self.network_group,
format_type=self.hef.get_input_vstream_infos()[0].format.type,
self.input_shape = self.inference_engine.get_input_shape()
logger.debug(f"[INIT] Model input shape: {self.input_shape}")
self.inference_thread = threading.Thread(
target=self.inference_engine.run, daemon=True
)
self.output_vstream_params = OutputVStreamParams.make(
self.network_group, format_type=getattr(FormatType, output_type)
)
# Get input and output stream information from the HEF
self.input_vstream_info = self.hef.get_input_vstream_infos()
self.output_vstream_info = self.hef.get_output_vstream_infos()
logger.info("Hailo device initialized successfully")
logger.debug(f"[__init__] Model Path: {self.h8l_model_path}")
logger.debug(f"[__init__] Input Tensor Format: {self.h8l_tensor_format}")
logger.debug(f"[__init__] Input Pixel Format: {self.h8l_pixel_format}")
logger.debug(f"[__init__] Input VStream Info: {self.input_vstream_info[0]}")
logger.debug(
f"[__init__] Output VStream Info: {self.output_vstream_info[0]}"
)
except HailoRTException as e:
logger.error(f"HailoRTException during initialization: {e}")
raise
self.inference_thread.start()
except Exception as e:
logger.error(f"Failed to initialize Hailo device: {e}")
logger.error(f"[INIT] Failed to initialize HailoAsyncInference: {e}")
raise
def check_and_prepare_model(self):
# Ensure cache directory exists
def set_path_and_url(self, path: str = None):
if not path:
self.model_path = None
self.url = None
return
if self.is_url(path):
self.url = path
self.model_path = None
else:
self.model_path = path
self.url = None
def is_url(self, url: str) -> bool:
return (
url.startswith("http://")
or url.startswith("https://")
or url.startswith("www.")
)
@staticmethod
def extract_model_name(path: str = None, url: str = None) -> str:
if path and path.endswith(".hef"):
return os.path.basename(path)
elif url and url.endswith(".hef"):
return os.path.basename(url)
else:
if ARCH == "hailo8":
return H8_DEFAULT_MODEL
else:
return H8L_DEFAULT_MODEL
@staticmethod
def download_model(url: str, destination: str):
if not url.endswith(".hef"):
raise ValueError("Invalid model URL. Only .hef files are supported.")
try:
urllib.request.urlretrieve(url, destination)
logger.debug(f"Downloaded model to {destination}")
except Exception as e:
raise RuntimeError(f"Failed to download model from {url}: {str(e)}")
def check_and_prepare(self) -> str:
if not os.path.exists(self.cache_dir):
os.makedirs(self.cache_dir)
model_name = self.extract_model_name(self.model_path, self.url)
cached_model_path = os.path.join(self.cache_dir, model_name)
if not self.model_path and not self.url:
if os.path.exists(cached_model_path):
logger.debug(f"Model found in cache: {cached_model_path}")
return cached_model_path
else:
logger.debug(f"Downloading default model: {model_name}")
if ARCH == "hailo8":
self.download_model(H8_DEFAULT_URL, cached_model_path)
else:
self.download_model(H8L_DEFAULT_URL, cached_model_path)
elif self.url:
logger.debug(f"Downloading model from URL: {self.url}")
self.download_model(self.url, cached_model_path)
elif self.model_path:
if os.path.exists(self.model_path):
logger.debug(f"Using existing model at: {self.model_path}")
return self.model_path
else:
raise FileNotFoundError(f"Model file not found at: {self.model_path}")
return cached_model_path
# Check for the expected model file
model_file_path = os.path.join(self.cache_dir, self.expected_model_filename)
if not os.path.isfile(model_file_path):
logger.info(
f"A model file was not found at {model_file_path}, Downloading one from {self.model_url}."
)
urllib.request.urlretrieve(self.model_url, model_file_path)
logger.info(f"A model file was downloaded to {model_file_path}.")
else:
logger.info(
f"A model file already exists at {model_file_path} not downloading one."
)
def _get_request_id(self) -> int:
with self.request_counter_lock:
request_id = self.request_counter
self.request_counter += 1
if self.request_counter > 1000000:
self.request_counter = 0
return request_id
def detect_raw(self, tensor_input):
    logger.debug("[detect_raw] Entering function")
    logger.debug(
        f"[detect_raw] The `tensor_input` = {tensor_input} tensor_input shape = {tensor_input.shape}"
    )
    if tensor_input is None:
        raise ValueError(
            "[detect_raw] The 'tensor_input' argument must be provided"
        )
    # Ensure tensor_input is a numpy array
    if isinstance(tensor_input, list):
        tensor_input = np.array(tensor_input)
        logger.debug(
            f"[detect_raw] Converted tensor_input to numpy array: shape {tensor_input.shape}"
        )
    logger.debug(
        f"[detect_raw] Input data for inference shape: {tensor_input.shape}, dtype: {tensor_input.dtype}"
    )
    # Preprocess and batch the frame, then hand it to the async inference worker
    tensor_input = self.preprocess(tensor_input)
    if isinstance(tensor_input, np.ndarray) and len(tensor_input.shape) == 3:
        tensor_input = np.expand_dims(tensor_input, axis=0)
    request_id = self._get_request_id()
    self.input_queue.put((request_id, tensor_input))
    try:
        # Block until the worker publishes results for this request id
        original_input, infer_results = self.response_store.get(
            request_id, timeout=10.0
        )
        if isinstance(infer_results, list) and len(infer_results) == 1:
            infer_results = infer_results[0]
        # Process the raw output into the fixed (20, 6) detection array
        detections = self.process_detections(infer_results)
        if len(detections) == 0:
            logger.debug(
                "[detect_raw] No detections found after processing. Setting default values."
            )
            return np.zeros((20, 6), np.float32)
        if detections.shape[1] != 6:
            # Ensure the formatted detections have 6 columns
            logger.error(
                f"[detect_raw] Unexpected shape for formatted detections: {detections.shape}. Expected (20, 6)."
            )
            return np.zeros((20, 6), np.float32)
        return detections
    except TimeoutError:
        logger.error(
            f"Timeout waiting for inference results for request {request_id}"
        )
        return np.zeros((20, 6), dtype=np.float32)
    except HailoRTException as e:
        logger.error(f"[detect_raw] HailoRTException during inference: {e}")
        return np.zeros((20, 6), np.float32)
    except Exception as e:
        logger.error(f"[detect_raw] Exception during inference: {e}")
        return np.zeros((20, 6), np.float32)
    finally:
        logger.debug("[detect_raw] Exiting function")
def process_detections(self, raw_detections, threshold=0.4):
    """Convert raw NMS output into a fixed (20, 6) array of
    [class_id, score, y_min, x_min, y_max, x_max] rows."""
    logger.debug(f"[process_detections] Raw detections: {raw_detections}")
    all_detections = []
    for class_id, detection_set in enumerate(raw_detections):
        if not isinstance(detection_set, np.ndarray) or detection_set.size == 0:
            logger.debug(
                f"[process_detections] Detection set {class_id} is empty or not an array, skipping."
            )
            continue
        logger.debug(
            f"[process_detections] Detection set {class_id} shape: {detection_set.shape}"
        )
        for det in detection_set:
            if det.shape[0] < 5:
                continue
            ymin, xmin, ymax, xmax = det[:4]
            score = float(np.clip(det[4], 0, 1))
            if score < threshold:
                logger.debug(
                    f"[process_detections] Detection in set {class_id} has a score {score} below threshold {threshold}. Skipping."
                )
                continue
            logger.debug(
                f"[process_detections] Adding detection with coordinates: ({xmin}, {ymin}), ({xmax}, {ymax}) and score: {score}"
            )
            all_detections.append([class_id, score, ymin, xmin, ymax, xmax])
    if len(all_detections) == 0:
        logger.debug("[process_detections] No valid detections found.")
        return np.zeros((20, 6), dtype=np.float32)
    detections_array = np.array(all_detections, dtype=np.float32)
    # Pad or truncate to exactly 20 rows
    if detections_array.shape[0] > 20:
        detections_array = detections_array[:20, :]
    elif detections_array.shape[0] < 20:
        pad = np.zeros((20 - detections_array.shape[0], 6), dtype=np.float32)
        detections_array = np.vstack((detections_array, pad))
    logger.debug(
        f"[process_detections] Returning detections with shape: {detections_array.shape}"
    )
    return detections_array
def preprocess(self, image):
    if isinstance(image, np.ndarray):
        processed = preprocess_tensor(
            image, self.input_shape[1], self.input_shape[0]
        )
        return np.expand_dims(processed, axis=0)
    else:
        raise ValueError("Unsupported image format for preprocessing")
def close(self):
    """Properly shuts down the inference engine and releases the VDevice."""
    logger.debug("[CLOSE] Closing HailoDetector")
    try:
        if hasattr(self, "inference_engine"):
            if hasattr(self.inference_engine, "target"):
                self.inference_engine.target.release()
                logger.debug("Hailo VDevice released successfully")
    except Exception as e:
        logger.error(f"Failed to close Hailo device: {e}")
        raise
def __del__(self):
    """Destructor to ensure cleanup when the object is deleted."""
    self.close()
# ----------------- HailoDetectorConfig Class ----------------- #
class HailoDetectorConfig(BaseDetectorConfig):
type: Literal[DETECTOR_KEY]
device: str = Field(default="PCIe", title="Device Type")
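A note on the output contract, which is easy to miss in the code above: `detect_raw` returns a fixed `(20, 6)` array of `[class_id, score, y_min, x_min, y_max, x_max]` rows, zero-padded when fewer than 20 objects pass the score threshold. A minimal caller-side sketch, assuming a constructed detector instance named `detector` and a preprocessed `frame` (both hypothetical names, not part of this commit):

```python
import numpy as np

# Hypothetical caller; `detector` is an instance of the detector defined above
# and `frame` is an input image/tensor it accepts.
detections = detector.detect_raw(frame)
assert detections.shape == (20, 6)

for class_id, score, y_min, x_min, y_max, x_max in detections:
    if score == 0:
        continue  # zero-padded row, no detection
    print(f"class={int(class_id)} score={score:.2f} box=({x_min}, {y_min}, {x_max}, {y_max})")
```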

View File

@@ -2,17 +2,22 @@
import datetime
import logging
import random
import string
import threading
import time
from typing import Tuple
import numpy as np
import requests
import frigate.util as util
from frigate.camera import CameraMetrics
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.event_metadata_updater import (
EventMetadataPublisher,
EventMetadataTypeEnum,
)
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import CameraConfig, CameraInput, FfmpegConfig
from frigate.const import (
@@ -21,7 +26,6 @@ from frigate.const import (
AUDIO_MAX_BIT_RANGE,
AUDIO_MIN_CONFIDENCE,
AUDIO_SAMPLE_RATE,
FRIGATE_LOCALHOST,
)
from frigate.ffmpeg_presets import parse_preset_input
from frigate.log import LogPipe
@@ -139,6 +143,7 @@ class AudioEventMaintainer(threading.Thread):
f"config/enabled/{camera.name}", True
)
self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio)
self.event_metadata_publisher = EventMetadataPublisher()
self.was_enabled = camera.enabled
@@ -207,24 +212,33 @@
datetime.datetime.now().timestamp()
)
else:
now = datetime.datetime.now().timestamp()
rand_id = "".join(
random.choices(string.ascii_lowercase + string.digits, k=6)
)
event_id = f"{now}-{rand_id}"
self.requestor.send_data(f"{self.config.name}/audio/{label}", "ON")
resp = requests.post(
f"{FRIGATE_LOCALHOST}/api/events/{self.config.name}/{label}/create",
json={"duration": None, "score": score, "source_type": "audio"},
self.event_metadata_publisher.publish(
EventMetadataTypeEnum.manual_event_create,
(
now,
self.config.name,
label,
event_id,
True,
score,
None,
None,
"audio",
{},
),
)
if resp.status_code == 200:
event_id = resp.json()["event_id"]
self.detections[label] = {
"id": event_id,
"label": label,
"last_detection": datetime.datetime.now().timestamp(),
}
else:
self.logger.warning(
f"Failed to create audio event with status code {resp.status_code}"
)
self.detections[label] = {
"id": event_id,
"label": label,
"last_detection": now,
}
def expire_detections(self) -> None:
now = datetime.datetime.now().timestamp()
@@ -241,17 +255,11 @@ class AudioEventMaintainer(threading.Thread):
f"{self.config.name}/audio/{detection['label']}", "OFF"
)
resp = requests.put(
f"{FRIGATE_LOCALHOST}/api/events/{detection['id']}/end",
json={"end_time": detection["last_detection"]},
self.event_metadata_publisher.publish(
EventMetadataTypeEnum.manual_event_end,
(detection["id"], detection["last_detection"]),
)
if resp.status_code == 200:
self.detections[detection["label"]] = None
else:
self.logger.warning(
f"Failed to end audio event {detection['id']} with status code {resp.status_code}"
)
self.detections[detection["label"]] = None
def expire_all_detections(self) -> None:
"""Immediately end all current detections"""
@@ -259,16 +267,11 @@
for label, detection in list(self.detections.items()):
if detection:
self.requestor.send_data(f"{self.config.name}/audio/{label}", "OFF")
resp = requests.put(
f"{FRIGATE_LOCALHOST}/api/events/{detection['id']}/end",
json={"end_time": now},
self.event_metadata_publisher.publish(
EventMetadataTypeEnum.manual_event_end,
(detection["id"], now),
)
if resp.status_code == 200:
self.detections[label] = None
else:
self.logger.warning(
f"Failed to end audio event {detection['id']} with status code {resp.status_code}"
)
self.detections[label] = None
def start_or_restart_ffmpeg(self) -> None:
self.audio_listener = start_or_restart_ffmpeg(

View File

@@ -1,187 +0,0 @@
"""Handle external events created by the user."""
import datetime
import logging
import os
import random
import string
from enum import Enum
from typing import Optional
import cv2
from numpy import ndarray
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.events_updater import EventUpdatePublisher
from frigate.config import CameraConfig, FrigateConfig
from frigate.const import CLIPS_DIR, THUMB_DIR
from frigate.events.types import EventStateEnum, EventTypeEnum
from frigate.util.image import draw_box_with_label
logger = logging.getLogger(__name__)
class ManualEventState(str, Enum):
complete = "complete"
start = "start"
end = "end"
class ExternalEventProcessor:
def __init__(self, config: FrigateConfig) -> None:
self.config = config
self.default_thumbnail = None
self.event_sender = EventUpdatePublisher()
self.detection_updater = DetectionPublisher(DetectionTypeEnum.api)
self.event_camera = {}
def create_manual_event(
self,
camera: str,
label: str,
source_type: str,
sub_label: Optional[str],
score: int,
duration: Optional[int],
include_recording: bool,
draw: dict[str, any],
snapshot_frame: Optional[ndarray],
) -> str:
now = datetime.datetime.now().timestamp()
camera_config = self.config.cameras.get(camera)
# create event id and start frame time
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
event_id = f"{now}-{rand_id}"
self._write_images(camera_config, label, event_id, draw, snapshot_frame)
end = now + duration if duration is not None else None
self.event_sender.publish(
(
EventTypeEnum.api,
EventStateEnum.start,
camera,
"",
{
"id": event_id,
"label": label,
"sub_label": sub_label,
"score": score,
"camera": camera,
"start_time": now - camera_config.record.event_pre_capture,
"end_time": end,
"has_clip": camera_config.record.enabled and include_recording,
"has_snapshot": True,
"type": source_type,
},
)
)
if source_type == "api":
self.event_camera[event_id] = camera
self.detection_updater.publish(
(
camera,
now,
{
"state": (
ManualEventState.complete if end else ManualEventState.start
),
"label": f"{label}: {sub_label}" if sub_label else label,
"event_id": event_id,
"end_time": end,
},
)
)
return event_id
def finish_manual_event(self, event_id: str, end_time: float) -> None:
"""Finish external event with indeterminate duration."""
self.event_sender.publish(
(
EventTypeEnum.api,
EventStateEnum.end,
None,
"",
{"id": event_id, "end_time": end_time},
)
)
if event_id in self.event_camera:
self.detection_updater.publish(
(
self.event_camera[event_id],
end_time,
{
"state": ManualEventState.end,
"event_id": event_id,
"end_time": end_time,
},
)
)
self.event_camera.pop(event_id)
def _write_images(
self,
camera_config: CameraConfig,
label: str,
event_id: str,
draw: dict[str, any],
img_frame: Optional[ndarray],
) -> None:
if img_frame is None:
return
# write clean snapshot if enabled
if camera_config.snapshots.clean_copy:
ret, png = cv2.imencode(".png", img_frame)
if ret:
with open(
os.path.join(
CLIPS_DIR,
f"{camera_config.name}-{event_id}-clean.png",
),
"wb",
) as p:
p.write(png.tobytes())
# write jpg snapshot with optional annotations
if draw.get("boxes") and isinstance(draw.get("boxes"), list):
for box in draw.get("boxes"):
x = int(box["box"][0] * camera_config.detect.width)
y = int(box["box"][1] * camera_config.detect.height)
width = int(box["box"][2] * camera_config.detect.width)
height = int(box["box"][3] * camera_config.detect.height)
draw_box_with_label(
img_frame,
x,
y,
x + width,
y + height,
label,
f"{box.get('score', '-')}% {int(width * height)}",
thickness=2,
color=box.get("color", (255, 0, 0)),
)
ret, jpg = cv2.imencode(".jpg", img_frame)
with open(
os.path.join(CLIPS_DIR, f"{camera_config.name}-{event_id}.jpg"),
"wb",
) as j:
j.write(jpg.tobytes())
# create thumbnail with max height of 175 and save
width = int(175 * img_frame.shape[1] / img_frame.shape[0])
thumb = cv2.resize(img_frame, dsize=(width, 175), interpolation=cv2.INTER_AREA)
cv2.imwrite(
os.path.join(THUMB_DIR, camera_config.name, f"{event_id}.webp"), thumb
)
def stop(self):
self.event_sender.stop()
self.detection_updater.stop()

View File

@@ -22,7 +22,7 @@ from frigate.ffmpeg_presets import (
parse_preset_hardware_acceleration_encode,
)
from frigate.models import Previews
from frigate.object_processing import TrackedObject
from frigate.track.object_processing import TrackedObject
from frigate.util.image import copy_yuv_to_position, get_blank_yuv_frame, get_yuv_crop
logger = logging.getLogger(__name__)

View File

@@ -23,10 +23,9 @@ from frigate.const import (
CLIPS_DIR,
UPSERT_REVIEW_SEGMENT,
)
from frigate.events.external import ManualEventState
from frigate.models import ReviewSegment
from frigate.object_processing import TrackedObject
from frigate.review.types import SeverityEnum
from frigate.track.object_processing import ManualEventState, TrackedObject
from frigate.util.image import SharedMemoryFrameManager, calculate_16_9_crop
logger = logging.getLogger(__name__)

View File

@@ -117,7 +117,6 @@ class BaseTestHttp(unittest.TestCase):
None,
None,
None,
None,
stats,
None,
)

View File

@@ -122,7 +122,6 @@ class TestHttp(unittest.TestCase):
None,
None,
None,
None,
)
id = "123456.random"
@@ -144,7 +143,6 @@
None,
None,
None,
None,
)
id = "123456.random"
bad_id = "654321.other"
@@ -165,7 +163,6 @@
None,
None,
None,
None,
)
id = "123456.random"
@@ -188,7 +185,6 @@
None,
None,
None,
None,
)
id = "123456.random"
@@ -215,7 +211,6 @@
None,
None,
None,
None,
)
morning_id = "123456.random"
evening_id = "654321.random"
@@ -254,7 +249,6 @@
None,
None,
None,
None,
mock_event_updater,
)
id = "123456.random"
@@ -300,7 +294,6 @@
None,
None,
None,
None,
mock_event_updater,
)
id = "123456.random"
@@ -334,7 +327,6 @@
None,
None,
None,
None,
)
with TestClient(app) as client:
@@ -352,7 +344,6 @@
None,
None,
None,
None,
)
id = "123456.random"

View File

@@ -4,13 +4,13 @@ import logging
import queue
import threading
from collections import defaultdict
from enum import Enum
from multiprocessing.synchronize import Event as MpEvent
from typing import Callable, Optional
import cv2
import numpy as np
from peewee import DoesNotExist
from frigate.camera.state import CameraState
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.dispatcher import Dispatcher
@@ -25,406 +25,20 @@ from frigate.config import (
FrigateConfig,
RecordConfig,
SnapshotsConfig,
ZoomingModeEnum,
)
from frigate.const import UPDATE_CAMERA_ACTIVITY
from frigate.events.types import EventStateEnum, EventTypeEnum
from frigate.models import Event, Timeline
from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.track.tracked_object import TrackedObject
from frigate.util.image import (
SharedMemoryFrameManager,
draw_box_with_label,
draw_timestamp,
is_better_thumbnail,
is_label_printable,
)
from frigate.util.image import SharedMemoryFrameManager
logger = logging.getLogger(__name__)
# Maintains the state of a camera
class CameraState:
def __init__(
self,
name,
config: FrigateConfig,
frame_manager: SharedMemoryFrameManager,
ptz_autotracker_thread: PtzAutoTrackerThread,
):
self.name = name
self.config = config
self.camera_config = config.cameras[name]
self.frame_manager = frame_manager
self.best_objects: dict[str, TrackedObject] = {}
self.tracked_objects: dict[str, TrackedObject] = {}
self.frame_cache = {}
self.zone_objects = defaultdict(list)
self._current_frame = np.zeros(self.camera_config.frame_shape_yuv, np.uint8)
self.current_frame_lock = threading.Lock()
self.current_frame_time = 0.0
self.motion_boxes = []
self.regions = []
self.previous_frame_id = None
self.callbacks = defaultdict(list)
self.ptz_autotracker_thread = ptz_autotracker_thread
self.prev_enabled = self.camera_config.enabled
def get_current_frame(self, draw_options={}):
with self.current_frame_lock:
frame_copy = np.copy(self._current_frame)
frame_time = self.current_frame_time
tracked_objects = {k: v.to_dict() for k, v in self.tracked_objects.items()}
motion_boxes = self.motion_boxes.copy()
regions = self.regions.copy()
frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_YUV2BGR_I420)
# draw on the frame
if draw_options.get("mask"):
mask_overlay = np.where(self.camera_config.motion.mask == [0])
frame_copy[mask_overlay] = [0, 0, 0]
if draw_options.get("bounding_boxes"):
# draw the bounding boxes on the frame
for obj in tracked_objects.values():
if obj["frame_time"] == frame_time:
if obj["stationary"]:
color = (220, 220, 220)
thickness = 1
else:
thickness = 2
color = self.config.model.colormap[obj["label"]]
else:
thickness = 1
color = (255, 0, 0)
# draw thicker box around ptz autotracked object
if (
self.camera_config.onvif.autotracking.enabled
and self.ptz_autotracker_thread.ptz_autotracker.autotracker_init[
self.name
]
and self.ptz_autotracker_thread.ptz_autotracker.tracked_object[
self.name
]
is not None
and obj["id"]
== self.ptz_autotracker_thread.ptz_autotracker.tracked_object[
self.name
].obj_data["id"]
and obj["frame_time"] == frame_time
):
thickness = 5
color = self.config.model.colormap[obj["label"]]
# debug autotracking zooming - show the zoom factor box
if (
self.camera_config.onvif.autotracking.zooming
!= ZoomingModeEnum.disabled
):
max_target_box = self.ptz_autotracker_thread.ptz_autotracker.tracked_object_metrics[
self.name
]["max_target_box"]
side_length = max_target_box * (
max(
self.camera_config.detect.width,
self.camera_config.detect.height,
)
)
centroid_x = (obj["box"][0] + obj["box"][2]) // 2
centroid_y = (obj["box"][1] + obj["box"][3]) // 2
top_left = (
int(centroid_x - side_length // 2),
int(centroid_y - side_length // 2),
)
bottom_right = (
int(centroid_x + side_length // 2),
int(centroid_y + side_length // 2),
)
cv2.rectangle(
frame_copy,
top_left,
bottom_right,
(255, 255, 0),
2,
)
# draw the bounding boxes on the frame
box = obj["box"]
text = (
obj["label"]
if (
not obj.get("sub_label")
or not is_label_printable(obj["sub_label"][0])
)
else obj["sub_label"][0]
)
draw_box_with_label(
frame_copy,
box[0],
box[1],
box[2],
box[3],
text,
f"{obj['score']:.0%} {int(obj['area'])}"
+ (
f" {float(obj['current_estimated_speed']):.1f}"
if obj["current_estimated_speed"] != 0
else ""
),
thickness=thickness,
color=color,
)
# draw any attributes
for attribute in obj["current_attributes"]:
box = attribute["box"]
draw_box_with_label(
frame_copy,
box[0],
box[1],
box[2],
box[3],
attribute["label"],
f"{attribute['score']:.0%}",
thickness=thickness,
color=color,
)
if draw_options.get("regions"):
for region in regions:
cv2.rectangle(
frame_copy,
(region[0], region[1]),
(region[2], region[3]),
(0, 255, 0),
2,
)
if draw_options.get("zones"):
for name, zone in self.camera_config.zones.items():
thickness = (
8
if any(
name in obj["current_zones"] for obj in tracked_objects.values()
)
else 2
)
cv2.drawContours(frame_copy, [zone.contour], -1, zone.color, thickness)
if draw_options.get("motion_boxes"):
for m_box in motion_boxes:
cv2.rectangle(
frame_copy,
(m_box[0], m_box[1]),
(m_box[2], m_box[3]),
(0, 0, 255),
2,
)
if draw_options.get("timestamp"):
color = self.camera_config.timestamp_style.color
draw_timestamp(
frame_copy,
frame_time,
self.camera_config.timestamp_style.format,
font_effect=self.camera_config.timestamp_style.effect,
font_thickness=self.camera_config.timestamp_style.thickness,
font_color=(color.blue, color.green, color.red),
position=self.camera_config.timestamp_style.position,
)
return frame_copy
def finished(self, obj_id):
del self.tracked_objects[obj_id]
def on(self, event_type: str, callback: Callable[[dict], None]):
self.callbacks[event_type].append(callback)
def update(
self,
frame_name: str,
frame_time: float,
current_detections: dict[str, dict[str, any]],
motion_boxes: list[tuple[int, int, int, int]],
regions: list[tuple[int, int, int, int]],
):
current_frame = self.frame_manager.get(
frame_name, self.camera_config.frame_shape_yuv
)
tracked_objects = self.tracked_objects.copy()
current_ids = set(current_detections.keys())
previous_ids = set(tracked_objects.keys())
removed_ids = previous_ids.difference(current_ids)
new_ids = current_ids.difference(previous_ids)
updated_ids = current_ids.intersection(previous_ids)
for id in new_ids:
new_obj = tracked_objects[id] = TrackedObject(
self.config.model,
self.camera_config,
self.config.ui,
self.frame_cache,
current_detections[id],
)
# call event handlers
for c in self.callbacks["start"]:
c(self.name, new_obj, frame_name)
for id in updated_ids:
updated_obj = tracked_objects[id]
thumb_update, significant_update, autotracker_update = updated_obj.update(
frame_time, current_detections[id], current_frame is not None
)
if autotracker_update or significant_update:
for c in self.callbacks["autotrack"]:
c(self.name, updated_obj, frame_name)
if thumb_update and current_frame is not None:
# ensure this frame is stored in the cache
if (
updated_obj.thumbnail_data["frame_time"] == frame_time
and frame_time not in self.frame_cache
):
self.frame_cache[frame_time] = np.copy(current_frame)
updated_obj.last_updated = frame_time
# if it has been more than 5 seconds since the last thumb update
# and the last update is greater than the last publish or
# the object has changed significantly
if (
frame_time - updated_obj.last_published > 5
and updated_obj.last_updated > updated_obj.last_published
) or significant_update:
# call event handlers
for c in self.callbacks["update"]:
c(self.name, updated_obj, frame_name)
updated_obj.last_published = frame_time
for id in removed_ids:
# publish events to mqtt
removed_obj = tracked_objects[id]
if "end_time" not in removed_obj.obj_data:
removed_obj.obj_data["end_time"] = frame_time
for c in self.callbacks["end"]:
c(self.name, removed_obj, frame_name)
# TODO: can i switch to looking this up and only changing when an event ends?
# maintain best objects
camera_activity: dict[str, list[any]] = {
"enabled": True,
"motion": len(motion_boxes) > 0,
"objects": [],
}
for obj in tracked_objects.values():
object_type = obj.obj_data["label"]
active = obj.is_active()
if not obj.false_positive:
label = object_type
sub_label = None
if obj.obj_data.get("sub_label"):
if (
obj.obj_data.get("sub_label")[0]
in self.config.model.all_attributes
):
label = obj.obj_data["sub_label"][0]
else:
label = f"{object_type}-verified"
sub_label = obj.obj_data["sub_label"][0]
camera_activity["objects"].append(
{
"id": obj.obj_data["id"],
"label": label,
"stationary": not active,
"area": obj.obj_data["area"],
"ratio": obj.obj_data["ratio"],
"score": obj.obj_data["score"],
"sub_label": sub_label,
"current_zones": obj.current_zones,
}
)
# if we don't have access to the current frame or
# if the object's thumbnail is not from the current frame, skip
if (
current_frame is None
or obj.thumbnail_data is None
or obj.false_positive
or obj.thumbnail_data["frame_time"] != frame_time
):
continue
if object_type in self.best_objects:
current_best = self.best_objects[object_type]
now = datetime.datetime.now().timestamp()
# if the object is a higher score than the current best score
# or the current object is older than desired, use the new object
if (
is_better_thumbnail(
object_type,
current_best.thumbnail_data,
obj.thumbnail_data,
self.camera_config.frame_shape,
)
or (now - current_best.thumbnail_data["frame_time"])
> self.camera_config.best_image_timeout
):
self.best_objects[object_type] = obj
for c in self.callbacks["snapshot"]:
c(self.name, self.best_objects[object_type], frame_name)
else:
self.best_objects[object_type] = obj
for c in self.callbacks["snapshot"]:
c(self.name, self.best_objects[object_type], frame_name)
for c in self.callbacks["camera_activity"]:
c(self.name, camera_activity)
# cleanup thumbnail frame cache
current_thumb_frames = {
obj.thumbnail_data["frame_time"]
for obj in tracked_objects.values()
if not obj.false_positive and obj.thumbnail_data is not None
}
current_best_frames = {
obj.thumbnail_data["frame_time"] for obj in self.best_objects.values()
}
thumb_frames_to_delete = [
t
for t in self.frame_cache.keys()
if t not in current_thumb_frames and t not in current_best_frames
]
for t in thumb_frames_to_delete:
del self.frame_cache[t]
with self.current_frame_lock:
self.tracked_objects = tracked_objects
self.motion_boxes = motion_boxes
self.regions = regions
if current_frame is not None:
self.current_frame_time = frame_time
self._current_frame = np.copy(current_frame)
if self.previous_frame_id is not None:
self.frame_manager.close(self.previous_frame_id)
self.previous_frame_id = frame_name
def shutdown(self) -> None:
for obj in self.tracked_objects.values():
if not obj.obj_data.get("end_time"):
obj.write_thumbnail_to_disk()
class ManualEventState(str, Enum):
complete = "complete"
start = "start"
end = "end"
class TrackedObjectProcessor(threading.Thread):
@@ -449,14 +63,13 @@ class TrackedObjectProcessor(threading.Thread):
self.config_enabled_subscriber = ConfigSubscriber("config/enabled/")
self.requestor = InterProcessRequestor()
self.detection_publisher = DetectionPublisher(DetectionTypeEnum.video)
self.detection_publisher = DetectionPublisher(DetectionTypeEnum.all)
self.event_sender = EventUpdatePublisher()
self.event_end_subscriber = EventEndSubscriber()
self.sub_label_subscriber = EventMetadataSubscriber(
EventMetadataTypeEnum.sub_label
)
self.sub_label_subscriber = EventMetadataSubscriber(EventMetadataTypeEnum.all)
self.camera_activity: dict[str, dict[str, any]] = {}
self.ongoing_manual_events: dict[str, str] = {}
# {
# 'zone_name': {
@@ -677,7 +290,7 @@
def get_current_frame(
self, camera: str, draw_options: dict[str, any] = {}
) -> Optional[np.ndarray]:
) -> np.ndarray | None:
if camera == "birdseye":
return self.frame_manager.get(
"birdseye",
@@ -733,6 +346,96 @@
return True
def create_manual_event(self, payload: tuple) -> None:
(
frame_time,
camera_name,
label,
event_id,
include_recording,
score,
sub_label,
duration,
source_type,
draw,
) = payload
# save the snapshot image
self.camera_states[camera_name].save_manual_event_image(event_id, label, draw)
end_time = frame_time + duration if duration is not None else None
# send event to event maintainer
self.event_sender.publish(
(
EventTypeEnum.api,
EventStateEnum.start,
camera_name,
"",
{
"id": event_id,
"label": label,
"sub_label": sub_label,
"score": score,
"camera": camera_name,
"start_time": frame_time
- self.config.cameras[camera_name].record.event_pre_capture,
"end_time": end_time,
"has_clip": self.config.cameras[camera_name].record.enabled
and include_recording,
"has_snapshot": True,
"type": source_type,
},
)
)
if source_type == "api":
self.ongoing_manual_events[event_id] = camera_name
self.detection_publisher.publish(
(
camera_name,
frame_time,
{
"state": (
ManualEventState.complete
if end_time
else ManualEventState.start
),
"label": f"{label}: {sub_label}" if sub_label else label,
"event_id": event_id,
"end_time": end_time,
},
),
DetectionTypeEnum.api.value,
)
def end_manual_event(self, payload: tuple) -> None:
(event_id, end_time) = payload
self.event_sender.publish(
(
EventTypeEnum.api,
EventStateEnum.end,
None,
"",
{"id": event_id, "end_time": end_time},
)
)
if event_id in self.ongoing_manual_events:
self.detection_publisher.publish(
(
self.ongoing_manual_events[event_id],
end_time,
{
"state": ManualEventState.end,
"event_id": event_id,
"end_time": end_time,
},
),
DetectionTypeEnum.api.value,
)
self.ongoing_manual_events.pop(event_id)
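# Producer-side sketch (illustrative only; event ids and values are assumed,
# not part of this diff): a caller opens an open-ended manual event and later
# ends it over the same metadata channel handled by the two methods above.
#   publisher = EventMetadataPublisher()
#   publisher.publish(
#       EventMetadataTypeEnum.manual_event_create,
#       (now, "front_door", "person", event_id, True, 0.8, None, None, "api", {}),
#   )
#   publisher.publish(EventMetadataTypeEnum.manual_event_end, (event_id, end_time))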
def force_end_all_events(self, camera: str, camera_state: CameraState):
"""Ends all active events on camera when disabling."""
last_frame_name = camera_state.previous_frame_id
@@ -792,15 +495,22 @@
# check for sub label updates
while True:
(topic, payload) = self.sub_label_subscriber.check_for_update(
timeout=0.1
(raw_topic, payload) = self.sub_label_subscriber.check_for_update(
timeout=0
)
if not topic:
if not raw_topic:
break
(event_id, sub_label, score) = payload
self.set_sub_label(event_id, sub_label, score)
topic = str(raw_topic)
if topic.endswith(EventMetadataTypeEnum.sub_label.value):
(event_id, sub_label, score) = payload
self.set_sub_label(event_id, sub_label, score)
elif topic.endswith(EventMetadataTypeEnum.manual_event_create.value):
self.create_manual_event(payload)
elif topic.endswith(EventMetadataTypeEnum.manual_event_end.value):
self.end_manual_event(payload)
try:
(
@@ -839,7 +549,8 @@
tracked_objects,
motion_boxes,
regions,
)
),
DetectionTypeEnum.video.value,
)
# cleanup event finished queue

View File

@@ -606,23 +606,24 @@ def process_frames(
startup_scan = True
stationary_frame_counter = 0
camera_enabled = True
region_min_size = get_min_region_size(model_config)
prev_enabled = None
while not stop_event.is_set():
_, enabled_config = enabled_config_subscriber.check_for_update()
current_enabled = (
enabled_config.enabled
if enabled_config
else (prev_enabled if prev_enabled is not None else True)
)
if prev_enabled is None:
prev_enabled = current_enabled
_, updated_enabled_config = enabled_config_subscriber.check_for_update()
if prev_enabled and not current_enabled and camera_metrics.frame_queue.empty():
if updated_enabled_config:
prev_enabled = camera_enabled
camera_enabled = updated_enabled_config.enabled
if (
not camera_enabled
and prev_enabled != camera_enabled
and camera_metrics.frame_queue.empty()
):
logger.debug(f"Camera {camera_name} disabled, clearing tracked objects")
prev_enabled = camera_enabled
# Clear norfair's dictionaries
object_tracker.tracked_objects.clear()
@@ -638,9 +639,7 @@
for tracker in object_tracker.default_tracker.values():
tracker.tracked_objects = []
prev_enabled = current_enabled
if not current_enabled:
if not camera_enabled:
time.sleep(0.1)
continue

View File

@@ -15,8 +15,8 @@ sys.path.append("/workspace/frigate")
from frigate.config import FrigateConfig # noqa: E402
from frigate.motion import MotionDetector # noqa: E402
from frigate.object_detection import LocalObjectDetector # noqa: E402
from frigate.object_processing import CameraState # noqa: E402
from frigate.track.centroid_tracker import CentroidTracker # noqa: E402
from frigate.track.object_processing import CameraState # noqa: E402
from frigate.util import ( # noqa: E402
EventsPerSecond,
SharedMemoryFrameManager,

View File

@@ -301,22 +301,6 @@
player = <ActivityIndicator />;
}
// if (cameraConfig.name == "lpr")
// console.log(
// cameraConfig.name,
// "enabled",
// cameraEnabled,
// "prev enabled",
// prevCameraEnabledRef.current,
// "offline",
// offline,
// "show still",
// showStillWithoutActivity,
// "live ready",
// liveReady,
// player,
// );
return (
<div
ref={cameraRef ?? internalContainerRef}
@@ -378,7 +362,9 @@
{[
...new Set([
...(objects || []).map(({ label, sub_label }) =>
label.endsWith("verified") ? sub_label : label,
label.endsWith("verified")
? sub_label
: label.replaceAll("_", " "),
),
]),
]
@@ -411,7 +397,7 @@
/>
</div>
{offline && !showStillWithoutActivity && (
{offline && !showStillWithoutActivity && cameraEnabled && (
<div className="absolute inset-0 left-1/2 top-1/2 flex h-96 w-96 -translate-x-1/2 -translate-y-1/2">
<div className="flex flex-col items-center justify-center rounded-lg bg-background/50 p-5">
<p className="my-5 text-lg">Stream offline</p>

View File

@@ -155,15 +155,20 @@ export interface CameraConfig {
record: {
enabled: boolean;
enabled_in_config: boolean;
events: {
objects: string[] | null;
alerts: {
post_capture: number;
pre_capture: number;
required_zones: string[];
retain: {
default: number;
days: number;
mode: string;
};
};
detections: {
post_capture: number;
pre_capture: number;
retain: {
days: number;
mode: string;
objects: Record<string, unknown>;
};
};
expire_interval: number;

View File

@@ -1034,9 +1034,13 @@ function FrigateCameraFeatures({
setIsRecording(true);
const toastId = toast.success(
<div className="flex flex-col space-y-3">
<div className="font-semibold">{t("manualRecording.started")}</div>
{!camera.record.enabled || camera.record.retain.days == 0 ? (
<div>{t("manualRecording.recordDisabledTips")}</div>
<div className="font-semibold">
Started manual on-demand recording.
</div>
{!camera.record.enabled || camera.record.alerts.retain.days == 0 ? (
<div>
{t("manualRecording.recordDisabledTips")}
</div>
) : (
<OnDemandRetentionMessage camera={camera} />
)}