Optimize more mypy classes (#22637)

* Cleanup motion mypy

* Cleanup object detection mypy

* Update output mypy

* Cleanup
This commit is contained in:
Nicolas Mowen 2026-03-25 12:53:19 -06:00 committed by GitHub
parent 80c4ce2b5d
commit b1c410bc3e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 228 additions and 179 deletions

View File

@ -317,7 +317,7 @@ class MemryXDetector(DetectionApi):
f"Failed to remove downloaded zip {zip_path}: {e}" f"Failed to remove downloaded zip {zip_path}: {e}"
) )
def send_input(self, connection_id, tensor_input: np.ndarray): def send_input(self, connection_id, tensor_input: np.ndarray) -> None:
"""Pre-process (if needed) and send frame to MemryX input queue""" """Pre-process (if needed) and send frame to MemryX input queue"""
if tensor_input is None: if tensor_input is None:
raise ValueError("[send_input] No image data provided for inference") raise ValueError("[send_input] No image data provided for inference")

View File

@ -1,7 +1,9 @@
from typing import Any
import cv2 import cv2
import numpy as np import numpy as np
from frigate.config import MotionConfig from frigate.config.config import RuntimeMotionConfig
from frigate.motion import MotionDetector from frigate.motion import MotionDetector
from frigate.util.image import grab_cv2_contours from frigate.util.image import grab_cv2_contours
@ -9,19 +11,20 @@ from frigate.util.image import grab_cv2_contours
class FrigateMotionDetector(MotionDetector): class FrigateMotionDetector(MotionDetector):
def __init__( def __init__(
self, self,
frame_shape, frame_shape: tuple[int, ...],
config: MotionConfig, config: RuntimeMotionConfig,
fps: int, fps: int,
improve_contrast, improve_contrast: Any,
threshold, threshold: Any,
contour_area, contour_area: Any,
): ) -> None:
self.config = config self.config = config
self.frame_shape = frame_shape self.frame_shape = frame_shape
self.resize_factor = frame_shape[0] / config.frame_height frame_height = config.frame_height or frame_shape[0]
self.resize_factor = frame_shape[0] / frame_height
self.motion_frame_size = ( self.motion_frame_size = (
config.frame_height, frame_height,
config.frame_height * frame_shape[1] // frame_shape[0], frame_height * frame_shape[1] // frame_shape[0],
) )
self.avg_frame = np.zeros(self.motion_frame_size, np.float32) self.avg_frame = np.zeros(self.motion_frame_size, np.float32)
self.avg_delta = np.zeros(self.motion_frame_size, np.float32) self.avg_delta = np.zeros(self.motion_frame_size, np.float32)
@ -38,10 +41,10 @@ class FrigateMotionDetector(MotionDetector):
self.threshold = threshold self.threshold = threshold
self.contour_area = contour_area self.contour_area = contour_area
def is_calibrating(self): def is_calibrating(self) -> bool:
return False return False
def detect(self, frame): def detect(self, frame: np.ndarray) -> list:
motion_boxes = [] motion_boxes = []
gray = frame[0 : self.frame_shape[0], 0 : self.frame_shape[1]] gray = frame[0 : self.frame_shape[0], 0 : self.frame_shape[1]]
@ -99,7 +102,7 @@ class FrigateMotionDetector(MotionDetector):
# dilate the thresholded image to fill in holes, then find contours # dilate the thresholded image to fill in holes, then find contours
# on thresholded image # on thresholded image
thresh_dilated = cv2.dilate(thresh, None, iterations=2) thresh_dilated = cv2.dilate(thresh, None, iterations=2) # type: ignore[call-overload]
contours = cv2.findContours( contours = cv2.findContours(
thresh_dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE thresh_dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
) )

View File

