Update output mypy

This commit is contained in:
Nicolas Mowen 2026-03-25 12:19:16 -06:00
parent f1c8c69448
commit 6a1bc18bc8
6 changed files with 131 additions and 95 deletions

View File

@ -53,7 +53,7 @@ ignore_errors = false
[mypy-frigate.object_detection.*] [mypy-frigate.object_detection.*]
ignore_errors = false ignore_errors = false
[mypy-frigate.output] [mypy-frigate.output.*]
ignore_errors = false ignore_errors = false
[mypy-frigate.ptz] [mypy-frigate.ptz]

View File

@ -46,7 +46,9 @@ class ResponseStore:
""" """
def __init__(self) -> None: def __init__(self) -> None:
self.responses: dict[int, ndarray] = {} # Maps request_id -> (original_input, infer_results) self.responses: dict[
int, ndarray
] = {} # Maps request_id -> (original_input, infer_results)
self.lock = threading.Lock() self.lock = threading.Lock()
self.cond = threading.Condition(self.lock) self.cond = threading.Condition(self.lock)
@ -65,7 +67,9 @@ class ResponseStore:
return self.responses.pop(request_id) return self.responses.pop(request_id)
def tensor_transform(desired_shape: InputTensorEnum) -> tuple[int, int, int, int] | None: def tensor_transform(
desired_shape: InputTensorEnum,
) -> tuple[int, int, int, int] | None:
# Currently this function only supports BHWC permutations # Currently this function only supports BHWC permutations
if desired_shape == InputTensorEnum.nhwc: if desired_shape == InputTensorEnum.nhwc:
return None return None

View File

