Compare commits

...

3 Commits

Author SHA1 Message Date
Josh Hawkins
c0124938b3
Tweaks (#22630)
* fix stage overlay size

* add audio filter config and load audio labels

* remove add button from object and audio labels in settings

* tests

* update classification docs

* tweak wording

* don't require restart for timestamp_style changes

* add optional i18n prefix for select widgets

* use i18n enum prefix for timestamp position

* add i18n for all presets
2026-03-25 13:14:32 -06:00
Nicolas Mowen
b1c410bc3e
Optimize more mypy classes (#22637)
* Cleanup motion mypy

* Cleanup object detection mypy

* Update output mypy

* Cleanup
2026-03-25 12:53:19 -06:00
Nicolas Mowen
80c4ce2b5d
Increase mypy coverage and fixes (#22632)
2026-03-25 09:28:48 -06:00
33 changed files with 533 additions and 275 deletions

View File

@ -102,8 +102,19 @@ If examples for some of your classes do not appear in the grid, you can continue
### Improving the Model
:::tip Diversity matters far more than volume
Selecting dozens of nearly identical images is one of the fastest ways to degrade model performance. MobileNetV2 can overfit quickly when trained on homogeneous data — the model learns what *that exact moment* looked like rather than what actually defines the class. **This is why Frigate does not implement bulk training in the UI.**
For more detail, see [Frigate Tip: Best Practices for Training Face and Custom Classification Models](https://github.com/blakeblackshear/frigate/discussions/21374).
:::
- **Start small and iterate**: Begin with a small, representative set of images per class. Models often begin working well with surprisingly few examples and improve naturally over time.
- **Favor hard examples**: When images appear in the Recent Classifications tab, prioritize images scoring below 90–100% or those captured under new lighting, weather, or distance conditions.
- **Avoid bulk training similar images**: Training large batches of images that already score 100% (or close) adds little new information and increases the risk of overfitting.
- **The wizard is just the starting point**: You don't need to find and label every class upfront. Missing classes will naturally appear in Recent Classifications, and those images tend to be more valuable because they represent new conditions and edge cases.
- **Problem framing**: Keep classes visually distinct and relevant to the chosen object types.
- **Data collection**: Use the model's Recent Classifications tab to gather balanced examples across times of day, weather, and distances.
- **Preprocessing**: Ensure examples reflect object crops similar to Frigate's boxes; keep the subject centered.
- **Labels**: Keep label names short and consistent; include a `none` class if you plan to ignore uncertain predictions for sub labels.
- **Threshold**: Tune `threshold` per model to reduce false assignments. Start at `0.8` and adjust based on validation.
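
A minimal sketch of the threshold guidance above (illustrative only; the label names and score dict are hypothetical, not Frigate's internal API):

```python
# Hypothetical example: suppress uncertain predictions below a per-model threshold.
THRESHOLD = 0.8  # starting point suggested above; tune per model

def pick_sub_label(scores: dict[str, float], threshold: float = THRESHOLD) -> str | None:
    label, score = max(scores.items(), key=lambda kv: kv[1])
    if label == "none" or score < threshold:
        return None  # ignore uncertain or explicitly-"none" predictions
    return label

print(pick_sub_label({"usps": 0.93, "fedex": 0.05, "none": 0.02}))  # -> usps
print(pick_sub_label({"usps": 0.55, "fedex": 0.30, "none": 0.15}))  # -> None
```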

View File

@ -70,10 +70,21 @@ Once some images are assigned, training will begin automatically.
### Improving the Model
:::tip Diversity matters far more than volume
Selecting dozens of nearly identical images is one of the fastest ways to degrade model performance. MobileNetV2 can overfit quickly when trained on homogeneous data — the model learns what *that exact moment* looked like rather than what actually defines the state. This often leads to models that work perfectly under the original conditions but become unstable when day turns to night, weather changes, or seasonal lighting shifts. **This is why Frigate does not implement bulk training in the UI.**
For more detail, see [Frigate Tip: Best Practices for Training Face and Custom Classification Models](https://github.com/blakeblackshear/frigate/discussions/21374).
:::
- **Start small and iterate**: Begin with a small, representative set of images per class. Models often begin working well with surprisingly few examples and improve naturally over time.
- **Problem framing**: Keep classes visually distinct and state-focused (e.g., `open`, `closed`, `unknown`). Avoid combining object identity with state in a single model unless necessary.
- **Data collection**: Use the model's Recent Classifications tab to gather balanced examples across times of day and weather.
- **When to train**: Focus on cases where the model is entirely incorrect or flips between states when it should not. There's no need to train additional images when the model is already working consistently.
- **Selecting training images**: Images scoring below 100% due to new conditions (e.g., first snow of the year, seasonal changes) or variations (e.g., objects temporarily in view, insects at night) are good candidates for training, as they represent scenarios different from the default state. Training these lower-scoring images that differ from existing training data helps prevent overfitting. Avoid training large quantities of images that look very similar, especially if they already score 100%, as this can lead to overfitting.
- **Favor hard examples**: When images appear in the Recent Classifications tab, prioritize images scoring below 90–100% or those captured under new conditions (e.g., first snow of the year, seasonal changes, objects temporarily in view, insects at night). These represent scenarios different from the default state and help prevent overfitting.
- **Avoid bulk training similar images**: Training large batches of images that already score 100% (or close) adds little new information and increases the risk of overfitting; a rough duplicate check is sketched after this list.
- **The wizard is just the starting point**: You don't need to find and label every state upfront. Missing states will naturally appear in Recent Classifications, and those images tend to be more valuable because they represent new conditions and edge cases.
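
One way to act on the "avoid bulk training similar images" advice above is to screen candidates for near-duplicates before labeling them. A rough sketch using a simple 64-bit average hash (assumes Pillow; the paths and distance cutoff are illustrative):

```python
# Illustrative only: flag visually near-identical candidate images before labeling.
from PIL import Image

def average_hash(path: str, size: int = 8) -> int:
    img = Image.open(path).convert("L").resize((size, size))
    pixels = list(img.getdata())
    avg = sum(pixels) / len(pixels)
    bits = 0
    for p in pixels:
        bits = (bits << 1) | int(p > avg)  # 1 bit per pixel above the mean
    return bits

def is_near_duplicate(a: str, b: str, max_distance: int = 5) -> bool:
    # Small Hamming distance between hashes => visually very similar frames.
    return bin(average_hash(a) ^ average_hash(b)).count("1") <= max_distance
```

Two crops of the same scene taken seconds apart will typically land within a few bits of each other, while a genuinely new condition usually will not.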
## Debugging Classification Models

View File

@ -32,6 +32,7 @@ class CameraConfigUpdateEnum(str, Enum):
face_recognition = "face_recognition"
lpr = "lpr"
snapshots = "snapshots"
timestamp_style = "timestamp_style"
zones = "zones"
@ -133,6 +134,8 @@ class CameraConfigUpdateSubscriber:
config.snapshots = updated_config
elif update_type == CameraConfigUpdateEnum.onvif:
config.onvif = updated_config
elif update_type == CameraConfigUpdateEnum.timestamp_style:
config.timestamp_style = updated_config
elif update_type == CameraConfigUpdateEnum.zones:
config.zones = updated_config

View File

@ -25,6 +25,7 @@ from frigate.plus import PlusApi
from frigate.util.builtin import (
deep_merge,
get_ffmpeg_arg_list,
load_labels,
)
from frigate.util.config import (
CURRENT_CONFIG_VERSION,
@ -40,7 +41,7 @@ from frigate.util.services import auto_detect_hwaccel
from .auth import AuthConfig
from .base import FrigateBaseModel
from .camera import CameraConfig, CameraLiveConfig
from .camera.audio import AudioConfig
from .camera.audio import AudioConfig, AudioFilterConfig
from .camera.birdseye import BirdseyeConfig
from .camera.detect import DetectConfig
from .camera.ffmpeg import FfmpegConfig
@ -473,7 +474,7 @@ class FrigateConfig(FrigateBaseModel):
live: CameraLiveConfig = Field(
default_factory=CameraLiveConfig,
title="Live playback",
description="Settings used by the Web UI to control live stream resolution and quality.",
description="Settings to control the jsmpeg live stream resolution and quality. This does not affect restreamed cameras that use go2rtc for live view.",
)
motion: Optional[MotionConfig] = Field(
default=None,
@ -671,6 +672,12 @@ class FrigateConfig(FrigateBaseModel):
detector_config.model = model
self.detectors[key] = detector_config
all_audio_labels = {
label
for label in load_labels("/audio-labelmap.txt", prefill=521).values()
if label
}
for name, camera in self.cameras.items():
modified_global_config = global_config.copy()
@ -791,6 +798,14 @@ class FrigateConfig(FrigateBaseModel):
camera_config.review.genai.enabled
)
if camera_config.audio.filters is None:
camera_config.audio.filters = {}
audio_keys = all_audio_labels
audio_keys = audio_keys - camera_config.audio.filters.keys()
for key in audio_keys:
camera_config.audio.filters[key] = AudioFilterConfig()
# Add default filters
object_keys = camera_config.objects.track
if camera_config.objects.filters is None:

View File

@ -317,7 +317,7 @@ class MemryXDetector(DetectionApi):
f"Failed to remove downloaded zip {zip_path}: {e}"
)
def send_input(self, connection_id, tensor_input: np.ndarray):
def send_input(self, connection_id, tensor_input: np.ndarray) -> None:
"""Pre-process (if needed) and send frame to MemryX input queue"""
if tensor_input is None:
raise ValueError("[send_input] No image data provided for inference")

View File

@ -5,7 +5,7 @@ import importlib
import logging
import os
import re
from typing import Any, Optional
from typing import Any, Callable, Optional
import numpy as np
from playhouse.shortcuts import model_to_dict
@ -31,10 +31,10 @@ __all__ = [
PROVIDERS = {}
def register_genai_provider(key: GenAIProviderEnum):
def register_genai_provider(key: GenAIProviderEnum) -> Callable:
"""Register a GenAI provider."""
def decorator(cls):
def decorator(cls: type) -> type:
PROVIDERS[key] = cls
return cls
@ -297,7 +297,7 @@ Guidelines:
"""Generate a description for the frame."""
try:
prompt = camera_config.objects.genai.object_prompts.get(
event.label,
str(event.label),
camera_config.objects.genai.prompt,
).format(**model_to_dict(event))
except KeyError as e:
@ -307,7 +307,7 @@ Guidelines:
logger.debug(f"Sending images to genai provider with prompt: {prompt}")
return self._send(prompt, thumbnails)
def _init_provider(self):
def _init_provider(self) -> Any:
"""Initialize the client."""
return None
@ -402,7 +402,7 @@ Guidelines:
}
def load_providers():
def load_providers() -> None:
package_dir = os.path.dirname(__file__)
for filename in os.listdir(package_dir):
if filename.endswith(".py") and filename != "__init__.py":
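
The hunk above adds return-type annotations to Frigate's provider-registry decorator and its dynamic loader. As a self-contained sketch of the same decorator-registry pattern (hypothetical names, not Frigate's module layout):

```python
import importlib
import os
from typing import Callable

PROVIDERS: dict[str, type] = {}

def register_provider(key: str) -> Callable[[type], type]:
    """Class decorator that records a provider class under its key."""
    def decorator(cls: type) -> type:
        PROVIDERS[key] = cls
        return cls
    return decorator

def load_providers(package: str, package_dir: str) -> None:
    """Import every module in the package so its decorators run and self-register."""
    for filename in os.listdir(package_dir):
        if filename.endswith(".py") and filename != "__init__.py":
            importlib.import_module(f"{package}.{filename[:-3]}")
```

Importing each sibling module for its side effects lets every decorated class add itself to the registry without a hand-maintained list.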

View File

@ -3,7 +3,7 @@
import base64
import json
import logging
from typing import Any, Optional
from typing import Any, AsyncGenerator, Optional
from urllib.parse import parse_qs, urlparse
from openai import AzureOpenAI
@ -20,10 +20,10 @@ class OpenAIClient(GenAIClient):
provider: AzureOpenAI
def _init_provider(self):
def _init_provider(self) -> AzureOpenAI | None:
"""Initialize the client."""
try:
parsed_url = urlparse(self.genai_config.base_url)
parsed_url = urlparse(self.genai_config.base_url or "")
query_params = parse_qs(parsed_url.query)
api_version = query_params.get("api-version", [None])[0]
azure_endpoint = f"{parsed_url.scheme}://{parsed_url.netloc}/"
@ -79,7 +79,7 @@ class OpenAIClient(GenAIClient):
logger.warning("Azure OpenAI returned an error: %s", str(e))
return None
if len(result.choices) > 0:
return result.choices[0].message.content.strip()
return str(result.choices[0].message.content.strip())
return None
def get_context_size(self) -> int:
@ -113,7 +113,7 @@ class OpenAIClient(GenAIClient):
if openai_tool_choice is not None:
request_params["tool_choice"] = openai_tool_choice
result = self.provider.chat.completions.create(**request_params)
result = self.provider.chat.completions.create(**request_params) # type: ignore[call-overload]
if (
result is None
@ -181,7 +181,7 @@ class OpenAIClient(GenAIClient):
messages: list[dict[str, Any]],
tools: Optional[list[dict[str, Any]]] = None,
tool_choice: Optional[str] = "auto",
):
) -> AsyncGenerator[tuple[str, Any], None]:
"""
Stream chat with tools; yields content deltas then final message.
@ -214,7 +214,7 @@ class OpenAIClient(GenAIClient):
tool_calls_by_index: dict[int, dict[str, Any]] = {}
finish_reason = "stop"
stream = self.provider.chat.completions.create(**request_params)
stream = self.provider.chat.completions.create(**request_params) # type: ignore[call-overload]
for chunk in stream:
if not chunk or not chunk.choices:

View File

@ -2,10 +2,11 @@
import json
import logging
from typing import Any, Optional
from typing import Any, AsyncGenerator, Optional
from google import genai
from google.genai import errors, types
from google.genai.types import FunctionCallingConfigMode
from frigate.config import GenAIProviderEnum
from frigate.genai import GenAIClient, register_genai_provider
@ -19,10 +20,10 @@ class GeminiClient(GenAIClient):
provider: genai.Client
def _init_provider(self):
def _init_provider(self) -> genai.Client:
"""Initialize the client."""
# Merge provider_options into HttpOptions
http_options_dict = {
http_options_dict: dict[str, Any] = {
"timeout": int(self.timeout * 1000), # requires milliseconds
"retry_options": types.HttpRetryOptions(
attempts=3,
@ -54,7 +55,7 @@ class GeminiClient(GenAIClient):
] + [prompt]
try:
# Merge runtime_options into generation_config if provided
generation_config_dict = {"candidate_count": 1}
generation_config_dict: dict[str, Any] = {"candidate_count": 1}
generation_config_dict.update(self.genai_config.runtime_options)
if response_format and response_format.get("type") == "json_schema":
@ -65,7 +66,7 @@ class GeminiClient(GenAIClient):
response = self.provider.models.generate_content(
model=self.genai_config.model,
contents=contents,
contents=contents, # type: ignore[arg-type]
config=types.GenerateContentConfig(
**generation_config_dict,
),
@ -78,6 +79,8 @@ class GeminiClient(GenAIClient):
return None
try:
if response.text is None:
return None
description = response.text.strip()
except (ValueError, AttributeError):
# No description was generated
@ -102,7 +105,7 @@ class GeminiClient(GenAIClient):
"""
try:
# Convert messages to Gemini format
gemini_messages = []
gemini_messages: list[types.Content] = []
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
@ -110,7 +113,11 @@ class GeminiClient(GenAIClient):
# Map roles to Gemini format
if role == "system":
# Gemini doesn't have system role, prepend to first user message
if gemini_messages and gemini_messages[0].role == "user":
if (
gemini_messages
and gemini_messages[0].role == "user"
and gemini_messages[0].parts
):
gemini_messages[0].parts[
0
].text = f"{content}\n\n{gemini_messages[0].parts[0].text}"
@ -136,7 +143,7 @@ class GeminiClient(GenAIClient):
types.Content(
role="function",
parts=[
types.Part.from_function_response(function_response)
types.Part.from_function_response(function_response) # type: ignore[misc,call-arg,arg-type]
],
)
)
@ -171,19 +178,25 @@ class GeminiClient(GenAIClient):
if tool_choice:
if tool_choice == "none":
tool_config = types.ToolConfig(
function_calling_config=types.FunctionCallingConfig(mode="NONE")
function_calling_config=types.FunctionCallingConfig(
mode=FunctionCallingConfigMode.NONE
)
)
elif tool_choice == "auto":
tool_config = types.ToolConfig(
function_calling_config=types.FunctionCallingConfig(mode="AUTO")
function_calling_config=types.FunctionCallingConfig(
mode=FunctionCallingConfigMode.AUTO
)
)
elif tool_choice == "required":
tool_config = types.ToolConfig(
function_calling_config=types.FunctionCallingConfig(mode="ANY")
function_calling_config=types.FunctionCallingConfig(
mode=FunctionCallingConfigMode.ANY
)
)
# Build request config
config_params = {"candidate_count": 1}
config_params: dict[str, Any] = {"candidate_count": 1}
if gemini_tools:
config_params["tools"] = gemini_tools
@ -197,7 +210,7 @@ class GeminiClient(GenAIClient):
response = self.provider.models.generate_content(
model=self.genai_config.model,
contents=gemini_messages,
contents=gemini_messages, # type: ignore[arg-type]
config=types.GenerateContentConfig(**config_params),
)
@ -291,7 +304,7 @@ class GeminiClient(GenAIClient):
messages: list[dict[str, Any]],
tools: Optional[list[dict[str, Any]]] = None,
tool_choice: Optional[str] = "auto",
):
) -> AsyncGenerator[tuple[str, Any], None]:
"""
Stream chat with tools; yields content deltas then final message.
@ -299,7 +312,7 @@ class GeminiClient(GenAIClient):
"""
try:
# Convert messages to Gemini format
gemini_messages = []
gemini_messages: list[types.Content] = []
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
@ -307,7 +320,11 @@ class GeminiClient(GenAIClient):
# Map roles to Gemini format
if role == "system":
# Gemini doesn't have system role, prepend to first user message
if gemini_messages and gemini_messages[0].role == "user":
if (
gemini_messages
and gemini_messages[0].role == "user"
and gemini_messages[0].parts
):
gemini_messages[0].parts[
0
].text = f"{content}\n\n{gemini_messages[0].parts[0].text}"
@ -333,7 +350,7 @@ class GeminiClient(GenAIClient):
types.Content(
role="function",
parts=[
types.Part.from_function_response(function_response)
types.Part.from_function_response(function_response) # type: ignore[misc,call-arg,arg-type]
],
)
)
@ -368,19 +385,25 @@ class GeminiClient(GenAIClient):
if tool_choice:
if tool_choice == "none":
tool_config = types.ToolConfig(
function_calling_config=types.FunctionCallingConfig(mode="NONE")
function_calling_config=types.FunctionCallingConfig(
mode=FunctionCallingConfigMode.NONE
)
)
elif tool_choice == "auto":
tool_config = types.ToolConfig(
function_calling_config=types.FunctionCallingConfig(mode="AUTO")
function_calling_config=types.FunctionCallingConfig(
mode=FunctionCallingConfigMode.AUTO
)
)
elif tool_choice == "required":
tool_config = types.ToolConfig(
function_calling_config=types.FunctionCallingConfig(mode="ANY")
function_calling_config=types.FunctionCallingConfig(
mode=FunctionCallingConfigMode.ANY
)
)
# Build request config
config_params = {"candidate_count": 1}
config_params: dict[str, Any] = {"candidate_count": 1}
if gemini_tools:
config_params["tools"] = gemini_tools
@ -399,7 +422,7 @@ class GeminiClient(GenAIClient):
stream = await self.provider.aio.models.generate_content_stream(
model=self.genai_config.model,
contents=gemini_messages,
contents=gemini_messages, # type: ignore[arg-type]
config=types.GenerateContentConfig(**config_params),
)

View File

@ -4,7 +4,7 @@ import base64
import io
import json
import logging
from typing import Any, Optional
from typing import Any, AsyncGenerator, Optional
import httpx
import numpy as np
@ -23,7 +23,7 @@ def _to_jpeg(img_bytes: bytes) -> bytes | None:
try:
img = Image.open(io.BytesIO(img_bytes))
if img.mode != "RGB":
img = img.convert("RGB")
img = img.convert("RGB") # type: ignore[assignment]
buf = io.BytesIO()
img.save(buf, format="JPEG", quality=85)
return buf.getvalue()
@ -36,10 +36,10 @@ def _to_jpeg(img_bytes: bytes) -> bytes | None:
class LlamaCppClient(GenAIClient):
"""Generative AI client for Frigate using llama.cpp server."""
provider: str # base_url
provider: str | None # base_url
provider_options: dict[str, Any]
def _init_provider(self):
def _init_provider(self) -> str | None:
"""Initialize the client."""
self.provider_options = {
**self.genai_config.provider_options,
@ -75,7 +75,7 @@ class LlamaCppClient(GenAIClient):
content.append(
{
"type": "image_url",
"image_url": {
"image_url": { # type: ignore[dict-item]
"url": f"data:image/jpeg;base64,{encoded_image}",
},
}
@ -111,7 +111,7 @@ class LlamaCppClient(GenAIClient):
):
choice = result["choices"][0]
if "message" in choice and "content" in choice["message"]:
return choice["message"]["content"].strip()
return str(choice["message"]["content"].strip())
return None
except Exception as e:
logger.warning("llama.cpp returned an error: %s", str(e))
@ -229,7 +229,7 @@ class LlamaCppClient(GenAIClient):
content.append(
{
"prompt_string": "<__media__>\n",
"multimodal_data": [encoded],
"multimodal_data": [encoded], # type: ignore[dict-item]
}
)
@ -367,7 +367,7 @@ class LlamaCppClient(GenAIClient):
messages: list[dict[str, Any]],
tools: Optional[list[dict[str, Any]]] = None,
tool_choice: Optional[str] = "auto",
):
) -> AsyncGenerator[tuple[str, Any], None]:
"""Stream chat with tools via OpenAI-compatible streaming API."""
if self.provider is None:
logger.warning(

View File

@ -2,7 +2,7 @@
import json
import logging
from typing import Any, Optional
from typing import Any, AsyncGenerator, Optional
from httpx import RemoteProtocolError, TimeoutException
from ollama import AsyncClient as OllamaAsyncClient
@ -28,10 +28,10 @@ class OllamaClient(GenAIClient):
},
}
provider: ApiClient
provider: ApiClient | None
provider_options: dict[str, Any]
def _init_provider(self):
def _init_provider(self) -> ApiClient | None:
"""Initialize the client."""
self.provider_options = {
**self.LOCAL_OPTIMIZED_OPTIONS,
@ -73,7 +73,7 @@ class OllamaClient(GenAIClient):
"exclusiveMinimum",
"exclusiveMaximum",
}
result = {}
result: dict[str, Any] = {}
for key, value in schema.items():
if not _is_properties and key in STRIP_KEYS:
continue
@ -122,7 +122,7 @@ class OllamaClient(GenAIClient):
logger.debug(
f"Ollama tokens used: eval_count={result.get('eval_count')}, prompt_eval_count={result.get('prompt_eval_count')}"
)
return result["response"].strip()
return str(result["response"]).strip()
except (
TimeoutException,
ResponseError,
@ -263,7 +263,7 @@ class OllamaClient(GenAIClient):
messages: list[dict[str, Any]],
tools: Optional[list[dict[str, Any]]] = None,
tool_choice: Optional[str] = "auto",
):
) -> AsyncGenerator[tuple[str, Any], None]:
"""Stream chat with tools; yields content deltas then final message.
When tools are provided, Ollama streaming does not include tool_calls
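
Earlier in this file, the newly typed helper recursively strips JSON Schema keywords that Ollama's structured-output endpoint rejects, while leaving user-defined property names under `properties` untouched. A rough self-contained sketch of that kind of filter (only `exclusiveMinimum`/`exclusiveMaximum` are visible in the hunk; the rest of the behavior is an assumption):

```python
from typing import Any

# Only two members of the real key set are visible in the hunk above.
STRIP_KEYS = {"exclusiveMinimum", "exclusiveMaximum"}

def strip_unsupported(schema: Any, in_properties: bool = False) -> Any:
    """Recursively drop unsupported keywords, but never inside `properties`,
    whose keys are user-defined property names rather than schema keywords."""
    if isinstance(schema, dict):
        return {
            k: strip_unsupported(v, in_properties=(k == "properties"))
            for k, v in schema.items()
            if in_properties or k not in STRIP_KEYS
        }
    if isinstance(schema, list):
        return [strip_unsupported(v) for v in schema]
    return schema
```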

View File

@ -3,7 +3,7 @@
import base64
import json
import logging
from typing import Any, Optional
from typing import Any, AsyncGenerator, Optional
from httpx import TimeoutException
from openai import OpenAI
@ -21,7 +21,7 @@ class OpenAIClient(GenAIClient):
provider: OpenAI
context_size: Optional[int] = None
def _init_provider(self):
def _init_provider(self) -> OpenAI:
"""Initialize the client."""
# Extract context_size from provider_options as it's not a valid OpenAI client parameter
# It will be used in get_context_size() instead
@ -81,7 +81,7 @@ class OpenAIClient(GenAIClient):
and hasattr(result, "choices")
and len(result.choices) > 0
):
return result.choices[0].message.content.strip()
return str(result.choices[0].message.content.strip())
return None
except (TimeoutException, Exception) as e:
logger.warning("OpenAI returned an error: %s", str(e))
@ -171,7 +171,7 @@ class OpenAIClient(GenAIClient):
}
request_params.update(provider_opts)
result = self.provider.chat.completions.create(**request_params)
result = self.provider.chat.completions.create(**request_params) # type: ignore[call-overload]
if (
result is None
@ -245,7 +245,7 @@ class OpenAIClient(GenAIClient):
messages: list[dict[str, Any]],
tools: Optional[list[dict[str, Any]]] = None,
tool_choice: Optional[str] = "auto",
):
) -> AsyncGenerator[tuple[str, Any], None]:
"""
Stream chat with tools; yields content deltas then final message.
@ -287,7 +287,7 @@ class OpenAIClient(GenAIClient):
tool_calls_by_index: dict[int, dict[str, Any]] = {}
finish_reason = "stop"
stream = self.provider.chat.completions.create(**request_params)
stream = self.provider.chat.completions.create(**request_params) # type: ignore[call-overload]
for chunk in stream:
if not chunk or not chunk.choices:

View File

@ -5,7 +5,7 @@ import os
import threading
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
from typing import Optional, cast
from frigate.comms.inter_process import InterProcessRequestor
from frigate.const import CONFIG_DIR, UPDATE_JOB_STATE
@ -122,7 +122,7 @@ def start_media_sync_job(
if job_is_running("media_sync"):
current = get_current_job("media_sync")
logger.warning(
f"Media sync job {current.id} is already running. Rejecting new request."
f"Media sync job {current.id if current else 'unknown'} is already running. Rejecting new request."
)
return None
@ -146,9 +146,9 @@ def start_media_sync_job(
def get_current_media_sync_job() -> Optional[MediaSyncJob]:
"""Get the current running/queued media sync job, if any."""
return get_current_job("media_sync")
return cast(Optional[MediaSyncJob], get_current_job("media_sync"))
def get_media_sync_job_by_id(job_id: str) -> Optional[MediaSyncJob]:
"""Get media sync job by ID. Currently only tracks the current job."""
return get_job_by_id("media_sync", job_id)
return cast(Optional[MediaSyncJob], get_job_by_id("media_sync", job_id))

View File

@ -6,7 +6,7 @@ import threading
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from dataclasses import asdict, dataclass, field
from datetime import datetime
from typing import Any, Optional
from typing import Any, Optional, cast
import cv2
import numpy as np
@ -96,7 +96,7 @@ def create_polygon_mask(
dtype=np.int32,
)
mask = np.zeros((frame_height, frame_width), dtype=np.uint8)
cv2.fillPoly(mask, [motion_points], 255)
cv2.fillPoly(mask, [motion_points], (255,))
return mask
@ -116,7 +116,7 @@ def compute_roi_bbox_normalized(
def heatmap_overlaps_roi(
heatmap: dict[str, int], roi_bbox: tuple[float, float, float, float]
heatmap: object, roi_bbox: tuple[float, float, float, float]
) -> bool:
"""Check if a sparse motion heatmap has any overlap with the ROI bounding box.
@ -155,9 +155,9 @@ def segment_passes_activity_gate(recording: Recordings) -> bool:
Returns True if any of motion, objects, or regions is non-zero/non-null.
Returns True if all are null (old segments without data).
"""
motion = recording.motion
objects = recording.objects
regions = recording.regions
motion: Any = recording.motion
objects: Any = recording.objects
regions: Any = recording.regions
# Old segments without metadata - pass through (conservative)
if motion is None and objects is None and regions is None:
@ -278,6 +278,9 @@ class MotionSearchRunner(threading.Thread):
frame_width = camera_config.detect.width
frame_height = camera_config.detect.height
if frame_width is None or frame_height is None:
raise ValueError(f"Camera {camera_name} detect dimensions not configured")
# Create polygon mask
polygon_mask = create_polygon_mask(
self.job.polygon_points, frame_width, frame_height
@ -415,11 +418,13 @@ class MotionSearchRunner(threading.Thread):
if self._should_stop():
break
rec_start: float = recording.start_time # type: ignore[assignment]
rec_end: float = recording.end_time # type: ignore[assignment]
future = executor.submit(
self._process_recording_for_motion,
recording.path,
recording.start_time,
recording.end_time,
str(recording.path),
rec_start,
rec_end,
self.job.start_time_range,
self.job.end_time_range,
polygon_mask,
@ -524,10 +529,12 @@ class MotionSearchRunner(threading.Thread):
break
try:
rec_start: float = recording.start_time # type: ignore[assignment]
rec_end: float = recording.end_time # type: ignore[assignment]
results, frames = self._process_recording_for_motion(
recording.path,
recording.start_time,
recording.end_time,
str(recording.path),
rec_start,
rec_end,
self.job.start_time_range,
self.job.end_time_range,
polygon_mask,
@ -672,7 +679,9 @@ class MotionSearchRunner(threading.Thread):
# Handle frame dimension changes
if gray.shape != polygon_mask.shape:
resized_mask = cv2.resize(
polygon_mask, (gray.shape[1], gray.shape[0]), cv2.INTER_NEAREST
polygon_mask,
(gray.shape[1], gray.shape[0]),
interpolation=cv2.INTER_NEAREST,
)
current_bbox = cv2.boundingRect(resized_mask)
else:
@ -698,7 +707,7 @@ class MotionSearchRunner(threading.Thread):
)
if prev_frame_gray is not None:
diff = cv2.absdiff(prev_frame_gray, masked_gray)
diff = cv2.absdiff(prev_frame_gray, masked_gray) # type: ignore[unreachable]
diff_blurred = cv2.GaussianBlur(diff, (3, 3), 0)
_, thresh = cv2.threshold(
diff_blurred, threshold, 255, cv2.THRESH_BINARY
@ -825,7 +834,7 @@ def get_motion_search_job(job_id: str) -> Optional[MotionSearchJob]:
if job_entry:
return job_entry[0]
# Check completed jobs via manager
return get_job_by_id("motion_search", job_id)
return cast(Optional[MotionSearchJob], get_job_by_id("motion_search", job_id))
def cancel_motion_search_job(job_id: str) -> bool:

View File

@ -54,9 +54,9 @@ class VLMWatchRunner(threading.Thread):
job: VLMWatchJob,
config: FrigateConfig,
cancel_event: threading.Event,
frame_processor,
genai_manager,
dispatcher,
frame_processor: Any,
genai_manager: Any,
dispatcher: Any,
) -> None:
super().__init__(daemon=True, name=f"vlm_watch_{job.id}")
self.job = job
@ -226,9 +226,12 @@ class VLMWatchRunner(threading.Thread):
remaining = deadline - time.time()
if remaining <= 0:
break
topic, payload = self.detection_subscriber.check_for_update(
result = self.detection_subscriber.check_for_update(
timeout=min(1.0, remaining)
)
if result is None:
continue
topic, payload = result
if topic is None or payload is None:
continue
# payload = (camera, frame_name, frame_time, tracked_objects, motion_boxes, regions)
@ -328,9 +331,9 @@ def start_vlm_watch_job(
condition: str,
max_duration_minutes: int,
config: FrigateConfig,
frame_processor,
genai_manager,
dispatcher,
frame_processor: Any,
genai_manager: Any,
dispatcher: Any,
labels: list[str] | None = None,
zones: list[str] | None = None,
) -> str:

View File

@ -13,10 +13,10 @@ class MotionDetector(ABC):
frame_shape: Tuple[int, int, int],
config: MotionConfig,
fps: int,
improve_contrast,
threshold,
contour_area,
):
improve_contrast: bool,
threshold: int,
contour_area: int | None,
) -> None:
pass
@abstractmethod
@ -25,7 +25,7 @@ class MotionDetector(ABC):
pass
@abstractmethod
def is_calibrating(self):
def is_calibrating(self) -> bool:
"""Return if motion is recalibrating."""
pass
@ -35,6 +35,6 @@ class MotionDetector(ABC):
pass
@abstractmethod
def stop(self):
def stop(self) -> None:
"""Stop any ongoing work and processes."""
pass

View File

@ -1,7 +1,9 @@
from typing import Any
import cv2
import numpy as np
from frigate.config import MotionConfig
from frigate.config.config import RuntimeMotionConfig
from frigate.motion import MotionDetector
from frigate.util.image import grab_cv2_contours
@ -9,19 +11,20 @@ from frigate.util.image import grab_cv2_contours
class FrigateMotionDetector(MotionDetector):
def __init__(
self,
frame_shape,
config: MotionConfig,
frame_shape: tuple[int, ...],
config: RuntimeMotionConfig,
fps: int,
improve_contrast,
threshold,
contour_area,
):
improve_contrast: Any,
threshold: Any,
contour_area: Any,
) -> None:
self.config = config
self.frame_shape = frame_shape
self.resize_factor = frame_shape[0] / config.frame_height
frame_height = config.frame_height or frame_shape[0]
self.resize_factor = frame_shape[0] / frame_height
self.motion_frame_size = (
config.frame_height,
config.frame_height * frame_shape[1] // frame_shape[0],
frame_height,
frame_height * frame_shape[1] // frame_shape[0],
)
self.avg_frame = np.zeros(self.motion_frame_size, np.float32)
self.avg_delta = np.zeros(self.motion_frame_size, np.float32)
@ -38,10 +41,10 @@ class FrigateMotionDetector(MotionDetector):
self.threshold = threshold
self.contour_area = contour_area
def is_calibrating(self):
def is_calibrating(self) -> bool:
return False
def detect(self, frame):
def detect(self, frame: np.ndarray) -> list:
motion_boxes = []
gray = frame[0 : self.frame_shape[0], 0 : self.frame_shape[1]]
@ -99,7 +102,7 @@ class FrigateMotionDetector(MotionDetector):
# dilate the thresholded image to fill in holes, then find contours
# on thresholded image
thresh_dilated = cv2.dilate(thresh, None, iterations=2)
thresh_dilated = cv2.dilate(thresh, None, iterations=2) # type: ignore[call-overload]
contours = cv2.findContours(
thresh_dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)

View File

@ -1,11 +1,12 @@
import logging
from typing import Optional
import cv2
import numpy as np
from scipy.ndimage import gaussian_filter
from frigate.camera import PTZMetrics
from frigate.config import MotionConfig
from frigate.config.config import RuntimeMotionConfig
from frigate.motion import MotionDetector
from frigate.util.image import grab_cv2_contours
@ -15,22 +16,23 @@ logger = logging.getLogger(__name__)
class ImprovedMotionDetector(MotionDetector):
def __init__(
self,
frame_shape,
config: MotionConfig,
frame_shape: tuple[int, ...],
config: RuntimeMotionConfig,
fps: int,
ptz_metrics: PTZMetrics = None,
name="improved",
blur_radius=1,
interpolation=cv2.INTER_NEAREST,
contrast_frame_history=50,
):
ptz_metrics: Optional[PTZMetrics] = None,
name: str = "improved",
blur_radius: int = 1,
interpolation: int = cv2.INTER_NEAREST,
contrast_frame_history: int = 50,
) -> None:
self.name = name
self.config = config
self.frame_shape = frame_shape
self.resize_factor = frame_shape[0] / config.frame_height
frame_height = config.frame_height or frame_shape[0]
self.resize_factor = frame_shape[0] / frame_height
self.motion_frame_size = (
config.frame_height,
config.frame_height * frame_shape[1] // frame_shape[0],
frame_height,
frame_height * frame_shape[1] // frame_shape[0],
)
self.avg_frame = np.zeros(self.motion_frame_size, np.float32)
self.motion_frame_count = 0
@ -44,20 +46,20 @@ class ImprovedMotionDetector(MotionDetector):
self.contrast_values[:, 1:2] = 255
self.contrast_values_index = 0
self.ptz_metrics = ptz_metrics
self.last_stop_time = None
self.last_stop_time: float | None = None
def is_calibrating(self):
def is_calibrating(self) -> bool:
return self.calibrating
def detect(self, frame):
motion_boxes = []
def detect(self, frame: np.ndarray) -> list[tuple[int, int, int, int]]:
motion_boxes: list[tuple[int, int, int, int]] = []
if not self.config.enabled:
return motion_boxes
# if ptz motor is moving from autotracking, quickly return
# a single box that is 80% of the frame
if (
if self.ptz_metrics is not None and (
self.ptz_metrics.autotracker_enabled.value
and not self.ptz_metrics.motor_stopped.is_set()
):
@ -130,19 +132,19 @@ class ImprovedMotionDetector(MotionDetector):
# dilate the thresholded image to fill in holes, then find contours
# on thresholded image
thresh_dilated = cv2.dilate(thresh, None, iterations=1)
thresh_dilated = cv2.dilate(thresh, None, iterations=1) # type: ignore[call-overload]
contours = cv2.findContours(
thresh_dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)
contours = grab_cv2_contours(contours)
# loop over the contours
total_contour_area = 0
total_contour_area: float = 0
for c in contours:
# if the contour is big enough, count it as motion
contour_area = cv2.contourArea(c)
total_contour_area += contour_area
if contour_area > self.config.contour_area:
if contour_area > (self.config.contour_area or 0):
x, y, w, h = cv2.boundingRect(c)
motion_boxes.append(
(
@ -159,7 +161,7 @@ class ImprovedMotionDetector(MotionDetector):
# check if the motor has just stopped from autotracking
# if so, reassign the average to the current frame so we begin with a new baseline
if (
if self.ptz_metrics is not None and (
# ensure we only do this for cameras with autotracking enabled
self.ptz_metrics.autotracker_enabled.value
and self.ptz_metrics.motor_stopped.is_set()

View File

@ -41,6 +41,24 @@ ignore_errors = false
[mypy-frigate.events]
ignore_errors = false
[mypy-frigate.genai.*]
ignore_errors = false
[mypy-frigate.jobs.*]
ignore_errors = false
[mypy-frigate.motion.*]
ignore_errors = false
[mypy-frigate.object_detection.*]
ignore_errors = false
[mypy-frigate.output.*]
ignore_errors = false
[mypy-frigate.ptz]
ignore_errors = false
[mypy-frigate.log]
ignore_errors = false

View File

@ -7,6 +7,7 @@ from abc import ABC, abstractmethod
from collections import deque
from multiprocessing import Queue, Value
from multiprocessing.synchronize import Event as MpEvent
from typing import Any, Optional
import numpy as np
import zmq
@ -34,26 +35,25 @@ logger = logging.getLogger(__name__)
class ObjectDetector(ABC):
@abstractmethod
def detect(self, tensor_input, threshold: float = 0.4):
def detect(self, tensor_input: np.ndarray, threshold: float = 0.4) -> list:
pass
class BaseLocalDetector(ObjectDetector):
def __init__(
self,
detector_config: BaseDetectorConfig = None,
labels: str = None,
stop_event: MpEvent = None,
):
detector_config: Optional[BaseDetectorConfig] = None,
labels: Optional[str] = None,
stop_event: Optional[MpEvent] = None,
) -> None:
self.fps = EventsPerSecond()
if labels is None:
self.labels = {}
self.labels: dict[int, str] = {}
else:
self.labels = load_labels(labels)
if detector_config:
if detector_config and detector_config.model:
self.input_transform = tensor_transform(detector_config.model.input_tensor)
self.dtype = detector_config.model.input_dtype
else:
self.input_transform = None
@ -77,10 +77,10 @@ class BaseLocalDetector(ObjectDetector):
return tensor_input
def detect(self, tensor_input: np.ndarray, threshold=0.4):
def detect(self, tensor_input: np.ndarray, threshold: float = 0.4) -> list:
detections = []
raw_detections = self.detect_raw(tensor_input)
raw_detections = self.detect_raw(tensor_input) # type: ignore[attr-defined]
for d in raw_detections:
if int(d[0]) < 0 or int(d[0]) >= len(self.labels):
@ -96,28 +96,28 @@ class BaseLocalDetector(ObjectDetector):
class LocalObjectDetector(BaseLocalDetector):
def detect_raw(self, tensor_input: np.ndarray):
def detect_raw(self, tensor_input: np.ndarray) -> np.ndarray:
tensor_input = self._transform_input(tensor_input)
return self.detect_api.detect_raw(tensor_input=tensor_input)
return self.detect_api.detect_raw(tensor_input=tensor_input) # type: ignore[no-any-return]
class AsyncLocalObjectDetector(BaseLocalDetector):
def async_send_input(self, tensor_input: np.ndarray, connection_id: str):
def async_send_input(self, tensor_input: np.ndarray, connection_id: str) -> None:
tensor_input = self._transform_input(tensor_input)
return self.detect_api.send_input(connection_id, tensor_input)
self.detect_api.send_input(connection_id, tensor_input)
def async_receive_output(self):
def async_receive_output(self) -> Any:
return self.detect_api.receive_output()
class DetectorRunner(FrigateProcess):
def __init__(
self,
name,
name: str,
detection_queue: Queue,
cameras: list[str],
avg_speed: Value,
start_time: Value,
avg_speed: Any,
start_time: Any,
config: FrigateConfig,
detector_config: BaseDetectorConfig,
stop_event: MpEvent,
@ -129,11 +129,11 @@ class DetectorRunner(FrigateProcess):
self.start_time = start_time
self.config = config
self.detector_config = detector_config
self.outputs: dict = {}
self.outputs: dict[str, Any] = {}
def create_output_shm(self, name: str):
def create_output_shm(self, name: str) -> None:
out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False)
out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
out_np: np.ndarray = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
self.outputs[name] = {"shm": out_shm, "np": out_np}
def run(self) -> None:
@ -155,8 +155,8 @@ class DetectorRunner(FrigateProcess):
connection_id,
(
1,
self.detector_config.model.height,
self.detector_config.model.width,
self.detector_config.model.height, # type: ignore[union-attr]
self.detector_config.model.width, # type: ignore[union-attr]
3,
),
)
@ -187,11 +187,11 @@ class DetectorRunner(FrigateProcess):
class AsyncDetectorRunner(FrigateProcess):
def __init__(
self,
name,
name: str,
detection_queue: Queue,
cameras: list[str],
avg_speed: Value,
start_time: Value,
avg_speed: Any,
start_time: Any,
config: FrigateConfig,
detector_config: BaseDetectorConfig,
stop_event: MpEvent,
@ -203,15 +203,15 @@ class AsyncDetectorRunner(FrigateProcess):
self.start_time = start_time
self.config = config
self.detector_config = detector_config
self.outputs: dict = {}
self.outputs: dict[str, Any] = {}
self._frame_manager: SharedMemoryFrameManager | None = None
self._publisher: ObjectDetectorPublisher | None = None
self._detector: AsyncLocalObjectDetector | None = None
self.send_times = deque()
self.send_times: deque[float] = deque()
def create_output_shm(self, name: str):
def create_output_shm(self, name: str) -> None:
out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False)
out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
out_np: np.ndarray = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
self.outputs[name] = {"shm": out_shm, "np": out_np}
def _detect_worker(self) -> None:
@ -222,12 +222,13 @@ class AsyncDetectorRunner(FrigateProcess):
except queue.Empty:
continue
assert self._frame_manager is not None
input_frame = self._frame_manager.get(
connection_id,
(
1,
self.detector_config.model.height,
self.detector_config.model.width,
self.detector_config.model.height, # type: ignore[union-attr]
self.detector_config.model.width, # type: ignore[union-attr]
3,
),
)
@ -238,11 +239,13 @@ class AsyncDetectorRunner(FrigateProcess):
# mark start time and send to accelerator
self.send_times.append(time.perf_counter())
assert self._detector is not None
self._detector.async_send_input(input_frame, connection_id)
def _result_worker(self) -> None:
logger.info("Starting Result Worker Thread")
while not self.stop_event.is_set():
assert self._detector is not None
connection_id, detections = self._detector.async_receive_output()
# Handle timeout case (queue.Empty) - just continue
@ -256,6 +259,7 @@ class AsyncDetectorRunner(FrigateProcess):
duration = time.perf_counter() - ts
# release input buffer
assert self._frame_manager is not None
self._frame_manager.close(connection_id)
if connection_id not in self.outputs:
@ -264,6 +268,7 @@ class AsyncDetectorRunner(FrigateProcess):
# write results and publish
if detections is not None:
self.outputs[connection_id]["np"][:] = detections[:]
assert self._publisher is not None
self._publisher.publish(connection_id)
# update timers
@ -330,11 +335,14 @@ class ObjectDetectProcess:
self.stop_event = stop_event
self.start_or_restart()
def stop(self):
def stop(self) -> None:
# if the process has already exited on its own, just return
if self.detect_process and self.detect_process.exitcode:
return
if self.detect_process is None:
return
logging.info("Waiting for detection process to exit gracefully...")
self.detect_process.join(timeout=30)
if self.detect_process.exitcode is None:
@ -343,8 +351,8 @@ class ObjectDetectProcess:
self.detect_process.join()
logging.info("Detection process has exited...")
def start_or_restart(self):
self.detection_start.value = 0.0
def start_or_restart(self) -> None:
self.detection_start.value = 0.0 # type: ignore[attr-defined]
if (self.detect_process is not None) and self.detect_process.is_alive():
self.stop()
@ -389,17 +397,19 @@ class RemoteObjectDetector:
self.detection_queue = detection_queue
self.stop_event = stop_event
self.shm = UntrackedSharedMemory(name=self.name, create=False)
self.np_shm = np.ndarray(
self.np_shm: np.ndarray = np.ndarray(
(1, model_config.height, model_config.width, 3),
dtype=np.uint8,
buffer=self.shm.buf,
)
self.out_shm = UntrackedSharedMemory(name=f"out-{self.name}", create=False)
self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf)
self.out_np_shm: np.ndarray = np.ndarray(
(20, 6), dtype=np.float32, buffer=self.out_shm.buf
)
self.detector_subscriber = ObjectDetectorSubscriber(name)
def detect(self, tensor_input, threshold=0.4):
detections = []
def detect(self, tensor_input: np.ndarray, threshold: float = 0.4) -> list:
detections: list = []
if self.stop_event.is_set():
return detections
@ -431,7 +441,7 @@ class RemoteObjectDetector:
self.fps.update()
return detections
def cleanup(self):
def cleanup(self) -> None:
self.detector_subscriber.stop()
self.shm.unlink()
self.out_shm.unlink()
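
The detector classes above wrap `UntrackedSharedMemory` buffers in NumPy array views so frames and results move between processes without copies. A minimal sketch of the same idiom using the standard library's `multiprocessing.shared_memory` (Frigate's wrapper and buffer names differ):

```python
from multiprocessing import shared_memory
import numpy as np

# Writer side: allocate a block and view it as a (20, 6) float32 array.
shm = shared_memory.SharedMemory(name="out-demo", create=True, size=20 * 6 * 4)
out = np.ndarray((20, 6), dtype=np.float32, buffer=shm.buf)
out[:] = 0.0  # writes land directly in shared memory

# A reader process would attach to the same block and build the same view:
# shm = shared_memory.SharedMemory(name="out-demo", create=False)

shm.close()
shm.unlink()
```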

View File

@ -13,10 +13,10 @@ class RequestStore:
A thread-safe hash-based response store that handles creating requests.
"""
def __init__(self):
def __init__(self) -> None:
self.request_counter = 0
self.request_counter_lock = threading.Lock()
self.input_queue = queue.Queue()
self.input_queue: queue.Queue[tuple[int, ndarray]] = queue.Queue()
def __get_request_id(self) -> int:
with self.request_counter_lock:
@ -45,17 +45,19 @@ class ResponseStore:
their request's result appears.
"""
def __init__(self):
self.responses = {} # Maps request_id -> (original_input, infer_results)
def __init__(self) -> None:
self.responses: dict[
int, ndarray
] = {} # Maps request_id -> (original_input, infer_results)
self.lock = threading.Lock()
self.cond = threading.Condition(self.lock)
def put(self, request_id: int, response: ndarray):
def put(self, request_id: int, response: ndarray) -> None:
with self.cond:
self.responses[request_id] = response
self.cond.notify_all()
def get(self, request_id: int, timeout=None) -> ndarray:
def get(self, request_id: int, timeout: float | None = None) -> ndarray:
with self.cond:
if not self.cond.wait_for(
lambda: request_id in self.responses, timeout=timeout
@ -65,7 +67,9 @@ class ResponseStore:
return self.responses.pop(request_id)
def tensor_transform(desired_shape: InputTensorEnum):
def tensor_transform(
desired_shape: InputTensorEnum,
) -> tuple[int, int, int, int] | None:
# Currently this function only supports BHWC permutations
if desired_shape == InputTensorEnum.nhwc:
return None
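
For context, the `ResponseStore` typed earlier in this hunk lets producer threads publish results keyed by request id while consumers block on a condition variable until their id appears. A usage sketch under the signatures shown (the request id and payload shape are arbitrary):

```python
import threading
import numpy as np

store = ResponseStore()  # class typed above

def producer(request_id: int) -> None:
    # put() stores the result and notifies every thread blocked in get().
    store.put(request_id, np.zeros((20, 6), dtype=np.float32))

threading.Thread(target=producer, args=(7,)).start()
result = store.get(7, timeout=1.0)  # blocks until request 7's response arrives
```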

View File

@ -4,13 +4,13 @@ import datetime
import glob
import logging
import math
import multiprocessing as mp
import os
import queue
import subprocess as sp
import threading
import time
import traceback
from multiprocessing.synchronize import Event as MpEvent
from typing import Any, Optional
import cv2
@ -74,25 +74,25 @@ class Canvas:
self,
canvas_width: int,
canvas_height: int,
scaling_factor: int,
scaling_factor: float,
) -> None:
self.scaling_factor = scaling_factor
gcd = math.gcd(canvas_width, canvas_height)
self.aspect = get_standard_aspect_ratio(
(canvas_width / gcd), (canvas_height / gcd)
int(canvas_width / gcd), int(canvas_height / gcd)
)
self.width = canvas_width
self.height = (self.width * self.aspect[1]) / self.aspect[0]
self.coefficient_cache: dict[int, int] = {}
self.height: float = (self.width * self.aspect[1]) / self.aspect[0]
self.coefficient_cache: dict[int, float] = {}
self.aspect_cache: dict[str, tuple[int, int]] = {}
def get_aspect(self, coefficient: int) -> tuple[int, int]:
def get_aspect(self, coefficient: float) -> tuple[float, float]:
return (self.aspect[0] * coefficient, self.aspect[1] * coefficient)
def get_coefficient(self, camera_count: int) -> int:
def get_coefficient(self, camera_count: int) -> float:
return self.coefficient_cache.get(camera_count, self.scaling_factor)
def set_coefficient(self, camera_count: int, coefficient: int) -> None:
def set_coefficient(self, camera_count: int, coefficient: float) -> None:
self.coefficient_cache[camera_count] = coefficient
def get_camera_aspect(
@ -105,7 +105,7 @@ class Canvas:
gcd = math.gcd(camera_width, camera_height)
camera_aspect = get_standard_aspect_ratio(
camera_width / gcd, camera_height / gcd
int(camera_width / gcd), int(camera_height / gcd)
)
self.aspect_cache[cam_name] = camera_aspect
return camera_aspect
@ -116,7 +116,7 @@ class FFMpegConverter(threading.Thread):
self,
ffmpeg: FfmpegConfig,
input_queue: queue.Queue,
stop_event: mp.Event,
stop_event: MpEvent,
in_width: int,
in_height: int,
out_width: int,
@ -128,7 +128,7 @@ class FFMpegConverter(threading.Thread):
self.camera = "birdseye"
self.input_queue = input_queue
self.stop_event = stop_event
self.bd_pipe = None
self.bd_pipe: int | None = None
if birdseye_rtsp:
self.recreate_birdseye_pipe()
@ -181,7 +181,8 @@ class FFMpegConverter(threading.Thread):
os.close(stdin)
self.reading_birdseye = False
def __write(self, b) -> None:
def __write(self, b: bytes) -> None:
assert self.process.stdin is not None
self.process.stdin.write(b)
if self.bd_pipe:
@ -200,13 +201,13 @@ class FFMpegConverter(threading.Thread):
return
def read(self, length):
def read(self, length: int) -> Any:
try:
return self.process.stdout.read1(length)
return self.process.stdout.read1(length) # type: ignore[union-attr]
except ValueError:
return False
def exit(self):
def exit(self) -> None:
if self.bd_pipe:
os.close(self.bd_pipe)
@ -233,8 +234,8 @@ class BroadcastThread(threading.Thread):
self,
camera: str,
converter: FFMpegConverter,
websocket_server,
stop_event: mp.Event,
websocket_server: Any,
stop_event: MpEvent,
):
super().__init__()
self.camera = camera
@ -242,7 +243,7 @@ class BroadcastThread(threading.Thread):
self.websocket_server = websocket_server
self.stop_event = stop_event
def run(self):
def run(self) -> None:
while not self.stop_event.is_set():
buf = self.converter.read(65536)
if buf:
@ -270,16 +271,16 @@ class BirdsEyeFrameManager:
def __init__(
self,
config: FrigateConfig,
stop_event: mp.Event,
stop_event: MpEvent,
):
self.config = config
width, height = get_canvas_shape(config.birdseye.width, config.birdseye.height)
self.frame_shape = (height, width)
self.yuv_shape = (height * 3 // 2, width)
self.frame = np.ndarray(self.yuv_shape, dtype=np.uint8)
self.frame: np.ndarray = np.ndarray(self.yuv_shape, dtype=np.uint8)
self.canvas = Canvas(width, height, config.birdseye.layout.scaling_factor)
self.stop_event = stop_event
self.last_refresh_time = 0
self.last_refresh_time: float = 0
# initialize the frame as black and with the Frigate logo
self.blank_frame = np.zeros(self.yuv_shape, np.uint8)
@ -323,15 +324,15 @@ class BirdsEyeFrameManager:
self.frame[:] = self.blank_frame
self.cameras = {}
self.cameras: dict[str, Any] = {}
for camera in self.config.cameras.keys():
self.add_camera(camera)
self.camera_layout = []
self.active_cameras = set()
self.camera_layout: list[Any] = []
self.active_cameras: set[str] = set()
self.last_output_time = 0.0
def add_camera(self, cam: str):
def add_camera(self, cam: str) -> None:
"""Add a camera to self.cameras with the correct structure."""
settings = self.config.cameras[cam]
# precalculate the coordinates for all the channels
@ -361,16 +362,21 @@ class BirdsEyeFrameManager:
},
}
def remove_camera(self, cam: str):
def remove_camera(self, cam: str) -> None:
"""Remove a camera from self.cameras."""
if cam in self.cameras:
del self.cameras[cam]
def clear_frame(self):
def clear_frame(self) -> None:
logger.debug("Clearing the birdseye frame")
self.frame[:] = self.blank_frame
def copy_to_position(self, position, camera=None, frame: np.ndarray = None):
def copy_to_position(
self,
position: Any,
camera: Optional[str] = None,
frame: Optional[np.ndarray] = None,
) -> None:
if camera is None:
frame = None
channel_dims = None
@ -389,7 +395,9 @@ class BirdsEyeFrameManager:
channel_dims,
)
def camera_active(self, mode, object_box_count, motion_box_count):
def camera_active(
self, mode: Any, object_box_count: int, motion_box_count: int
) -> bool:
if mode == BirdseyeModeEnum.continuous:
return True
@ -399,6 +407,8 @@ class BirdsEyeFrameManager:
if mode == BirdseyeModeEnum.objects and object_box_count > 0:
return True
return False
def get_camera_coordinates(self) -> dict[str, dict[str, int]]:
"""Return the coordinates of each camera in the current layout."""
coordinates = {}
@ -451,7 +461,7 @@ class BirdsEyeFrameManager:
- self.cameras[active_camera]["last_active_frame"]
),
)
active_cameras = limited_active_cameras[:max_cameras]
active_cameras = set(limited_active_cameras[:max_cameras])
max_camera_refresh = True
self.last_refresh_time = now
@ -510,7 +520,7 @@ class BirdsEyeFrameManager:
# center camera view in canvas and ensure that it fits
if scaled_width < self.canvas.width:
coefficient = 1
coefficient: float = 1
x_offset = int((self.canvas.width - scaled_width) / 2)
else:
coefficient = self.canvas.width / scaled_width
@ -557,7 +567,7 @@ class BirdsEyeFrameManager:
calculating = False
self.canvas.set_coefficient(len(active_cameras), coefficient)
self.camera_layout = layout_candidate
self.camera_layout = layout_candidate or []
frame_changed = True
# Draw the layout
@ -577,10 +587,12 @@ class BirdsEyeFrameManager:
self,
cameras_to_add: list[str],
coefficient: float,
) -> tuple[Any]:
) -> Optional[list[list[Any]]]:
"""Calculate the optimal layout for 2+ cameras."""
def map_layout(camera_layout: list[list[Any]], row_height: int):
def map_layout(
camera_layout: list[list[Any]], row_height: int
) -> tuple[int, int, Optional[list[list[Any]]]]:
"""Map the calculated layout."""
candidate_layout = []
starting_x = 0
@ -777,11 +789,11 @@ class Birdseye:
def __init__(
self,
config: FrigateConfig,
stop_event: mp.Event,
websocket_server,
stop_event: MpEvent,
websocket_server: Any,
) -> None:
self.config = config
self.input = queue.Queue(maxsize=10)
self.input: queue.Queue[bytes] = queue.Queue(maxsize=10)
self.converter = FFMpegConverter(
config.ffmpeg,
self.input,
@ -806,7 +818,7 @@ class Birdseye:
)
if config.birdseye.restream:
self.birdseye_buffer = self.frame_manager.create(
self.birdseye_buffer: Any = self.frame_manager.create(
"birdseye",
self.birdseye_manager.yuv_shape[0] * self.birdseye_manager.yuv_shape[1],
)

View File

@ -1,10 +1,11 @@
"""Handle outputting individual cameras via jsmpeg."""
import logging
import multiprocessing as mp
import queue
import subprocess as sp
import threading
from multiprocessing.synchronize import Event as MpEvent
from typing import Any
from frigate.config import CameraConfig, FfmpegConfig
@ -17,7 +18,7 @@ class FFMpegConverter(threading.Thread):
camera: str,
ffmpeg: FfmpegConfig,
input_queue: queue.Queue,
stop_event: mp.Event,
stop_event: MpEvent,
in_width: int,
in_height: int,
out_width: int,
@ -64,16 +65,17 @@ class FFMpegConverter(threading.Thread):
start_new_session=True,
)
def __write(self, b) -> None:
def __write(self, b: bytes) -> None:
assert self.process.stdin is not None
self.process.stdin.write(b)
def read(self, length):
def read(self, length: int) -> Any:
try:
return self.process.stdout.read1(length)
return self.process.stdout.read1(length) # type: ignore[union-attr]
except ValueError:
return False
def exit(self):
def exit(self) -> None:
self.process.terminate()
try:
@ -98,8 +100,8 @@ class BroadcastThread(threading.Thread):
self,
camera: str,
converter: FFMpegConverter,
websocket_server,
stop_event: mp.Event,
websocket_server: Any,
stop_event: MpEvent,
):
super().__init__()
self.camera = camera
@ -107,7 +109,7 @@ class BroadcastThread(threading.Thread):
self.websocket_server = websocket_server
self.stop_event = stop_event
def run(self):
def run(self) -> None:
while not self.stop_event.is_set():
buf = self.converter.read(65536)
if buf:
@ -133,15 +135,15 @@ class BroadcastThread(threading.Thread):
class JsmpegCamera:
def __init__(
self, config: CameraConfig, stop_event: mp.Event, websocket_server
self, config: CameraConfig, stop_event: MpEvent, websocket_server: Any
) -> None:
self.config = config
self.input = queue.Queue(maxsize=config.detect.fps)
self.input: queue.Queue[bytes] = queue.Queue(maxsize=config.detect.fps)
width = int(
config.live.height * (config.frame_shape[1] / config.frame_shape[0])
)
self.converter = FFMpegConverter(
config.name,
config.name or "",
config.ffmpeg,
self.input,
stop_event,
@ -152,13 +154,13 @@ class JsmpegCamera:
config.live.quality,
)
self.broadcaster = BroadcastThread(
config.name, self.converter, websocket_server, stop_event
config.name or "", self.converter, websocket_server, stop_event
)
self.converter.start()
self.broadcaster.start()
def write_frame(self, frame_bytes) -> None:
def write_frame(self, frame_bytes: bytes) -> None:
try:
self.input.put_nowait(frame_bytes)
except queue.Full:

View File

@ -61,6 +61,12 @@ def check_disabled_camera_update(
# last camera update was more than 1 second ago
# need to send empty data to birdseye because current
# frame is now out of date
cam_width = config.cameras[camera].detect.width
cam_height = config.cameras[camera].detect.height
if cam_width is None or cam_height is None:
raise ValueError(f"Camera {camera} detect dimensions not configured")
if birdseye and offline_time < 10:
# we only need to send blank frames to birdseye at the beginning of a camera being offline
birdseye.write_data(
@ -68,10 +74,7 @@ def check_disabled_camera_update(
[],
[],
now,
get_blank_yuv_frame(
config.cameras[camera].detect.width,
config.cameras[camera].detect.height,
),
get_blank_yuv_frame(cam_width, cam_height),
)
if not has_enabled_camera and birdseye:
@ -173,7 +176,7 @@ class OutputProcess(FrigateProcess):
birdseye_config_subscriber.check_for_update()
)
if update_topic is not None:
if update_topic is not None and birdseye_config is not None:
previous_global_mode = self.config.birdseye.mode
self.config.birdseye = birdseye_config
@ -198,7 +201,10 @@ class OutputProcess(FrigateProcess):
birdseye,
)
(topic, data) = detection_subscriber.check_for_update(timeout=1)
_result = detection_subscriber.check_for_update(timeout=1)
if _result is None:
continue
(topic, data) = _result
now = datetime.datetime.now().timestamp()
if now - last_disabled_cam_check > 5:
@ -208,7 +214,7 @@ class OutputProcess(FrigateProcess):
self.config, birdseye, preview_recorders, preview_write_times
)
if not topic:
if not topic or data is None:
continue
(
@ -262,11 +268,15 @@ class OutputProcess(FrigateProcess):
jsmpeg_cameras[camera].write_frame(frame.tobytes())
# send output data to birdseye if websocket is connected or restreaming
if self.config.birdseye.enabled and (
self.config.birdseye.restream
or any(
ws.environ["PATH_INFO"].endswith("birdseye")
for ws in websocket_server.manager
if (
self.config.birdseye.enabled
and birdseye is not None
and (
self.config.birdseye.restream
or any(
ws.environ["PATH_INFO"].endswith("birdseye")
for ws in websocket_server.manager
)
)
):
birdseye.write_data(
@@ -282,9 +292,12 @@ class OutputProcess(FrigateProcess):
move_preview_frames("clips")
while True:
(topic, data) = detection_subscriber.check_for_update(timeout=0)
_cleanup_result = detection_subscriber.check_for_update(timeout=0)
if _cleanup_result is None:
break
(topic, data) = _cleanup_result
if not topic:
if not topic or data is None:
break
(
@@ -322,7 +335,7 @@ class OutputProcess(FrigateProcess):
logger.info("exiting output process...")
def move_preview_frames(loc: str):
def move_preview_frames(loc: str) -> None:
preview_holdover = os.path.join(CLIPS_DIR, "preview_restart_cache")
preview_cache = os.path.join(CACHE_DIR, "preview_frames")
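
Note: the guards added throughout this file follow one pattern: anything that can now be `None` (optional detect dimensions, `check_for_update` results) is checked once, immediately, so mypy narrows the type for all code that follows. A condensed sketch of the shape, with hypothetical names:

    from typing import Any

    def poll_once(subscriber: Any) -> None:
        result = subscriber.check_for_update(timeout=1)  # now returns tuple | None
        if result is None:
            return  # nothing published; the type is narrowed past this point
        topic, data = result
        if not topic or data is None:
            return
        print(topic, data)
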


@@ -22,7 +22,6 @@ from frigate.ffmpeg_presets import (
parse_preset_hardware_acceleration_encode,
)
from frigate.models import Previews
from frigate.track.object_processing import TrackedObject
from frigate.util.image import copy_yuv_to_position, get_blank_yuv_frame, get_yuv_crop
logger = logging.getLogger(__name__)
@@ -66,7 +65,9 @@ def get_cache_image_name(camera: str, frame_time: float) -> str:
)
def get_most_recent_preview_frame(camera: str, before: float = None) -> str | None:
def get_most_recent_preview_frame(
camera: str, before: float | None = None
) -> str | None:
"""Get the most recent preview frame for a camera."""
if not os.path.exists(PREVIEW_CACHE_DIR):
return None
@@ -147,12 +148,12 @@ class FFMpegConverter(threading.Thread):
if t_idx == item_count - 1:
# last frame does not get a duration
playlist.append(
f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'"
f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'" # type: ignore[arg-type]
)
continue
playlist.append(
f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'"
f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'" # type: ignore[arg-type]
)
playlist.append(
f"duration {self.frame_times[t_idx + 1] - self.frame_times[t_idx]}"
@@ -199,30 +200,33 @@ class FFMpegConverter(threading.Thread):
# unlink files from cache
# don't delete last frame as it will be used as first frame in next segment
for t in self.frame_times[0:-1]:
Path(get_cache_image_name(self.config.name, t)).unlink(missing_ok=True)
Path(get_cache_image_name(self.config.name, t)).unlink(missing_ok=True) # type: ignore[arg-type]
class PreviewRecorder:
def __init__(self, config: CameraConfig) -> None:
self.config = config
self.start_time = 0
self.last_output_time = 0
self.camera_name: str = config.name or ""
self.start_time: float = 0
self.last_output_time: float = 0
self.offline = False
self.output_frames = []
self.output_frames: list[float] = []
if config.detect.width > config.detect.height:
if config.detect.width is None or config.detect.height is None:
raise ValueError("Detect width and height must be set for previews.")
self.detect_width: int = config.detect.width
self.detect_height: int = config.detect.height
if self.detect_width > self.detect_height:
self.out_height = PREVIEW_HEIGHT
self.out_width = (
int((config.detect.width / config.detect.height) * self.out_height)
// 4
* 4
int((self.detect_width / self.detect_height) * self.out_height) // 4 * 4
)
else:
self.out_width = PREVIEW_HEIGHT
self.out_height = (
int((config.detect.height / config.detect.width) * self.out_width)
// 4
* 4
int((self.detect_height / self.detect_width) * self.out_width) // 4 * 4
)
# create communication for finished previews
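
Note: the preview keeps the camera's aspect ratio but floors both dimensions to a multiple of 4, which keeps them valid for subsampled YUV encoding. A worked sketch, assuming `PREVIEW_HEIGHT` is 180 (the real constant may differ):

    PREVIEW_HEIGHT = 180  # assumed value for illustration

    def preview_dims(width: int, height: int) -> tuple[int, int]:
        if width > height:
            out_h = PREVIEW_HEIGHT
            out_w = int(width / height * out_h) // 4 * 4  # floor to multiple of 4
        else:
            out_w = PREVIEW_HEIGHT
            out_h = int(height / width * out_w) // 4 * 4
        return out_w, out_h

    assert preview_dims(1920, 1080) == (320, 180)
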
@@ -302,7 +306,7 @@ class PreviewRecorder:
)
self.start_time = frame_time
self.last_output_time = frame_time
self.output_frames: list[float] = []
self.output_frames = []
def should_write_frame(
self,
@@ -342,7 +346,9 @@ class PreviewRecorder:
def write_frame_to_cache(self, frame_time: float, frame: np.ndarray) -> None:
# resize yuv frame
small_frame = np.zeros((self.out_height * 3 // 2, self.out_width), np.uint8)
small_frame: np.ndarray = np.zeros(
(self.out_height * 3 // 2, self.out_width), np.uint8
)
copy_yuv_to_position(
small_frame,
(0, 0),
@@ -356,7 +362,7 @@ class PreviewRecorder:
cv2.COLOR_YUV2BGR_I420,
)
cv2.imwrite(
get_cache_image_name(self.config.name, frame_time),
get_cache_image_name(self.camera_name, frame_time),
small_frame,
[
int(cv2.IMWRITE_WEBP_QUALITY),
@@ -396,7 +402,7 @@ class PreviewRecorder:
).start()
else:
logger.debug(
f"Not saving preview for {self.config.name} because there are no saved frames."
f"Not saving preview for {self.camera_name} because there are no saved frames."
)
self.reset_frame_cache(frame_time)
@@ -416,9 +422,7 @@ class PreviewRecorder:
if not self.offline:
self.write_frame_to_cache(
frame_time,
get_blank_yuv_frame(
self.config.detect.width, self.config.detect.height
),
get_blank_yuv_frame(self.detect_width, self.detect_height),
)
self.offline = True
@@ -431,9 +435,9 @@ class PreviewRecorder:
return
old_frame_path = get_cache_image_name(
self.config.name, self.output_frames[-1]
self.camera_name, self.output_frames[-1]
)
new_frame_path = get_cache_image_name(self.config.name, frame_time)
new_frame_path = get_cache_image_name(self.camera_name, frame_time)
shutil.copy(old_frame_path, new_frame_path)
# save last frame to ensure consistent duration
@@ -447,13 +451,12 @@ class PreviewRecorder:
self.reset_frame_cache(frame_time)
def stop(self) -> None:
self.config_subscriber.stop()
self.requestor.stop()
def get_active_objects(
frame_time: float, camera_config: CameraConfig, all_objects: list[TrackedObject]
) -> list[TrackedObject]:
frame_time: float, camera_config: CameraConfig, all_objects: list[dict[str, Any]]
) -> list[dict[str, Any]]:
"""get active objects for detection."""
return [
o


@@ -10,7 +10,7 @@ from ruamel.yaml.constructor import DuplicateKeyError
from frigate.config import BirdseyeModeEnum, FrigateConfig
from frigate.const import MODEL_CACHE_DIR
from frigate.detectors import DetectorTypeEnum
from frigate.util.builtin import deep_merge
from frigate.util.builtin import deep_merge, load_labels
class TestConfig(unittest.TestCase):
@@ -288,6 +288,65 @@ class TestConfig(unittest.TestCase):
frigate_config = FrigateConfig(**config)
assert "dog" in frigate_config.cameras["back"].objects.filters
def test_default_audio_filters(self):
config = {
"mqtt": {"host": "mqtt"},
"audio": {"listen": ["speech", "yell"]},
"cameras": {
"back": {
"ffmpeg": {
"inputs": [
{"path": "rtsp://10.0.0.1:554/video", "roles": ["detect"]}
]
},
"detect": {
"height": 1080,
"width": 1920,
"fps": 5,
},
}
},
}
frigate_config = FrigateConfig(**config)
all_audio_labels = {
label
for label in load_labels("/audio-labelmap.txt", prefill=521).values()
if label
}
assert all_audio_labels.issubset(
set(frigate_config.cameras["back"].audio.filters.keys())
)
def test_override_audio_filters(self):
config = {
"mqtt": {"host": "mqtt"},
"cameras": {
"back": {
"ffmpeg": {
"inputs": [
{"path": "rtsp://10.0.0.1:554/video", "roles": ["detect"]}
]
},
"detect": {
"height": 1080,
"width": 1920,
"fps": 5,
},
"audio": {
"listen": ["speech", "yell"],
"filters": {"speech": {"threshold": 0.9}},
},
}
},
}
frigate_config = FrigateConfig(**config)
assert "speech" in frigate_config.cameras["back"].audio.filters
assert frigate_config.cameras["back"].audio.filters["speech"].threshold == 0.9
assert "babbling" in frigate_config.cameras["back"].audio.filters
def test_inherit_object_filters(self):
config = {
"mqtt": {"host": "mqtt"},


@@ -81,6 +81,7 @@ class TrackedObjectProcessor(threading.Thread):
CameraConfigUpdateEnum.motion,
CameraConfigUpdateEnum.objects,
CameraConfigUpdateEnum.remove,
CameraConfigUpdateEnum.timestamp_style,
CameraConfigUpdateEnum.zones,
],
)
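
Note: subscribing the processor to `timestamp_style` updates is what allows the UI, later in this diff, to drop the field from `restartRequired`: the running process receives the new style and applies it to subsequent frames. A sketch of the dispatch shape, with hypothetical names:

    from enum import Enum

    class CameraUpdateTopic(Enum):
        # hypothetical subset of the update enum used above
        motion = "motion"
        timestamp_style = "timestamp_style"

    def apply_update(camera_config: dict, topic: CameraUpdateTopic, value: object) -> None:
        # hot-apply to the in-memory config; no process restart required
        camera_config[topic.value] = value
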


@@ -752,7 +752,7 @@
},
"live": {
"label": "Live playback",
"description": "Settings used by the Web UI to control live stream resolution and quality.",
"description": "Settings to control the jsmpeg live stream resolution and quality. This does not affect restreamed cameras that use go2rtc for live view.",
"streams": {
"label": "Live stream names",
"description": "Mapping of configured stream names to restream/go2rtc names used for live playback."


@@ -825,6 +825,12 @@
"area": "Area"
}
},
"timestampPosition": {
"tl": "Top left",
"tr": "Top right",
"bl": "Bottom left",
"br": "Bottom right"
},
"users": {
"title": "Users",
"management": {
@@ -1342,7 +1348,22 @@
"preset-nvidia": "NVIDIA GPU",
"preset-jetson-h264": "NVIDIA Jetson (H.264)",
"preset-jetson-h265": "NVIDIA Jetson (H.265)",
"preset-rkmpp": "Rockchip RKMPP"
"preset-rkmpp": "Rockchip RKMPP",
"preset-http-jpeg-generic": "HTTP JPEG (Generic)",
"preset-http-mjpeg-generic": "HTTP MJPEG (Generic)",
"preset-http-reolink": "HTTP - Reolink Cameras",
"preset-rtmp-generic": "RTMP (Generic)",
"preset-rtsp-generic": "RTSP (Generic)",
"preset-rtsp-restream": "RTSP - Restream from go2rtc",
"preset-rtsp-restream-low-latency": "RTSP - Restream from go2rtc (Low Latency)",
"preset-rtsp-udp": "RTSP - UDP",
"preset-rtsp-blue-iris": "RTSP - Blue Iris",
"preset-record-generic": "Record (Generic, no audio)",
"preset-record-generic-audio-copy": "Record (Generic + Copy Audio)",
"preset-record-generic-audio-aac": "Record (Generic + Audio to AAC)",
"preset-record-mjpeg": "Record - MJPEG Cameras",
"preset-record-jpeg": "Record - JPEG Cameras",
"preset-record-ubiquiti": "Record - Ubiquiti Cameras"
}
},
"cameraInputs": {


@@ -19,6 +19,16 @@ const audio: SectionConfigOverrides = {
hiddenFields: ["enabled_in_config"],
advancedFields: ["min_volume", "max_not_heard", "num_threads"],
uiSchema: {
filters: {
"ui:options": {
expandable: false,
},
},
"filters.*": {
"ui:options": {
additionalPropertyKeyReadonly: true,
},
},
listen: {
"ui:widget": "audioLabels",
},


@@ -29,6 +29,11 @@ const objects: SectionConfigOverrides = {
],
advancedFields: ["genai"],
uiSchema: {
filters: {
"ui:options": {
expandable: false,
},
},
"filters.*.min_area": {
"ui:options": {
suppressMultiSchema: true,


@@ -4,12 +4,13 @@ const timestampStyle: SectionConfigOverrides = {
base: {
sectionDocs: "/configuration/reference",
restartRequired: [],
fieldOrder: ["position", "format", "color", "thickness"],
fieldOrder: ["position", "format", "thickness", "color"],
hiddenFields: ["effect", "enabled_in_config"],
advancedFields: [],
uiSchema: {
position: {
"ui:size": "xs",
"ui:options": { enumI18nPrefix: "timestampPosition" },
},
format: {
"ui:size": "xs",
@@ -17,7 +18,7 @@ },
},
},
global: {
restartRequired: ["position", "format", "color", "thickness", "effect"],
restartRequired: [],
},
camera: {
restartRequired: [],


@@ -1,5 +1,6 @@
// Select Widget - maps to shadcn/ui Select
import type { WidgetProps } from "@rjsf/utils";
import { useTranslation } from "react-i18next";
import {
Select,
SelectContent,
@@ -21,9 +22,18 @@ export function SelectWidget(props: WidgetProps) {
schema,
} = props;
const { t } = useTranslation(["views/settings"]);
const { enumOptions = [] } = options;
const enumI18nPrefix = options["enumI18nPrefix"] as string | undefined;
const fieldClassName = getSizedFieldClassName(options, "sm");
const getLabel = (option: { value: unknown; label: string }) => {
if (enumI18nPrefix) {
return t(`${enumI18nPrefix}.${option.value}`);
}
return option.label;
};
return (
<Select
value={value?.toString() ?? ""}
@@ -42,7 +52,7 @@ <SelectContent>
<SelectContent>
{enumOptions.map((option: { value: unknown; label: string }) => (
<SelectItem key={String(option.value)} value={String(option.value)}>
{option.label}
{getLabel(option)}
</SelectItem>
))}
</SelectContent>
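
Note: with this change any RJSF select can opt into translated enum labels by setting `enumI18nPrefix` in its uiSchema; the translation key is built as `<prefix>.<value>`, which is how the `timestampPosition` strings added above are resolved. The lookup logic, sketched in Python for brevity, with a fallback for missing keys that the real widget delegates to i18next:

    def label_for(value: str, fallback: str, translations: dict[str, str],
                  prefix: str | None = None) -> str:
        if prefix:
            # e.g. "timestampPosition" + "tl" -> translations["timestampPosition.tl"]
            return translations.get(f"{prefix}.{value}", fallback)
        return fallback

    t = {"timestampPosition.tl": "Top left"}
    assert label_for("tl", "tl", t, "timestampPosition") == "Top left"
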


@@ -707,14 +707,23 @@ export default function LiveCameraView({
}}
>
<div
className={`relative flex flex-col items-center justify-center ${growClassName}`}
className={cn(
"flex flex-col items-center justify-center",
growClassName,
)}
ref={clickOverlayRef}
style={{
aspectRatio: constrainedAspectRatio,
}}
>
{clickOverlay && overlaySize.width > 0 && (
<div className="absolute inset-0 z-40 cursor-crosshair">
<div
className="absolute z-40 cursor-crosshair"
style={{
width: overlaySize.width,
height: overlaySize.height,
}}
>
<Stage
width={overlaySize.width}
height={overlaySize.height}