@ -1,11 +1,12 @@
import logging import logging
from typing import Optional
import cv2 import cv2
import numpy as np import numpy as np
from scipy.ndimage import gaussian_filter from scipy.ndimage import gaussian_filter
from frigate.camera import PTZMetrics from frigate.camera import PTZMetrics
from frigate.config import MotionConfig from frigate.config.config import RuntimeMotionConfig
from frigate.motion import MotionDetector from frigate.motion import MotionDetector
from frigate.util.image import grab_cv2_contours from frigate.util.image import grab_cv2_contours
@ -15,22 +16,23 @@ logger = logging.getLogger(__name__)
class ImprovedMotionDetector(MotionDetector): class ImprovedMotionDetector(MotionDetector):
def __init__( def __init__(
self, self,
frame_shape, frame_shape: tuple[int, ...],
config: MotionConfig, config: RuntimeMotionConfig,
fps: int, fps: int,
ptz_metrics: PTZMetrics = None, ptz_metrics: Optional[PTZMetrics] = None,
name="improved", name: str = "improved",
blur_radius=1, blur_radius: int = 1,
interpolation=cv2.INTER_NEAREST, interpolation: int = cv2.INTER_NEAREST,
contrast_frame_history=50, contrast_frame_history: int = 50,
): ) -> None:
self.name = name self.name = name
self.config = config self.config = config
self.frame_shape = frame_shape self.frame_shape = frame_shape
self.resize_factor = frame_shape[0] / config.frame_height frame_height = config.frame_height or frame_shape[0]
self.resize_factor = frame_shape[0] / frame_height
self.motion_frame_size = ( self.motion_frame_size = (
config.frame_height, frame_height,
config.frame_height * frame_shape[1] // frame_shape[0], frame_height * frame_shape[1] // frame_shape[0],
) )
self.avg_frame = np.zeros(self.motion_frame_size, np.float32) self.avg_frame = np.zeros(self.motion_frame_size, np.float32)
self.motion_frame_count = 0 self.motion_frame_count = 0
@ -44,20 +46,20 @@ class ImprovedMotionDetector(MotionDetector):
self.contrast_values[:, 1:2] = 255 self.contrast_values[:, 1:2] = 255
self.contrast_values_index = 0 self.contrast_values_index = 0
self.ptz_metrics = ptz_metrics self.ptz_metrics = ptz_metrics
self.last_stop_time = None self.last_stop_time: float | None = None
def is_calibrating(self): def is_calibrating(self) -> bool:
return self.calibrating return self.calibrating
def detect(self, frame): def detect(self, frame: np.ndarray) -> list[tuple[int, int, int, int]]:
motion_boxes = [] motion_boxes: list[tuple[int, int, int, int]] = []
if not self.config.enabled: if not self.config.enabled:
return motion_boxes return motion_boxes
# if ptz motor is moving from autotracking, quickly return # if ptz motor is moving from autotracking, quickly return
# a single box that is 80% of the frame # a single box that is 80% of the frame
if ( if self.ptz_metrics is not None and (
self.ptz_metrics.autotracker_enabled.value self.ptz_metrics.autotracker_enabled.value
and not self.ptz_metrics.motor_stopped.is_set() and not self.ptz_metrics.motor_stopped.is_set()
): ):
@ -130,19 +132,19 @@ class ImprovedMotionDetector(MotionDetector):
# dilate the thresholded image to fill in holes, then find contours # dilate the thresholded image to fill in holes, then find contours
# on thresholded image # on thresholded image
thresh_dilated = cv2.dilate(thresh, None, iterations=1) thresh_dilated = cv2.dilate(thresh, None, iterations=1) # type: ignore[call-overload]
contours = cv2.findContours( contours = cv2.findContours(
thresh_dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE thresh_dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
) )
contours = grab_cv2_contours(contours) contours = grab_cv2_contours(contours)
# loop over the contours # loop over the contours
total_contour_area = 0 total_contour_area: float = 0
for c in contours: for c in contours:
# if the contour is big enough, count it as motion # if the contour is big enough, count it as motion
contour_area = cv2.contourArea(c) contour_area = cv2.contourArea(c)
total_contour_area += contour_area total_contour_area += contour_area
if contour_area > self.config.contour_area: if contour_area > (self.config.contour_area or 0):
x, y, w, h = cv2.boundingRect(c) x, y, w, h = cv2.boundingRect(c)
motion_boxes.append( motion_boxes.append(
( (
@ -159,7 +161,7 @@ class ImprovedMotionDetector(MotionDetector):
# check if the motor has just stopped from autotracking # check if the motor has just stopped from autotracking
# if so, reassign the average to the current frame so we begin with a new baseline # if so, reassign the average to the current frame so we begin with a new baseline
if ( if self.ptz_metrics is not None and (
# ensure we only do this for cameras with autotracking enabled # ensure we only do this for cameras with autotracking enabled
self.ptz_metrics.autotracker_enabled.value self.ptz_metrics.autotracker_enabled.value
and self.ptz_metrics.motor_stopped.is_set() and self.ptz_metrics.motor_stopped.is_set()

View File

@ -47,13 +47,13 @@ ignore_errors = false
[mypy-frigate.jobs.*] [mypy-frigate.jobs.*]
ignore_errors = false ignore_errors = false
[mypy-frigate.motion] [mypy-frigate.motion.*]
ignore_errors = false ignore_errors = false
[mypy-frigate.object_detection] [mypy-frigate.object_detection.*]
ignore_errors = false ignore_errors = false
[mypy-frigate.output] [mypy-frigate.output.*]
ignore_errors = false ignore_errors = false
[mypy-frigate.ptz] [mypy-frigate.ptz]

View File

@ -7,6 +7,7 @@ from abc import ABC, abstractmethod
from collections import deque from collections import deque
from multiprocessing import Queue, Value from multiprocessing import Queue, Value
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from typing import Any, Optional
import numpy as np import numpy as np
import zmq import zmq
@ -34,26 +35,25 @@ logger = logging.getLogger(__name__)
class ObjectDetector(ABC): class ObjectDetector(ABC):
@abstractmethod @abstractmethod
def detect(self, tensor_input, threshold: float = 0.4): def detect(self, tensor_input: np.ndarray, threshold: float = 0.4) -> list:
pass pass
class BaseLocalDetector(ObjectDetector): class BaseLocalDetector(ObjectDetector):
def __init__( def __init__(
self, self,
detector_config: BaseDetectorConfig = None, detector_config: Optional[BaseDetectorConfig] = None,
labels: str = None, labels: Optional[str] = None,
stop_event: MpEvent = None, stop_event: Optional[MpEvent] = None,
): ) -> None:
self.fps = EventsPerSecond() self.fps = EventsPerSecond()
if labels is None: if labels is None:
self.labels = {} self.labels: dict[int, str] = {}
else: else:
self.labels = load_labels(labels) self.labels = load_labels(labels)
if detector_config: if detector_config and detector_config.model:
self.input_transform = tensor_transform(detector_config.model.input_tensor) self.input_transform = tensor_transform(detector_config.model.input_tensor)
self.dtype = detector_config.model.input_dtype self.dtype = detector_config.model.input_dtype
else: else:
self.input_transform = None self.input_transform = None
@ -77,10 +77,10 @@ class BaseLocalDetector(ObjectDetector):
return tensor_input return tensor_input
def detect(self, tensor_input: np.ndarray, threshold=0.4): def detect(self, tensor_input: np.ndarray, threshold: float = 0.4) -> list:
detections = [] detections = []
raw_detections = self.detect_raw(tensor_input) raw_detections = self.detect_raw(tensor_input) # type: ignore[attr-defined]
for d in raw_detections: for d in raw_detections:
if int(d[0]) < 0 or int(d[0]) >= len(self.labels): if int(d[0]) < 0 or int(d[0]) >= len(self.labels):
@ -96,28 +96,28 @@ class BaseLocalDetector(ObjectDetector):
class LocalObjectDetector(BaseLocalDetector): class LocalObjectDetector(BaseLocalDetector):
def detect_raw(self, tensor_input: np.ndarray): def detect_raw(self, tensor_input: np.ndarray) -> np.ndarray:
tensor_input = self._transform_input(tensor_input) tensor_input = self._transform_input(tensor_input)
return self.detect_api.detect_raw(tensor_input=tensor_input) return self.detect_api.detect_raw(tensor_input=tensor_input) # type: ignore[no-any-return]
class AsyncLocalObjectDetector(BaseLocalDetector): class AsyncLocalObjectDetector(BaseLocalDetector):
def async_send_input(self, tensor_input: np.ndarray, connection_id: str): def async_send_input(self, tensor_input: np.ndarray, connection_id: str) -> None:
tensor_input = self._transform_input(tensor_input) tensor_input = self._transform_input(tensor_input)
return self.detect_api.send_input(connection_id, tensor_input) self.detect_api.send_input(connection_id, tensor_input)
def async_receive_output(self): def async_receive_output(self) -> Any:
return self.detect_api.receive_output() return self.detect_api.receive_output()
class DetectorRunner(FrigateProcess): class DetectorRunner(FrigateProcess):
def __init__( def __init__(
self, self,
name, name: str,
detection_queue: Queue, detection_queue: Queue,
cameras: list[str], cameras: list[str],
avg_speed: Value, avg_speed: Any,
start_time: Value, start_time: Any,
config: FrigateConfig, config: FrigateConfig,
detector_config: BaseDetectorConfig, detector_config: BaseDetectorConfig,
stop_event: MpEvent, stop_event: MpEvent,
@ -129,11 +129,11 @@ class DetectorRunner(FrigateProcess):
self.start_time = start_time self.start_time = start_time
self.config = config self.config = config
self.detector_config = detector_config self.detector_config = detector_config
self.outputs: dict = {} self.outputs: dict[str, Any] = {}
def create_output_shm(self, name: str): def create_output_shm(self, name: str) -> None:
out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False) out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False)
out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) out_np: np.ndarray = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
self.outputs[name] = {"shm": out_shm, "np": out_np} self.outputs[name] = {"shm": out_shm, "np": out_np}
def run(self) -> None: def run(self) -> None:
@ -155,8 +155,8 @@ class DetectorRunner(FrigateProcess):
connection_id, connection_id,
( (
1, 1,
self.detector_config.model.height, self.detector_config.model.height, # type: ignore[union-attr]
self.detector_config.model.width, self.detector_config.model.width, # type: ignore[union-attr]
3, 3,
), ),
) )
@ -187,11 +187,11 @@ class DetectorRunner(FrigateProcess):
class AsyncDetectorRunner(FrigateProcess): class AsyncDetectorRunner(FrigateProcess):
def __init__( def __init__(
self, self,
name, name: str,
detection_queue: Queue, detection_queue: Queue,
cameras: list[str], cameras: list[str],
avg_speed: Value, avg_speed: Any,
start_time: Value, start_time: Any,
config: FrigateConfig, config: FrigateConfig,
detector_config: BaseDetectorConfig, detector_config: BaseDetectorConfig,
stop_event: MpEvent, stop_event: MpEvent,
@ -203,15 +203,15 @@ class AsyncDetectorRunner(FrigateProcess):
self.start_time = start_time self.start_time = start_time
self.config = config self.config = config
self.detector_config = detector_config self.detector_config = detector_config
self.outputs: dict = {} self.outputs: dict[str, Any] = {}
self._frame_manager: SharedMemoryFrameManager | None = None self._frame_manager: SharedMemoryFrameManager | None = None
self._publisher: ObjectDetectorPublisher | None = None self._publisher: ObjectDetectorPublisher | None = None
self._detector: AsyncLocalObjectDetector | None = None self._detector: AsyncLocalObjectDetector | None = None
self.send_times = deque() self.send_times: deque[float] = deque()
def create_output_shm(self, name: str): def create_output_shm(self, name: str) -> None:
out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False) out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False)
out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) out_np: np.ndarray = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
self.outputs[name] = {"shm": out_shm, "np": out_np} self.outputs[name] = {"shm": out_shm, "np": out_np}
def _detect_worker(self) -> None: def _detect_worker(self) -> None:
@ -222,12 +222,13 @@ class AsyncDetectorRunner(FrigateProcess):
except queue.Empty: except queue.Empty:
continue continue
assert self._frame_manager is not None
input_frame = self._frame_manager.get( input_frame = self._frame_manager.get(
connection_id, connection_id,
( (
1, 1,
self.detector_config.model.height, self.detector_config.model.height, # type: ignore[union-attr]
self.detector_config.model.width, self.detector_config.model.width, # type: ignore[union-attr]
3, 3,
), ),
) )
@ -238,11 +239,13 @@ class AsyncDetectorRunner(FrigateProcess):
# mark start time and send to accelerator # mark start time and send to accelerator
self.send_times.append(time.perf_counter()) self.send_times.append(time.perf_counter())
assert self._detector is not None
self._detector.async_send_input(input_frame, connection_id) self._detector.async_send_input(input_frame, connection_id)
def _result_worker(self) -> None: def _result_worker(self) -> None:
logger.info("Starting Result Worker Thread") logger.info("Starting Result Worker Thread")
while not self.stop_event.is_set(): while not self.stop_event.is_set():
assert self._detector is not None
connection_id, detections = self._detector.async_receive_output() connection_id, detections = self._detector.async_receive_output()
# Handle timeout case (queue.Empty) - just continue # Handle timeout case (queue.Empty) - just continue
@ -256,6 +259,7 @@ class AsyncDetectorRunner(FrigateProcess):
duration = time.perf_counter() - ts duration = time.perf_counter() - ts
# release input buffer # release input buffer
assert self._frame_manager is not None
self._frame_manager.close(connection_id) self._frame_manager.close(connection_id)
if connection_id not in self.outputs: if connection_id not in self.outputs:
@ -264,6 +268,7 @@ class AsyncDetectorRunner(FrigateProcess):
# write results and publish # write results and publish
if detections is not None: if detections is not None:
self.outputs[connection_id]["np"][:] = detections[:] self.outputs[connection_id]["np"][:] = detections[:]
assert self._publisher is not None
self._publisher.publish(connection_id) self._publisher.publish(connection_id)
# update timers # update timers
@ -330,11 +335,14 @@ class ObjectDetectProcess:
self.stop_event = stop_event self.stop_event = stop_event
self.start_or_restart() self.start_or_restart()
def stop(self): def stop(self) -> None:
# if the process has already exited on its own, just return # if the process has already exited on its own, just return
if self.detect_process and self.detect_process.exitcode: if self.detect_process and self.detect_process.exitcode:
return return
if self.detect_process is None:
return
logging.info("Waiting for detection process to exit gracefully...") logging.info("Waiting for detection process to exit gracefully...")
self.detect_process.join(timeout=30) self.detect_process.join(timeout=30)
if self.detect_process.exitcode is None: if self.detect_process.exitcode is None:
@ -343,8 +351,8 @@ class ObjectDetectProcess:
self.detect_process.join() self.detect_process.join()
logging.info("Detection process has exited...") logging.info("Detection process has exited...")
def start_or_restart(self): def start_or_restart(self) -> None:
self.detection_start.value = 0.0 self.detection_start.value = 0.0 # type: ignore[attr-defined]
if (self.detect_process is not None) and self.detect_process.is_alive(): if (self.detect_process is not None) and self.detect_process.is_alive():
self.stop() self.stop()
@ -389,17 +397,19 @@ class RemoteObjectDetector:
self.detection_queue = detection_queue self.detection_queue = detection_queue
self.stop_event = stop_event self.stop_event = stop_event
self.shm = UntrackedSharedMemory(name=self.name, create=False) self.shm = UntrackedSharedMemory(name=self.name, create=False)
self.np_shm = np.ndarray( self.np_shm: np.ndarray = np.ndarray(
(1, model_config.height, model_config.width, 3), (1, model_config.height, model_config.width, 3),
dtype=np.uint8, dtype=np.uint8,
buffer=self.shm.buf, buffer=self.shm.buf,
) )
self.out_shm = UntrackedSharedMemory(name=f"out-{self.name}", create=False) self.out_shm = UntrackedSharedMemory(name=f"out-{self.name}", create=False)
self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf) self.out_np_shm: np.ndarray = np.ndarray(
(20, 6), dtype=np.float32, buffer=self.out_shm.buf
)
self.detector_subscriber = ObjectDetectorSubscriber(name) self.detector_subscriber = ObjectDetectorSubscriber(name)
def detect(self, tensor_input, threshold=0.4): def detect(self, tensor_input: np.ndarray, threshold: float = 0.4) -> list:
detections = [] detections: list = []
if self.stop_event.is_set(): if self.stop_event.is_set():
return detections return detections
@ -431,7 +441,7 @@ class RemoteObjectDetector:
self.fps.update() self.fps.update()
return detections return detections
def cleanup(self): def cleanup(self) -> None:
self.detector_subscriber.stop() self.detector_subscriber.stop()
self.shm.unlink() self.shm.unlink()
self.out_shm.unlink() self.out_shm.unlink()

View File

@ -13,10 +13,10 @@ class RequestStore:
A thread-safe hash-based response store that handles creating requests. A thread-safe hash-based response store that handles creating requests.
""" """
def __init__(self): def __init__(self) -> None:
self.request_counter = 0 self.request_counter = 0
self.request_counter_lock = threading.Lock() self.request_counter_lock = threading.Lock()
self.input_queue = queue.Queue() self.input_queue: queue.Queue[tuple[int, ndarray]] = queue.Queue()
def __get_request_id(self) -> int: def __get_request_id(self) -> int:
with self.request_counter_lock: with self.request_counter_lock:
@ -45,17 +45,19 @@ class ResponseStore:
their request's result appears. their request's result appears.
""" """
def __init__(self): def __init__(self) -> None:
self.responses = {} # Maps request_id -> (original_input, infer_results) self.responses: dict[
int, ndarray
] = {} # Maps request_id -> (original_input, infer_results)
self.lock = threading.Lock() self.lock = threading.Lock()
self.cond = threading.Condition(self.lock) self.cond = threading.Condition(self.lock)
def put(self, request_id: int, response: ndarray): def put(self, request_id: int, response: ndarray) -> None:
with self.cond: with self.cond:
self.responses[request_id] = response self.responses[request_id] = response
self.cond.notify_all() self.cond.notify_all()
def get(self, request_id: int, timeout=None) -> ndarray: def get(self, request_id: int, timeout: float | None = None) -> ndarray:
with self.cond: with self.cond:
if not self.cond.wait_for( if not self.cond.wait_for(
lambda: request_id in self.responses, timeout=timeout lambda: request_id in self.responses, timeout=timeout
@ -65,7 +67,9 @@ class ResponseStore:
return self.responses.pop(request_id) return self.responses.pop(request_id)
def tensor_transform(desired_shape: InputTensorEnum): def tensor_transform(
desired_shape: InputTensorEnum,
) -> tuple[int, int, int, int] | None:
# Currently this function only supports BHWC permutations # Currently this function only supports BHWC permutations
if desired_shape == InputTensorEnum.nhwc: if desired_shape == InputTensorEnum.nhwc:
return None return None

View File

@ -4,13 +4,13 @@ import datetime
import glob import glob
import logging import logging
import math import math
import multiprocessing as mp
import os import os
import queue import queue
import subprocess as sp import subprocess as sp
import threading import threading
import time import time
import traceback import traceback
from multiprocessing.synchronize import Event as MpEvent
from typing import Any, Optional from typing import Any, Optional
import cv2 import cv2
@ -74,25 +74,25 @@ class Canvas:
self, self,
canvas_width: int, canvas_width: int,
canvas_height: int, canvas_height: int,
scaling_factor: int, scaling_factor: float,
) -> None: ) -> None:
self.scaling_factor = scaling_factor self.scaling_factor = scaling_factor
gcd = math.gcd(canvas_width, canvas_height) gcd = math.gcd(canvas_width, canvas_height)
self.aspect = get_standard_aspect_ratio( self.aspect = get_standard_aspect_ratio(
(canvas_width / gcd), (canvas_height / gcd) int(canvas_width / gcd), int(canvas_height / gcd)
) )
self.width = canvas_width self.width = canvas_width
self.height = (self.width * self.aspect[1]) / self.aspect[0] self.height: float = (self.width * self.aspect[1]) / self.aspect[0]
self.coefficient_cache: dict[int, int] = {} self.coefficient_cache: dict[int, float] = {}
self.aspect_cache: dict[str, tuple[int, int]] = {} self.aspect_cache: dict[str, tuple[int, int]] = {}
def get_aspect(self, coefficient: int) -> tuple[int, int]: def get_aspect(self, coefficient: float) -> tuple[float, float]:
return (self.aspect[0] * coefficient, self.aspect[1] * coefficient) return (self.aspect[0] * coefficient, self.aspect[1] * coefficient)
def get_coefficient(self, camera_count: int) -> int: def get_coefficient(self, camera_count: int) -> float:
return self.coefficient_cache.get(camera_count, self.scaling_factor) return self.coefficient_cache.get(camera_count, self.scaling_factor)
def set_coefficient(self, camera_count: int, coefficient: int) -> None: def set_coefficient(self, camera_count: int, coefficient: float) -> None:
self.coefficient_cache[camera_count] = coefficient self.coefficient_cache[camera_count] = coefficient
def get_camera_aspect( def get_camera_aspect(
@ -105,7 +105,7 @@ class Canvas:
gcd = math.gcd(camera_width, camera_height) gcd = math.gcd(camera_width, camera_height)
camera_aspect = get_standard_aspect_ratio( camera_aspect = get_standard_aspect_ratio(
camera_width / gcd, camera_height / gcd int(camera_width / gcd), int(camera_height / gcd)
) )
self.aspect_cache[cam_name] = camera_aspect self.aspect_cache[cam_name] = camera_aspect
return camera_aspect return camera_aspect
@ -116,7 +116,7 @@ class FFMpegConverter(threading.Thread):
self, self,
ffmpeg: FfmpegConfig, ffmpeg: FfmpegConfig,
input_queue: queue.Queue, input_queue: queue.Queue,
stop_event: mp.Event, stop_event: MpEvent,
in_width: int, in_width: int,
in_height: int, in_height: int,
out_width: int, out_width: int,
@ -128,7 +128,7 @@ class FFMpegConverter(threading.Thread):
self.camera = "birdseye" self.camera = "birdseye"
self.input_queue = input_queue self.input_queue = input_queue
self.stop_event = stop_event self.stop_event = stop_event
self.bd_pipe = None self.bd_pipe: int | None = None
if birdseye_rtsp: if birdseye_rtsp:
self.recreate_birdseye_pipe() self.recreate_birdseye_pipe()
@ -181,7 +181,8 @@ class FFMpegConverter(threading.Thread):
os.close(stdin) os.close(stdin)
self.reading_birdseye = False self.reading_birdseye = False
def __write(self, b) -> None: def __write(self, b: bytes) -> None:
assert self.process.stdin is not None
self.process.stdin.write(b) self.process.stdin.write(b)
if self.bd_pipe: if self.bd_pipe:
@ -200,13 +201,13 @@ class FFMpegConverter(threading.Thread):
return return
def read(self, length): def read(self, length: int) -> Any:
try: try:
return self.process.stdout.read1(length) return self.process.stdout.read1(length) # type: ignore[union-attr]
except ValueError: except ValueError:
return False return False
def exit(self): def exit(self) -> None:
if self.bd_pipe: if self.bd_pipe:
os.close(self.bd_pipe) os.close(self.bd_pipe)
@ -233,8 +234,8 @@ class BroadcastThread(threading.Thread):
self, self,
camera: str, camera: str,
converter: FFMpegConverter, converter: FFMpegConverter,
websocket_server, websocket_server: Any,
stop_event: mp.Event, stop_event: MpEvent,
): ):
super().__init__() super().__init__()
self.camera = camera self.camera = camera
@ -242,7 +243,7 @@ class BroadcastThread(threading.Thread):
self.websocket_server = websocket_server self.websocket_server = websocket_server
self.stop_event = stop_event self.stop_event = stop_event
def run(self): def run(self) -> None:
while not self.stop_event.is_set(): while not self.stop_event.is_set():
buf = self.converter.read(65536) buf = self.converter.read(65536)
if buf: if buf:
@ -270,16 +271,16 @@ class BirdsEyeFrameManager:
def __init__( def __init__(
self, self,
config: FrigateConfig, config: FrigateConfig,
stop_event: mp.Event, stop_event: MpEvent,
): ):
self.config = config self.config = config
width, height = get_canvas_shape(config.birdseye.width, config.birdseye.height) width, height = get_canvas_shape(config.birdseye.width, config.birdseye.height)
self.frame_shape = (height, width) self.frame_shape = (height, width)
self.yuv_shape = (height * 3 // 2, width) self.yuv_shape = (height * 3 // 2, width)
self.frame = np.ndarray(self.yuv_shape, dtype=np.uint8) self.frame: np.ndarray = np.ndarray(self.yuv_shape, dtype=np.uint8)
self.canvas = Canvas(width, height, config.birdseye.layout.scaling_factor) self.canvas = Canvas(width, height, config.birdseye.layout.scaling_factor)
self.stop_event = stop_event self.stop_event = stop_event
self.last_refresh_time = 0 self.last_refresh_time: float = 0
# initialize the frame as black and with the Frigate logo # initialize the frame as black and with the Frigate logo
self.blank_frame = np.zeros(self.yuv_shape, np.uint8) self.blank_frame = np.zeros(self.yuv_shape, np.uint8)
@ -323,15 +324,15 @@ class BirdsEyeFrameManager:
self.frame[:] = self.blank_frame self.frame[:] = self.blank_frame
self.cameras = {} self.cameras: dict[str, Any] = {}
for camera in self.config.cameras.keys(): for camera in self.config.cameras.keys():
self.add_camera(camera) self.add_camera(camera)
self.camera_layout = [] self.camera_layout: list[Any] = []
self.active_cameras = set() self.active_cameras: set[str] = set()
self.last_output_time = 0.0 self.last_output_time = 0.0
def add_camera(self, cam: str): def add_camera(self, cam: str) -> None:
"""Add a camera to self.cameras with the correct structure.""" """Add a camera to self.cameras with the correct structure."""
settings = self.config.cameras[cam] settings = self.config.cameras[cam]
# precalculate the coordinates for all the channels # precalculate the coordinates for all the channels
@ -361,16 +362,21 @@ class BirdsEyeFrameManager:
}, },
} }
def remove_camera(self, cam: str): def remove_camera(self, cam: str) -> None:
"""Remove a camera from self.cameras.""" """Remove a camera from self.cameras."""
if cam in self.cameras: if cam in self.cameras:
del self.cameras[cam] del self.cameras[cam]
def clear_frame(self): def clear_frame(self) -> None:
logger.debug("Clearing the birdseye frame") logger.debug("Clearing the birdseye frame")
self.frame[:] = self.blank_frame self.frame[:] = self.blank_frame
def copy_to_position(self, position, camera=None, frame: np.ndarray = None): def copy_to_position(
self,
position: Any,
camera: Optional[str] = None,
frame: Optional[np.ndarray] = None,
) -> None:
if camera is None: if camera is None:
frame = None frame = None
channel_dims = None channel_dims = None
@ -389,7 +395,9 @@ class BirdsEyeFrameManager:
channel_dims, channel_dims,
) )
def camera_active(self, mode, object_box_count, motion_box_count): def camera_active(
self, mode: Any, object_box_count: int, motion_box_count: int
) -> bool:
if mode == BirdseyeModeEnum.continuous: if mode == BirdseyeModeEnum.continuous:
return True return True
@ -399,6 +407,8 @@ class BirdsEyeFrameManager:
if mode == BirdseyeModeEnum.objects and object_box_count > 0: if mode == BirdseyeModeEnum.objects and object_box_count > 0:
return True return True
return False
def get_camera_coordinates(self) -> dict[str, dict[str, int]]: def get_camera_coordinates(self) -> dict[str, dict[str, int]]:
"""Return the coordinates of each camera in the current layout.""" """Return the coordinates of each camera in the current layout."""
coordinates = {} coordinates = {}
@ -451,7 +461,7 @@ class BirdsEyeFrameManager:
- self.cameras[active_camera]["last_active_frame"] - self.cameras[active_camera]["last_active_frame"]
), ),
) )
active_cameras = limited_active_cameras[:max_cameras] active_cameras = set(limited_active_cameras[:max_cameras])
max_camera_refresh = True max_camera_refresh = True
self.last_refresh_time = now self.last_refresh_time = now
@ -510,7 +520,7 @@ class BirdsEyeFrameManager:
# center camera view in canvas and ensure that it fits # center camera view in canvas and ensure that it fits
if scaled_width < self.canvas.width: if scaled_width < self.canvas.width:
coefficient = 1 coefficient: float = 1
x_offset = int((self.canvas.width - scaled_width) / 2) x_offset = int((self.canvas.width - scaled_width) / 2)
else: else:
coefficient = self.canvas.width / scaled_width coefficient = self.canvas.width / scaled_width
@ -557,7 +567,7 @@ class BirdsEyeFrameManager:
calculating = False calculating = False
self.canvas.set_coefficient(len(active_cameras), coefficient) self.canvas.set_coefficient(len(active_cameras), coefficient)
self.camera_layout = layout_candidate self.camera_layout = layout_candidate or []
frame_changed = True frame_changed = True
# Draw the layout # Draw the layout
@ -577,10 +587,12 @@ class BirdsEyeFrameManager:
self, self,
cameras_to_add: list[str], cameras_to_add: list[str],
coefficient: float, coefficient: float,
) -> tuple[Any]: ) -> Optional[list[list[Any]]]:
"""Calculate the optimal layout for 2+ cameras.""" """Calculate the optimal layout for 2+ cameras."""
def map_layout(camera_layout: list[list[Any]], row_height: int): def map_layout(
camera_layout: list[list[Any]], row_height: int
) -> tuple[int, int, Optional[list[list[Any]]]]:
"""Map the calculated layout.""" """Map the calculated layout."""
candidate_layout = [] candidate_layout = []
starting_x = 0 starting_x = 0
@ -777,11 +789,11 @@ class Birdseye:
def __init__( def __init__(
self, self,
config: FrigateConfig, config: FrigateConfig,
stop_event: mp.Event, stop_event: MpEvent,
websocket_server, websocket_server: Any,
) -> None: ) -> None:
self.config = config self.config = config
self.input = queue.Queue(maxsize=10) self.input: queue.Queue[bytes] = queue.Queue(maxsize=10)
self.converter = FFMpegConverter( self.converter = FFMpegConverter(
config.ffmpeg, config.ffmpeg,
self.input, self.input,
@ -806,7 +818,7 @@ class Birdseye:
) )
if config.birdseye.restream: if config.birdseye.restream:
self.birdseye_buffer = self.frame_manager.create( self.birdseye_buffer: Any = self.frame_manager.create(
"birdseye", "birdseye",
self.birdseye_manager.yuv_shape[0] * self.birdseye_manager.yuv_shape[1], self.birdseye_manager.yuv_shape[0] * self.birdseye_manager.yuv_shape[1],
) )