@ -11,6 +11,7 @@ import subprocess as sp
import threading import threading
import time import time
import traceback import traceback
from multiprocessing.synchronize import Event as MpEvent
from typing import Any, Optional from typing import Any, Optional
import cv2 import cv2
@ -74,25 +75,25 @@ class Canvas:
self, self,
canvas_width: int, canvas_width: int,
canvas_height: int, canvas_height: int,
scaling_factor: int, scaling_factor: float,
) -> None: ) -> None:
self.scaling_factor = scaling_factor self.scaling_factor = scaling_factor
gcd = math.gcd(canvas_width, canvas_height) gcd = math.gcd(canvas_width, canvas_height)
self.aspect = get_standard_aspect_ratio( self.aspect = get_standard_aspect_ratio(
(canvas_width / gcd), (canvas_height / gcd) int(canvas_width / gcd), int(canvas_height / gcd)
) )
self.width = canvas_width self.width = canvas_width
self.height = (self.width * self.aspect[1]) / self.aspect[0] self.height: float = (self.width * self.aspect[1]) / self.aspect[0]
self.coefficient_cache: dict[int, int] = {} self.coefficient_cache: dict[int, float] = {}
self.aspect_cache: dict[str, tuple[int, int]] = {} self.aspect_cache: dict[str, tuple[int, int]] = {}
def get_aspect(self, coefficient: int) -> tuple[int, int]: def get_aspect(self, coefficient: float) -> tuple[float, float]:
return (self.aspect[0] * coefficient, self.aspect[1] * coefficient) return (self.aspect[0] * coefficient, self.aspect[1] * coefficient)
def get_coefficient(self, camera_count: int) -> int: def get_coefficient(self, camera_count: int) -> float:
return self.coefficient_cache.get(camera_count, self.scaling_factor) return self.coefficient_cache.get(camera_count, self.scaling_factor)
def set_coefficient(self, camera_count: int, coefficient: int) -> None: def set_coefficient(self, camera_count: int, coefficient: float) -> None:
self.coefficient_cache[camera_count] = coefficient self.coefficient_cache[camera_count] = coefficient
def get_camera_aspect( def get_camera_aspect(
@ -105,7 +106,7 @@ class Canvas:
gcd = math.gcd(camera_width, camera_height) gcd = math.gcd(camera_width, camera_height)
camera_aspect = get_standard_aspect_ratio( camera_aspect = get_standard_aspect_ratio(
camera_width / gcd, camera_height / gcd int(camera_width / gcd), int(camera_height / gcd)
) )
self.aspect_cache[cam_name] = camera_aspect self.aspect_cache[cam_name] = camera_aspect
return camera_aspect return camera_aspect
@ -116,7 +117,7 @@ class FFMpegConverter(threading.Thread):
self, self,
ffmpeg: FfmpegConfig, ffmpeg: FfmpegConfig,
input_queue: queue.Queue, input_queue: queue.Queue,
stop_event: mp.Event, stop_event: MpEvent,
in_width: int, in_width: int,
in_height: int, in_height: int,
out_width: int, out_width: int,
@ -128,7 +129,7 @@ class FFMpegConverter(threading.Thread):
self.camera = "birdseye" self.camera = "birdseye"
self.input_queue = input_queue self.input_queue = input_queue
self.stop_event = stop_event self.stop_event = stop_event
self.bd_pipe = None self.bd_pipe: int | None = None
if birdseye_rtsp: if birdseye_rtsp:
self.recreate_birdseye_pipe() self.recreate_birdseye_pipe()
@ -181,7 +182,8 @@ class FFMpegConverter(threading.Thread):
os.close(stdin) os.close(stdin)
self.reading_birdseye = False self.reading_birdseye = False
def __write(self, b) -> None: def __write(self, b: bytes) -> None:
assert self.process.stdin is not None
self.process.stdin.write(b) self.process.stdin.write(b)
if self.bd_pipe: if self.bd_pipe:
@ -200,13 +202,13 @@ class FFMpegConverter(threading.Thread):
return return
def read(self, length): def read(self, length: int) -> Any:
try: try:
return self.process.stdout.read1(length) return self.process.stdout.read1(length) # type: ignore[union-attr]
except ValueError: except ValueError:
return False return False
def exit(self): def exit(self) -> None:
if self.bd_pipe: if self.bd_pipe:
os.close(self.bd_pipe) os.close(self.bd_pipe)
@ -233,8 +235,8 @@ class BroadcastThread(threading.Thread):
self, self,
camera: str, camera: str,
converter: FFMpegConverter, converter: FFMpegConverter,
websocket_server, websocket_server: Any,
stop_event: mp.Event, stop_event: MpEvent,
): ):
super().__init__() super().__init__()
self.camera = camera self.camera = camera
@ -242,7 +244,7 @@ class BroadcastThread(threading.Thread):
self.websocket_server = websocket_server self.websocket_server = websocket_server
self.stop_event = stop_event self.stop_event = stop_event
def run(self): def run(self) -> None:
while not self.stop_event.is_set(): while not self.stop_event.is_set():
buf = self.converter.read(65536) buf = self.converter.read(65536)
if buf: if buf:
@ -270,16 +272,16 @@ class BirdsEyeFrameManager:
def __init__( def __init__(
self, self,
config: FrigateConfig, config: FrigateConfig,
stop_event: mp.Event, stop_event: MpEvent,
): ):
self.config = config self.config = config
width, height = get_canvas_shape(config.birdseye.width, config.birdseye.height) width, height = get_canvas_shape(config.birdseye.width, config.birdseye.height)
self.frame_shape = (height, width) self.frame_shape = (height, width)
self.yuv_shape = (height * 3 // 2, width) self.yuv_shape = (height * 3 // 2, width)
self.frame = np.ndarray(self.yuv_shape, dtype=np.uint8) self.frame: np.ndarray = np.ndarray(self.yuv_shape, dtype=np.uint8)
self.canvas = Canvas(width, height, config.birdseye.layout.scaling_factor) self.canvas = Canvas(width, height, config.birdseye.layout.scaling_factor)
self.stop_event = stop_event self.stop_event = stop_event
self.last_refresh_time = 0 self.last_refresh_time: float = 0
# initialize the frame as black and with the Frigate logo # initialize the frame as black and with the Frigate logo
self.blank_frame = np.zeros(self.yuv_shape, np.uint8) self.blank_frame = np.zeros(self.yuv_shape, np.uint8)
@ -323,15 +325,15 @@ class BirdsEyeFrameManager:
self.frame[:] = self.blank_frame self.frame[:] = self.blank_frame
self.cameras = {} self.cameras: dict[str, Any] = {}
for camera in self.config.cameras.keys(): for camera in self.config.cameras.keys():
self.add_camera(camera) self.add_camera(camera)
self.camera_layout = [] self.camera_layout: list[Any] = []
self.active_cameras = set() self.active_cameras: set[str] = set()
self.last_output_time = 0.0 self.last_output_time = 0.0
def add_camera(self, cam: str): def add_camera(self, cam: str) -> None:
"""Add a camera to self.cameras with the correct structure.""" """Add a camera to self.cameras with the correct structure."""
settings = self.config.cameras[cam] settings = self.config.cameras[cam]
# precalculate the coordinates for all the channels # precalculate the coordinates for all the channels
@ -361,16 +363,21 @@ class BirdsEyeFrameManager:
}, },
} }
def remove_camera(self, cam: str): def remove_camera(self, cam: str) -> None:
"""Remove a camera from self.cameras.""" """Remove a camera from self.cameras."""
if cam in self.cameras: if cam in self.cameras:
del self.cameras[cam] del self.cameras[cam]
def clear_frame(self): def clear_frame(self) -> None:
logger.debug("Clearing the birdseye frame") logger.debug("Clearing the birdseye frame")
self.frame[:] = self.blank_frame self.frame[:] = self.blank_frame
def copy_to_position(self, position, camera=None, frame: np.ndarray = None): def copy_to_position(
self,
position: Any,
camera: Optional[str] = None,
frame: Optional[np.ndarray] = None,
) -> None:
if camera is None: if camera is None:
frame = None frame = None
channel_dims = None channel_dims = None
@ -389,7 +396,9 @@ class BirdsEyeFrameManager:
channel_dims, channel_dims,
) )
def camera_active(self, mode, object_box_count, motion_box_count): def camera_active(
self, mode: Any, object_box_count: int, motion_box_count: int
) -> bool:
if mode == BirdseyeModeEnum.continuous: if mode == BirdseyeModeEnum.continuous:
return True return True
@ -399,6 +408,8 @@ class BirdsEyeFrameManager:
if mode == BirdseyeModeEnum.objects and object_box_count > 0: if mode == BirdseyeModeEnum.objects and object_box_count > 0:
return True return True
return False
def get_camera_coordinates(self) -> dict[str, dict[str, int]]: def get_camera_coordinates(self) -> dict[str, dict[str, int]]:
"""Return the coordinates of each camera in the current layout.""" """Return the coordinates of each camera in the current layout."""
coordinates = {} coordinates = {}
@ -451,7 +462,7 @@ class BirdsEyeFrameManager:
- self.cameras[active_camera]["last_active_frame"] - self.cameras[active_camera]["last_active_frame"]
), ),
) )
active_cameras = limited_active_cameras[:max_cameras] active_cameras = set(limited_active_cameras[:max_cameras])
max_camera_refresh = True max_camera_refresh = True
self.last_refresh_time = now self.last_refresh_time = now
@ -510,7 +521,7 @@ class BirdsEyeFrameManager:
# center camera view in canvas and ensure that it fits # center camera view in canvas and ensure that it fits
if scaled_width < self.canvas.width: if scaled_width < self.canvas.width:
coefficient = 1 coefficient: float = 1
x_offset = int((self.canvas.width - scaled_width) / 2) x_offset = int((self.canvas.width - scaled_width) / 2)
else: else:
coefficient = self.canvas.width / scaled_width coefficient = self.canvas.width / scaled_width
@ -557,7 +568,7 @@ class BirdsEyeFrameManager:
calculating = False calculating = False
self.canvas.set_coefficient(len(active_cameras), coefficient) self.canvas.set_coefficient(len(active_cameras), coefficient)
self.camera_layout = layout_candidate self.camera_layout = layout_candidate or []
frame_changed = True frame_changed = True
# Draw the layout # Draw the layout
@ -577,10 +588,12 @@ class BirdsEyeFrameManager:
self, self,
cameras_to_add: list[str], cameras_to_add: list[str],
coefficient: float, coefficient: float,
) -> tuple[Any]: ) -> Optional[list[list[Any]]]:
"""Calculate the optimal layout for 2+ cameras.""" """Calculate the optimal layout for 2+ cameras."""
def map_layout(camera_layout: list[list[Any]], row_height: int): def map_layout(
camera_layout: list[list[Any]], row_height: int
) -> tuple[int, int, Optional[list[list[Any]]]]:
"""Map the calculated layout.""" """Map the calculated layout."""
candidate_layout = [] candidate_layout = []
starting_x = 0 starting_x = 0
@ -777,11 +790,11 @@ class Birdseye:
def __init__( def __init__(
self, self,
config: FrigateConfig, config: FrigateConfig,
stop_event: mp.Event, stop_event: MpEvent,
websocket_server, websocket_server: Any,
) -> None: ) -> None:
self.config = config self.config = config
self.input = queue.Queue(maxsize=10) self.input: queue.Queue[bytes] = queue.Queue(maxsize=10)
self.converter = FFMpegConverter( self.converter = FFMpegConverter(
config.ffmpeg, config.ffmpeg,
self.input, self.input,
@ -806,7 +819,7 @@ class Birdseye:
) )
if config.birdseye.restream: if config.birdseye.restream:
self.birdseye_buffer = self.frame_manager.create( self.birdseye_buffer: Any = self.frame_manager.create(
"birdseye", "birdseye",
self.birdseye_manager.yuv_shape[0] * self.birdseye_manager.yuv_shape[1], self.birdseye_manager.yuv_shape[0] * self.birdseye_manager.yuv_shape[1],
) )