View File

@ -1,10 +1,11 @@
"""Handle outputting individual cameras via jsmpeg.""" """Handle outputting individual cameras via jsmpeg."""
import logging import logging
import multiprocessing as mp
import queue import queue
import subprocess as sp import subprocess as sp
import threading import threading
from multiprocessing.synchronize import Event as MpEvent
from typing import Any
from frigate.config import CameraConfig, FfmpegConfig from frigate.config import CameraConfig, FfmpegConfig
@ -17,7 +18,7 @@ class FFMpegConverter(threading.Thread):
camera: str, camera: str,
ffmpeg: FfmpegConfig, ffmpeg: FfmpegConfig,
input_queue: queue.Queue, input_queue: queue.Queue,
stop_event: mp.Event, stop_event: MpEvent,
in_width: int, in_width: int,
in_height: int, in_height: int,
out_width: int, out_width: int,
@ -64,16 +65,17 @@ class FFMpegConverter(threading.Thread):
start_new_session=True, start_new_session=True,
) )
def __write(self, b) -> None: def __write(self, b: bytes) -> None:
assert self.process.stdin is not None
self.process.stdin.write(b) self.process.stdin.write(b)
def read(self, length): def read(self, length: int) -> Any:
try: try:
return self.process.stdout.read1(length) return self.process.stdout.read1(length) # type: ignore[union-attr]
except ValueError: except ValueError:
return False return False
def exit(self): def exit(self) -> None:
self.process.terminate() self.process.terminate()
try: try:
@ -98,8 +100,8 @@ class BroadcastThread(threading.Thread):
self, self,
camera: str, camera: str,
converter: FFMpegConverter, converter: FFMpegConverter,
websocket_server, websocket_server: Any,
stop_event: mp.Event, stop_event: MpEvent,
): ):
super().__init__() super().__init__()
self.camera = camera self.camera = camera
@ -107,7 +109,7 @@ class BroadcastThread(threading.Thread):
self.websocket_server = websocket_server self.websocket_server = websocket_server
self.stop_event = stop_event self.stop_event = stop_event
def run(self): def run(self) -> None:
while not self.stop_event.is_set(): while not self.stop_event.is_set():
buf = self.converter.read(65536) buf = self.converter.read(65536)
if buf: if buf:
@ -133,15 +135,15 @@ class BroadcastThread(threading.Thread):
class JsmpegCamera: class JsmpegCamera:
def __init__( def __init__(
self, config: CameraConfig, stop_event: mp.Event, websocket_server self, config: CameraConfig, stop_event: MpEvent, websocket_server: Any
) -> None: ) -> None:
self.config = config self.config = config
self.input = queue.Queue(maxsize=config.detect.fps) self.input: queue.Queue[bytes] = queue.Queue(maxsize=config.detect.fps)
width = int( width = int(
config.live.height * (config.frame_shape[1] / config.frame_shape[0]) config.live.height * (config.frame_shape[1] / config.frame_shape[0])
) )
self.converter = FFMpegConverter( self.converter = FFMpegConverter(
config.name, config.name or "",
config.ffmpeg, config.ffmpeg,
self.input, self.input,
stop_event, stop_event,
@ -152,13 +154,13 @@ class JsmpegCamera:
config.live.quality, config.live.quality,
) )
self.broadcaster = BroadcastThread( self.broadcaster = BroadcastThread(
config.name, self.converter, websocket_server, stop_event config.name or "", self.converter, websocket_server, stop_event
) )
self.converter.start() self.converter.start()
self.broadcaster.start() self.broadcaster.start()
def write_frame(self, frame_bytes) -> None: def write_frame(self, frame_bytes: bytes) -> None:
try: try:
self.input.put_nowait(frame_bytes) self.input.put_nowait(frame_bytes)
except queue.Full: except queue.Full:

View File

@ -61,6 +61,12 @@ def check_disabled_camera_update(
# last camera update was more than 1 second ago # last camera update was more than 1 second ago
# need to send empty data to birdseye because current # need to send empty data to birdseye because current
# frame is now out of date # frame is now out of date
cam_width = config.cameras[camera].detect.width
cam_height = config.cameras[camera].detect.height
if cam_width is None or cam_height is None:
raise ValueError(f"Camera {camera} detect dimensions not configured")
if birdseye and offline_time < 10: if birdseye and offline_time < 10:
# we only need to send blank frames to birdseye at the beginning of a camera being offline # we only need to send blank frames to birdseye at the beginning of a camera being offline
birdseye.write_data( birdseye.write_data(
@ -68,10 +74,7 @@ def check_disabled_camera_update(
[], [],
[], [],
now, now,
get_blank_yuv_frame( get_blank_yuv_frame(cam_width, cam_height),
config.cameras[camera].detect.width,
config.cameras[camera].detect.height,
),
) )
if not has_enabled_camera and birdseye: if not has_enabled_camera and birdseye:
@ -173,7 +176,7 @@ class OutputProcess(FrigateProcess):
birdseye_config_subscriber.check_for_update() birdseye_config_subscriber.check_for_update()
) )
if update_topic is not None: if update_topic is not None and birdseye_config is not None:
previous_global_mode = self.config.birdseye.mode previous_global_mode = self.config.birdseye.mode
self.config.birdseye = birdseye_config self.config.birdseye = birdseye_config
@ -198,7 +201,10 @@ class OutputProcess(FrigateProcess):
birdseye, birdseye,
) )
(topic, data) = detection_subscriber.check_for_update(timeout=1) _result = detection_subscriber.check_for_update(timeout=1)
if _result is None:
continue
(topic, data) = _result
now = datetime.datetime.now().timestamp() now = datetime.datetime.now().timestamp()
if now - last_disabled_cam_check > 5: if now - last_disabled_cam_check > 5:
@ -208,7 +214,7 @@ class OutputProcess(FrigateProcess):
self.config, birdseye, preview_recorders, preview_write_times self.config, birdseye, preview_recorders, preview_write_times
) )
if not topic: if not topic or data is None:
continue continue
( (
@ -262,11 +268,15 @@ class OutputProcess(FrigateProcess):
jsmpeg_cameras[camera].write_frame(frame.tobytes()) jsmpeg_cameras[camera].write_frame(frame.tobytes())
# send output data to birdseye if websocket is connected or restreaming # send output data to birdseye if websocket is connected or restreaming
if self.config.birdseye.enabled and ( if (
self.config.birdseye.restream self.config.birdseye.enabled
or any( and birdseye is not None
ws.environ["PATH_INFO"].endswith("birdseye") and (
for ws in websocket_server.manager self.config.birdseye.restream
or any(
ws.environ["PATH_INFO"].endswith("birdseye")
for ws in websocket_server.manager
)
) )
): ):
birdseye.write_data( birdseye.write_data(
@ -282,9 +292,12 @@ class OutputProcess(FrigateProcess):
move_preview_frames("clips") move_preview_frames("clips")
while True: while True:
(topic, data) = detection_subscriber.check_for_update(timeout=0) _cleanup_result = detection_subscriber.check_for_update(timeout=0)
if _cleanup_result is None:
break
(topic, data) = _cleanup_result
if not topic: if not topic or data is None:
break break
( (
@ -322,7 +335,7 @@ class OutputProcess(FrigateProcess):
logger.info("exiting output process...") logger.info("exiting output process...")
def move_preview_frames(loc: str): def move_preview_frames(loc: str) -> None:
preview_holdover = os.path.join(CLIPS_DIR, "preview_restart_cache") preview_holdover = os.path.join(CLIPS_DIR, "preview_restart_cache")
preview_cache = os.path.join(CACHE_DIR, "preview_frames") preview_cache = os.path.join(CACHE_DIR, "preview_frames")

View File

@ -22,7 +22,6 @@ from frigate.ffmpeg_presets import (
parse_preset_hardware_acceleration_encode, parse_preset_hardware_acceleration_encode,
) )
from frigate.models import Previews from frigate.models import Previews
from frigate.track.object_processing import TrackedObject
from frigate.util.image import copy_yuv_to_position, get_blank_yuv_frame, get_yuv_crop from frigate.util.image import copy_yuv_to_position, get_blank_yuv_frame, get_yuv_crop
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -66,7 +65,9 @@ def get_cache_image_name(camera: str, frame_time: float) -> str:
) )
def get_most_recent_preview_frame(camera: str, before: float = None) -> str | None: def get_most_recent_preview_frame(
camera: str, before: float | None = None
) -> str | None:
"""Get the most recent preview frame for a camera.""" """Get the most recent preview frame for a camera."""
if not os.path.exists(PREVIEW_CACHE_DIR): if not os.path.exists(PREVIEW_CACHE_DIR):
return None return None
@ -147,12 +148,12 @@ class FFMpegConverter(threading.Thread):
if t_idx == item_count - 1: if t_idx == item_count - 1:
# last frame does not get a duration # last frame does not get a duration
playlist.append( playlist.append(
f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'" f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'" # type: ignore[arg-type]
) )
continue continue
playlist.append( playlist.append(
f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'" f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'" # type: ignore[arg-type]
) )
playlist.append( playlist.append(
f"duration {self.frame_times[t_idx + 1] - self.frame_times[t_idx]}" f"duration {self.frame_times[t_idx + 1] - self.frame_times[t_idx]}"
@ -199,30 +200,33 @@ class FFMpegConverter(threading.Thread):
# unlink files from cache # unlink files from cache
# don't delete last frame as it will be used as first frame in next segment # don't delete last frame as it will be used as first frame in next segment
for t in self.frame_times[0:-1]: for t in self.frame_times[0:-1]:
Path(get_cache_image_name(self.config.name, t)).unlink(missing_ok=True) Path(get_cache_image_name(self.config.name, t)).unlink(missing_ok=True) # type: ignore[arg-type]
class PreviewRecorder: class PreviewRecorder:
def __init__(self, config: CameraConfig) -> None: def __init__(self, config: CameraConfig) -> None:
self.config = config self.config = config
self.start_time = 0 self.camera_name: str = config.name or ""
self.last_output_time = 0 self.start_time: float = 0
self.last_output_time: float = 0
self.offline = False self.offline = False
self.output_frames = [] self.output_frames: list[float] = []
if config.detect.width > config.detect.height: if config.detect.width is None or config.detect.height is None:
raise ValueError("Detect width and height must be set for previews.")
self.detect_width: int = config.detect.width
self.detect_height: int = config.detect.height
if self.detect_width > self.detect_height:
self.out_height = PREVIEW_HEIGHT self.out_height = PREVIEW_HEIGHT
self.out_width = ( self.out_width = (
int((config.detect.width / config.detect.height) * self.out_height) int((self.detect_width / self.detect_height) * self.out_height) // 4 * 4
// 4
* 4
) )
else: else:
self.out_width = PREVIEW_HEIGHT self.out_width = PREVIEW_HEIGHT
self.out_height = ( self.out_height = (
int((config.detect.height / config.detect.width) * self.out_width) int((self.detect_height / self.detect_width) * self.out_width) // 4 * 4
// 4
* 4
) )
# create communication for finished previews # create communication for finished previews
@ -302,7 +306,7 @@ class PreviewRecorder:
) )
self.start_time = frame_time self.start_time = frame_time
self.last_output_time = frame_time self.last_output_time = frame_time
self.output_frames: list[float] = [] self.output_frames = []
def should_write_frame( def should_write_frame(
self, self,
@ -342,7 +346,9 @@ class PreviewRecorder:
def write_frame_to_cache(self, frame_time: float, frame: np.ndarray) -> None: def write_frame_to_cache(self, frame_time: float, frame: np.ndarray) -> None:
# resize yuv frame # resize yuv frame
small_frame = np.zeros((self.out_height * 3 // 2, self.out_width), np.uint8) small_frame: np.ndarray = np.zeros(
(self.out_height * 3 // 2, self.out_width), np.uint8
)
copy_yuv_to_position( copy_yuv_to_position(
small_frame, small_frame,
(0, 0), (0, 0),
@ -356,7 +362,7 @@ class PreviewRecorder:
cv2.COLOR_YUV2BGR_I420, cv2.COLOR_YUV2BGR_I420,
) )
cv2.imwrite( cv2.imwrite(
get_cache_image_name(self.config.name, frame_time), get_cache_image_name(self.camera_name, frame_time),
small_frame, small_frame,
[ [
int(cv2.IMWRITE_WEBP_QUALITY), int(cv2.IMWRITE_WEBP_QUALITY),
@ -396,7 +402,7 @@ class PreviewRecorder:
).start() ).start()
else: else:
logger.debug( logger.debug(
f"Not saving preview for {self.config.name} because there are no saved frames." f"Not saving preview for {self.camera_name} because there are no saved frames."
) )
self.reset_frame_cache(frame_time) self.reset_frame_cache(frame_time)
@ -416,9 +422,7 @@ class PreviewRecorder:
if not self.offline: if not self.offline:
self.write_frame_to_cache( self.write_frame_to_cache(
frame_time, frame_time,
get_blank_yuv_frame( get_blank_yuv_frame(self.detect_width, self.detect_height),
self.config.detect.width, self.config.detect.height
),
) )
self.offline = True self.offline = True
@ -431,9 +435,9 @@ class PreviewRecorder:
return return
old_frame_path = get_cache_image_name( old_frame_path = get_cache_image_name(
self.config.name, self.output_frames[-1] self.camera_name, self.output_frames[-1]
) )
new_frame_path = get_cache_image_name(self.config.name, frame_time) new_frame_path = get_cache_image_name(self.camera_name, frame_time)
shutil.copy(old_frame_path, new_frame_path) shutil.copy(old_frame_path, new_frame_path)
# save last frame to ensure consistent duration # save last frame to ensure consistent duration
@ -447,13 +451,12 @@ class PreviewRecorder:
self.reset_frame_cache(frame_time) self.reset_frame_cache(frame_time)
def stop(self) -> None: def stop(self) -> None:
self.config_subscriber.stop()
self.requestor.stop() self.requestor.stop()
def get_active_objects( def get_active_objects(
frame_time: float, camera_config: CameraConfig, all_objects: list[TrackedObject] frame_time: float, camera_config: CameraConfig, all_objects: list[dict[str, Any]]
) -> list[TrackedObject]: ) -> list[dict[str, Any]]:
"""get active objects for detection.""" """get active objects for detection."""
return [ return [
o o