View File

@ -5,6 +5,8 @@ import multiprocessing as mp
import queue import queue
import subprocess as sp import subprocess as sp
import threading import threading
from multiprocessing.synchronize import Event as MpEvent
from typing import Any
from frigate.config import CameraConfig, FfmpegConfig from frigate.config import CameraConfig, FfmpegConfig
@ -17,7 +19,7 @@ class FFMpegConverter(threading.Thread):
camera: str, camera: str,
ffmpeg: FfmpegConfig, ffmpeg: FfmpegConfig,
input_queue: queue.Queue, input_queue: queue.Queue,
stop_event: mp.Event, stop_event: MpEvent,
in_width: int, in_width: int,
in_height: int, in_height: int,
out_width: int, out_width: int,
@ -64,16 +66,17 @@ class FFMpegConverter(threading.Thread):
start_new_session=True, start_new_session=True,
) )
def __write(self, b) -> None: def __write(self, b: bytes) -> None:
assert self.process.stdin is not None
self.process.stdin.write(b) self.process.stdin.write(b)
def read(self, length): def read(self, length: int) -> Any:
try: try:
return self.process.stdout.read1(length) return self.process.stdout.read1(length) # type: ignore[union-attr]
except ValueError: except ValueError:
return False return False
def exit(self): def exit(self) -> None:
self.process.terminate() self.process.terminate()
try: try:
@ -98,8 +101,8 @@ class BroadcastThread(threading.Thread):
self, self,
camera: str, camera: str,
converter: FFMpegConverter, converter: FFMpegConverter,
websocket_server, websocket_server: Any,
stop_event: mp.Event, stop_event: MpEvent,
): ):
super().__init__() super().__init__()
self.camera = camera self.camera = camera
@ -107,7 +110,7 @@ class BroadcastThread(threading.Thread):
self.websocket_server = websocket_server self.websocket_server = websocket_server
self.stop_event = stop_event self.stop_event = stop_event
def run(self): def run(self) -> None:
while not self.stop_event.is_set(): while not self.stop_event.is_set():
buf = self.converter.read(65536) buf = self.converter.read(65536)
if buf: if buf:
@ -133,15 +136,15 @@ class BroadcastThread(threading.Thread):
class JsmpegCamera: class JsmpegCamera:
def __init__( def __init__(
self, config: CameraConfig, stop_event: mp.Event, websocket_server self, config: CameraConfig, stop_event: MpEvent, websocket_server: Any
) -> None: ) -> None:
self.config = config self.config = config
self.input = queue.Queue(maxsize=config.detect.fps) self.input: queue.Queue[bytes] = queue.Queue(maxsize=config.detect.fps)
width = int( width = int(
config.live.height * (config.frame_shape[1] / config.frame_shape[0]) config.live.height * (config.frame_shape[1] / config.frame_shape[0])
) )
self.converter = FFMpegConverter( self.converter = FFMpegConverter(
config.name, config.name or "",
config.ffmpeg, config.ffmpeg,
self.input, self.input,
stop_event, stop_event,
@ -152,13 +155,13 @@ class JsmpegCamera:
config.live.quality, config.live.quality,
) )
self.broadcaster = BroadcastThread( self.broadcaster = BroadcastThread(
config.name, self.converter, websocket_server, stop_event config.name or "", self.converter, websocket_server, stop_event
) )
self.converter.start() self.converter.start()
self.broadcaster.start() self.broadcaster.start()
def write_frame(self, frame_bytes) -> None: def write_frame(self, frame_bytes: bytes) -> None:
try: try:
self.input.put_nowait(frame_bytes) self.input.put_nowait(frame_bytes)
except queue.Full: except queue.Full:

View File

@ -61,6 +61,12 @@ def check_disabled_camera_update(
# last camera update was more than 1 second ago # last camera update was more than 1 second ago
# need to send empty data to birdseye because current # need to send empty data to birdseye because current
# frame is now out of date # frame is now out of date
cam_width = config.cameras[camera].detect.width
cam_height = config.cameras[camera].detect.height
if cam_width is None or cam_height is None:
raise ValueError(f"Camera {camera} detect dimensions not configured")
if birdseye and offline_time < 10: if birdseye and offline_time < 10:
# we only need to send blank frames to birdseye at the beginning of a camera being offline # we only need to send blank frames to birdseye at the beginning of a camera being offline
birdseye.write_data( birdseye.write_data(
@ -68,10 +74,7 @@ def check_disabled_camera_update(
[], [],
[], [],
now, now,
get_blank_yuv_frame( get_blank_yuv_frame(cam_width, cam_height),
config.cameras[camera].detect.width,
config.cameras[camera].detect.height,
),
) )
if not has_enabled_camera and birdseye: if not has_enabled_camera and birdseye:
@ -173,7 +176,7 @@ class OutputProcess(FrigateProcess):
birdseye_config_subscriber.check_for_update() birdseye_config_subscriber.check_for_update()
) )
if update_topic is not None: if update_topic is not None and birdseye_config is not None:
previous_global_mode = self.config.birdseye.mode previous_global_mode = self.config.birdseye.mode
self.config.birdseye = birdseye_config self.config.birdseye = birdseye_config
@ -198,7 +201,10 @@ class OutputProcess(FrigateProcess):
birdseye, birdseye,
) )
(topic, data) = detection_subscriber.check_for_update(timeout=1) _result = detection_subscriber.check_for_update(timeout=1)
if _result is None:
continue
(topic, data) = _result
now = datetime.datetime.now().timestamp() now = datetime.datetime.now().timestamp()
if now - last_disabled_cam_check > 5: if now - last_disabled_cam_check > 5:
@ -208,7 +214,7 @@ class OutputProcess(FrigateProcess):
self.config, birdseye, preview_recorders, preview_write_times self.config, birdseye, preview_recorders, preview_write_times
) )
if not topic: if not topic or data is None:
continue continue
( (
@ -262,12 +268,16 @@ class OutputProcess(FrigateProcess):
jsmpeg_cameras[camera].write_frame(frame.tobytes()) jsmpeg_cameras[camera].write_frame(frame.tobytes())
# send output data to birdseye if websocket is connected or restreaming # send output data to birdseye if websocket is connected or restreaming
if self.config.birdseye.enabled and ( if (
self.config.birdseye.enabled
and birdseye is not None
and (
self.config.birdseye.restream self.config.birdseye.restream
or any( or any(
ws.environ["PATH_INFO"].endswith("birdseye") ws.environ["PATH_INFO"].endswith("birdseye")
for ws in websocket_server.manager for ws in websocket_server.manager
) )
)
): ):
birdseye.write_data( birdseye.write_data(
camera, camera,
@ -282,9 +292,12 @@ class OutputProcess(FrigateProcess):
move_preview_frames("clips") move_preview_frames("clips")
while True: while True:
(topic, data) = detection_subscriber.check_for_update(timeout=0) _cleanup_result = detection_subscriber.check_for_update(timeout=0)
if _cleanup_result is None:
break
(topic, data) = _cleanup_result
if not topic: if not topic or data is None:
break break
( (
@ -322,7 +335,7 @@ class OutputProcess(FrigateProcess):
logger.info("exiting output process...") logger.info("exiting output process...")
def move_preview_frames(loc: str): def move_preview_frames(loc: str) -> None:
preview_holdover = os.path.join(CLIPS_DIR, "preview_restart_cache") preview_holdover = os.path.join(CLIPS_DIR, "preview_restart_cache")
preview_cache = os.path.join(CACHE_DIR, "preview_frames") preview_cache = os.path.join(CACHE_DIR, "preview_frames")

View File

@ -22,7 +22,6 @@ from frigate.ffmpeg_presets import (
parse_preset_hardware_acceleration_encode, parse_preset_hardware_acceleration_encode,
) )
from frigate.models import Previews from frigate.models import Previews
from frigate.track.object_processing import TrackedObject
from frigate.util.image import copy_yuv_to_position, get_blank_yuv_frame, get_yuv_crop from frigate.util.image import copy_yuv_to_position, get_blank_yuv_frame, get_yuv_crop
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -66,7 +65,9 @@ def get_cache_image_name(camera: str, frame_time: float) -> str:
) )
def get_most_recent_preview_frame(camera: str, before: float = None) -> str | None: def get_most_recent_preview_frame(
camera: str, before: float | None = None
) -> str | None:
"""Get the most recent preview frame for a camera.""" """Get the most recent preview frame for a camera."""
if not os.path.exists(PREVIEW_CACHE_DIR): if not os.path.exists(PREVIEW_CACHE_DIR):
return None return None
@ -147,12 +148,12 @@ class FFMpegConverter(threading.Thread):
if t_idx == item_count - 1: if t_idx == item_count - 1:
# last frame does not get a duration # last frame does not get a duration
playlist.append( playlist.append(
f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'" f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'" # type: ignore[arg-type]
) )
continue continue
playlist.append( playlist.append(
f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'" f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'" # type: ignore[arg-type]
) )
playlist.append( playlist.append(
f"duration {self.frame_times[t_idx + 1] - self.frame_times[t_idx]}" f"duration {self.frame_times[t_idx + 1] - self.frame_times[t_idx]}"
@ -199,30 +200,33 @@ class FFMpegConverter(threading.Thread):
# unlink files from cache # unlink files from cache
# don't delete last frame as it will be used as first frame in next segment # don't delete last frame as it will be used as first frame in next segment
for t in self.frame_times[0:-1]: for t in self.frame_times[0:-1]:
Path(get_cache_image_name(self.config.name, t)).unlink(missing_ok=True) Path(get_cache_image_name(self.config.name, t)).unlink(missing_ok=True) # type: ignore[arg-type]
class PreviewRecorder: class PreviewRecorder:
def __init__(self, config: CameraConfig) -> None: def __init__(self, config: CameraConfig) -> None:
self.config = config self.config = config
self.start_time = 0 self.camera_name: str = config.name or ""
self.last_output_time = 0 self.start_time: float = 0
self.last_output_time: float = 0
self.offline = False self.offline = False
self.output_frames = [] self.output_frames: list[float] = []
if config.detect.width > config.detect.height: if config.detect.width is None or config.detect.height is None:
raise ValueError("Detect width and height must be set for previews.")
self.detect_width: int = config.detect.width
self.detect_height: int = config.detect.height
if self.detect_width > self.detect_height:
self.out_height = PREVIEW_HEIGHT self.out_height = PREVIEW_HEIGHT
self.out_width = ( self.out_width = (
int((config.detect.width / config.detect.height) * self.out_height) int((self.detect_width / self.detect_height) * self.out_height) // 4 * 4
// 4
* 4
) )
else: else:
self.out_width = PREVIEW_HEIGHT self.out_width = PREVIEW_HEIGHT
self.out_height = ( self.out_height = (
int((config.detect.height / config.detect.width) * self.out_width) int((self.detect_height / self.detect_width) * self.out_width) // 4 * 4
// 4
* 4
) )
# create communication for finished previews # create communication for finished previews
@ -302,7 +306,7 @@ class PreviewRecorder:
) )
self.start_time = frame_time self.start_time = frame_time
self.last_output_time = frame_time self.last_output_time = frame_time
self.output_frames: list[float] = [] self.output_frames = []
def should_write_frame( def should_write_frame(
self, self,
@ -342,7 +346,9 @@ class PreviewRecorder:
def write_frame_to_cache(self, frame_time: float, frame: np.ndarray) -> None: def write_frame_to_cache(self, frame_time: float, frame: np.ndarray) -> None:
# resize yuv frame # resize yuv frame
small_frame = np.zeros((self.out_height * 3 // 2, self.out_width), np.uint8) small_frame: np.ndarray = np.zeros(
(self.out_height * 3 // 2, self.out_width), np.uint8
)
copy_yuv_to_position( copy_yuv_to_position(
small_frame, small_frame,
(0, 0), (0, 0),
@ -356,7 +362,7 @@ class PreviewRecorder:
cv2.COLOR_YUV2BGR_I420, cv2.COLOR_YUV2BGR_I420,
) )
cv2.imwrite( cv2.imwrite(
get_cache_image_name(self.config.name, frame_time), get_cache_image_name(self.camera_name, frame_time),
small_frame, small_frame,
[ [
int(cv2.IMWRITE_WEBP_QUALITY), int(cv2.IMWRITE_WEBP_QUALITY),
@ -396,7 +402,7 @@ class PreviewRecorder:
).start() ).start()
else: else:
logger.debug( logger.debug(
f"Not saving preview for {self.config.name} because there are no saved frames." f"Not saving preview for {self.camera_name} because there are no saved frames."
) )
self.reset_frame_cache(frame_time) self.reset_frame_cache(frame_time)
@ -416,9 +422,7 @@ class PreviewRecorder:
if not self.offline: if not self.offline:
self.write_frame_to_cache( self.write_frame_to_cache(
frame_time, frame_time,
get_blank_yuv_frame( get_blank_yuv_frame(self.detect_width, self.detect_height),
self.config.detect.width, self.config.detect.height
),
) )
self.offline = True self.offline = True
@ -431,9 +435,9 @@ class PreviewRecorder:
return return
old_frame_path = get_cache_image_name( old_frame_path = get_cache_image_name(
self.config.name, self.output_frames[-1] self.camera_name, self.output_frames[-1]
) )
new_frame_path = get_cache_image_name(self.config.name, frame_time) new_frame_path = get_cache_image_name(self.camera_name, frame_time)
shutil.copy(old_frame_path, new_frame_path) shutil.copy(old_frame_path, new_frame_path)
# save last frame to ensure consistent duration # save last frame to ensure consistent duration
@ -447,13 +451,12 @@ class PreviewRecorder:
self.reset_frame_cache(frame_time) self.reset_frame_cache(frame_time)
def stop(self) -> None: def stop(self) -> None:
self.config_subscriber.stop()
self.requestor.stop() self.requestor.stop()
def get_active_objects( def get_active_objects(
frame_time: float, camera_config: CameraConfig, all_objects: list[TrackedObject] frame_time: float, camera_config: CameraConfig, all_objects: list[dict[str, Any]]
) -> list[TrackedObject]: ) -> list[dict[str, Any]]:
"""get active objects for detection.""" """get active objects for detection."""
return [ return [
o o