diff --git a/frigate/detectors/plugins/memryx.py b/frigate/detectors/plugins/memryx.py index e0ad401cb..2c03d14a4 100644 --- a/frigate/detectors/plugins/memryx.py +++ b/frigate/detectors/plugins/memryx.py @@ -317,7 +317,7 @@ class MemryXDetector(DetectionApi): f"Failed to remove downloaded zip {zip_path}: {e}" ) - def send_input(self, connection_id, tensor_input: np.ndarray): + def send_input(self, connection_id, tensor_input: np.ndarray) -> None: """Pre-process (if needed) and send frame to MemryX input queue""" if tensor_input is None: raise ValueError("[send_input] No image data provided for inference") diff --git a/frigate/motion/frigate_motion.py b/frigate/motion/frigate_motion.py index d49b0e861..8a067e1da 100644 --- a/frigate/motion/frigate_motion.py +++ b/frigate/motion/frigate_motion.py @@ -1,7 +1,9 @@ +from typing import Any + import cv2 import numpy as np -from frigate.config import MotionConfig +from frigate.config.config import RuntimeMotionConfig from frigate.motion import MotionDetector from frigate.util.image import grab_cv2_contours @@ -9,19 +11,20 @@ from frigate.util.image import grab_cv2_contours class FrigateMotionDetector(MotionDetector): def __init__( self, - frame_shape, - config: MotionConfig, + frame_shape: tuple[int, ...], + config: RuntimeMotionConfig, fps: int, - improve_contrast, - threshold, - contour_area, - ): + improve_contrast: Any, + threshold: Any, + contour_area: Any, + ) -> None: self.config = config self.frame_shape = frame_shape - self.resize_factor = frame_shape[0] / config.frame_height + frame_height = config.frame_height or frame_shape[0] + self.resize_factor = frame_shape[0] / frame_height self.motion_frame_size = ( - config.frame_height, - config.frame_height * frame_shape[1] // frame_shape[0], + frame_height, + frame_height * frame_shape[1] // frame_shape[0], ) self.avg_frame = np.zeros(self.motion_frame_size, np.float32) self.avg_delta = np.zeros(self.motion_frame_size, np.float32) @@ -38,10 +41,10 @@ class FrigateMotionDetector(MotionDetector): self.threshold = threshold self.contour_area = contour_area - def is_calibrating(self): + def is_calibrating(self) -> bool: return False - def detect(self, frame): + def detect(self, frame: np.ndarray) -> list: motion_boxes = [] gray = frame[0 : self.frame_shape[0], 0 : self.frame_shape[1]] @@ -99,7 +102,7 @@ class FrigateMotionDetector(MotionDetector): # dilate the thresholded image to fill in holes, then find contours # on thresholded image - thresh_dilated = cv2.dilate(thresh, None, iterations=2) + thresh_dilated = cv2.dilate(thresh, None, iterations=2) # type: ignore[call-overload] contours = cv2.findContours( thresh_dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE ) diff --git a/frigate/motion/improved_motion.py b/frigate/motion/improved_motion.py index b821e9532..6694dafff 100644 --- a/frigate/motion/improved_motion.py +++ b/frigate/motion/improved_motion.py @@ -1,11 +1,12 @@ import logging +from typing import Optional import cv2 import numpy as np from scipy.ndimage import gaussian_filter from frigate.camera import PTZMetrics -from frigate.config import MotionConfig +from frigate.config.config import RuntimeMotionConfig from frigate.motion import MotionDetector from frigate.util.image import grab_cv2_contours @@ -15,22 +16,23 @@ logger = logging.getLogger(__name__) class ImprovedMotionDetector(MotionDetector): def __init__( self, - frame_shape, - config: MotionConfig, + frame_shape: tuple[int, ...], + config: RuntimeMotionConfig, fps: int, - ptz_metrics: PTZMetrics = None, - name="improved", - blur_radius=1, - interpolation=cv2.INTER_NEAREST, - contrast_frame_history=50, - ): + ptz_metrics: Optional[PTZMetrics] = None, + name: str = "improved", + blur_radius: int = 1, + interpolation: int = cv2.INTER_NEAREST, + contrast_frame_history: int = 50, + ) -> None: self.name = name self.config = config self.frame_shape = frame_shape - self.resize_factor = frame_shape[0] / config.frame_height + frame_height = config.frame_height or frame_shape[0] + self.resize_factor = frame_shape[0] / frame_height self.motion_frame_size = ( - config.frame_height, - config.frame_height * frame_shape[1] // frame_shape[0], + frame_height, + frame_height * frame_shape[1] // frame_shape[0], ) self.avg_frame = np.zeros(self.motion_frame_size, np.float32) self.motion_frame_count = 0 @@ -44,20 +46,20 @@ class ImprovedMotionDetector(MotionDetector): self.contrast_values[:, 1:2] = 255 self.contrast_values_index = 0 self.ptz_metrics = ptz_metrics - self.last_stop_time = None + self.last_stop_time: float | None = None - def is_calibrating(self): + def is_calibrating(self) -> bool: return self.calibrating - def detect(self, frame): - motion_boxes = [] + def detect(self, frame: np.ndarray) -> list[tuple[int, int, int, int]]: + motion_boxes: list[tuple[int, int, int, int]] = [] if not self.config.enabled: return motion_boxes # if ptz motor is moving from autotracking, quickly return # a single box that is 80% of the frame - if ( + if self.ptz_metrics is not None and ( self.ptz_metrics.autotracker_enabled.value and not self.ptz_metrics.motor_stopped.is_set() ): @@ -130,19 +132,19 @@ class ImprovedMotionDetector(MotionDetector): # dilate the thresholded image to fill in holes, then find contours # on thresholded image - thresh_dilated = cv2.dilate(thresh, None, iterations=1) + thresh_dilated = cv2.dilate(thresh, None, iterations=1) # type: ignore[call-overload] contours = cv2.findContours( thresh_dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE ) contours = grab_cv2_contours(contours) # loop over the contours - total_contour_area = 0 + total_contour_area: float = 0 for c in contours: # if the contour is big enough, count it as motion contour_area = cv2.contourArea(c) total_contour_area += contour_area - if contour_area > self.config.contour_area: + if contour_area > (self.config.contour_area or 0): x, y, w, h = cv2.boundingRect(c) motion_boxes.append( ( @@ -159,7 +161,7 @@ class ImprovedMotionDetector(MotionDetector): # check if the motor has just stopped from autotracking # if so, reassign the average to the current frame so we begin with a new baseline - if ( + if self.ptz_metrics is not None and ( # ensure we only do this for cameras with autotracking enabled self.ptz_metrics.autotracker_enabled.value and self.ptz_metrics.motor_stopped.is_set() diff --git a/frigate/mypy.ini b/frigate/mypy.ini index e1da675be..9c44e9f38 100644 --- a/frigate/mypy.ini +++ b/frigate/mypy.ini @@ -47,13 +47,13 @@ ignore_errors = false [mypy-frigate.jobs.*] ignore_errors = false -[mypy-frigate.motion] +[mypy-frigate.motion.*] ignore_errors = false -[mypy-frigate.object_detection] +[mypy-frigate.object_detection.*] ignore_errors = false -[mypy-frigate.output] +[mypy-frigate.output.*] ignore_errors = false [mypy-frigate.ptz] diff --git a/frigate/object_detection/base.py b/frigate/object_detection/base.py index d2a54afbc..a62fe4843 100644 --- a/frigate/object_detection/base.py +++ b/frigate/object_detection/base.py @@ -7,6 +7,7 @@ from abc import ABC, abstractmethod from collections import deque from multiprocessing import Queue, Value from multiprocessing.synchronize import Event as MpEvent +from typing import Any, Optional import numpy as np import zmq @@ -34,26 +35,25 @@ logger = logging.getLogger(__name__) class ObjectDetector(ABC): @abstractmethod - def detect(self, tensor_input, threshold: float = 0.4): + def detect(self, tensor_input: np.ndarray, threshold: float = 0.4) -> list: pass class BaseLocalDetector(ObjectDetector): def __init__( self, - detector_config: BaseDetectorConfig = None, - labels: str = None, - stop_event: MpEvent = None, - ): + detector_config: Optional[BaseDetectorConfig] = None, + labels: Optional[str] = None, + stop_event: Optional[MpEvent] = None, + ) -> None: self.fps = EventsPerSecond() if labels is None: - self.labels = {} + self.labels: dict[int, str] = {} else: self.labels = load_labels(labels) - if detector_config: + if detector_config and detector_config.model: self.input_transform = tensor_transform(detector_config.model.input_tensor) - self.dtype = detector_config.model.input_dtype else: self.input_transform = None @@ -77,10 +77,10 @@ class BaseLocalDetector(ObjectDetector): return tensor_input - def detect(self, tensor_input: np.ndarray, threshold=0.4): + def detect(self, tensor_input: np.ndarray, threshold: float = 0.4) -> list: detections = [] - raw_detections = self.detect_raw(tensor_input) + raw_detections = self.detect_raw(tensor_input) # type: ignore[attr-defined] for d in raw_detections: if int(d[0]) < 0 or int(d[0]) >= len(self.labels): @@ -96,28 +96,28 @@ class BaseLocalDetector(ObjectDetector): class LocalObjectDetector(BaseLocalDetector): - def detect_raw(self, tensor_input: np.ndarray): + def detect_raw(self, tensor_input: np.ndarray) -> np.ndarray: tensor_input = self._transform_input(tensor_input) - return self.detect_api.detect_raw(tensor_input=tensor_input) + return self.detect_api.detect_raw(tensor_input=tensor_input) # type: ignore[no-any-return] class AsyncLocalObjectDetector(BaseLocalDetector): - def async_send_input(self, tensor_input: np.ndarray, connection_id: str): + def async_send_input(self, tensor_input: np.ndarray, connection_id: str) -> None: tensor_input = self._transform_input(tensor_input) - return self.detect_api.send_input(connection_id, tensor_input) + self.detect_api.send_input(connection_id, tensor_input) - def async_receive_output(self): + def async_receive_output(self) -> Any: return self.detect_api.receive_output() class DetectorRunner(FrigateProcess): def __init__( self, - name, + name: str, detection_queue: Queue, cameras: list[str], - avg_speed: Value, - start_time: Value, + avg_speed: Any, + start_time: Any, config: FrigateConfig, detector_config: BaseDetectorConfig, stop_event: MpEvent, @@ -129,11 +129,11 @@ class DetectorRunner(FrigateProcess): self.start_time = start_time self.config = config self.detector_config = detector_config - self.outputs: dict = {} + self.outputs: dict[str, Any] = {} - def create_output_shm(self, name: str): + def create_output_shm(self, name: str) -> None: out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False) - out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) + out_np: np.ndarray = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) self.outputs[name] = {"shm": out_shm, "np": out_np} def run(self) -> None: @@ -155,8 +155,8 @@ class DetectorRunner(FrigateProcess): connection_id, ( 1, - self.detector_config.model.height, - self.detector_config.model.width, + self.detector_config.model.height, # type: ignore[union-attr] + self.detector_config.model.width, # type: ignore[union-attr] 3, ), ) @@ -187,11 +187,11 @@ class DetectorRunner(FrigateProcess): class AsyncDetectorRunner(FrigateProcess): def __init__( self, - name, + name: str, detection_queue: Queue, cameras: list[str], - avg_speed: Value, - start_time: Value, + avg_speed: Any, + start_time: Any, config: FrigateConfig, detector_config: BaseDetectorConfig, stop_event: MpEvent, @@ -203,15 +203,15 @@ class AsyncDetectorRunner(FrigateProcess): self.start_time = start_time self.config = config self.detector_config = detector_config - self.outputs: dict = {} + self.outputs: dict[str, Any] = {} self._frame_manager: SharedMemoryFrameManager | None = None self._publisher: ObjectDetectorPublisher | None = None self._detector: AsyncLocalObjectDetector | None = None - self.send_times = deque() + self.send_times: deque[float] = deque() - def create_output_shm(self, name: str): + def create_output_shm(self, name: str) -> None: out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False) - out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) + out_np: np.ndarray = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) self.outputs[name] = {"shm": out_shm, "np": out_np} def _detect_worker(self) -> None: @@ -222,12 +222,13 @@ class AsyncDetectorRunner(FrigateProcess): except queue.Empty: continue + assert self._frame_manager is not None input_frame = self._frame_manager.get( connection_id, ( 1, - self.detector_config.model.height, - self.detector_config.model.width, + self.detector_config.model.height, # type: ignore[union-attr] + self.detector_config.model.width, # type: ignore[union-attr] 3, ), ) @@ -238,11 +239,13 @@ class AsyncDetectorRunner(FrigateProcess): # mark start time and send to accelerator self.send_times.append(time.perf_counter()) + assert self._detector is not None self._detector.async_send_input(input_frame, connection_id) def _result_worker(self) -> None: logger.info("Starting Result Worker Thread") while not self.stop_event.is_set(): + assert self._detector is not None connection_id, detections = self._detector.async_receive_output() # Handle timeout case (queue.Empty) - just continue @@ -256,6 +259,7 @@ class AsyncDetectorRunner(FrigateProcess): duration = time.perf_counter() - ts # release input buffer + assert self._frame_manager is not None self._frame_manager.close(connection_id) if connection_id not in self.outputs: @@ -264,6 +268,7 @@ class AsyncDetectorRunner(FrigateProcess): # write results and publish if detections is not None: self.outputs[connection_id]["np"][:] = detections[:] + assert self._publisher is not None self._publisher.publish(connection_id) # update timers @@ -330,11 +335,14 @@ class ObjectDetectProcess: self.stop_event = stop_event self.start_or_restart() - def stop(self): + def stop(self) -> None: # if the process has already exited on its own, just return if self.detect_process and self.detect_process.exitcode: return + if self.detect_process is None: + return + logging.info("Waiting for detection process to exit gracefully...") self.detect_process.join(timeout=30) if self.detect_process.exitcode is None: @@ -343,8 +351,8 @@ class ObjectDetectProcess: self.detect_process.join() logging.info("Detection process has exited...") - def start_or_restart(self): - self.detection_start.value = 0.0 + def start_or_restart(self) -> None: + self.detection_start.value = 0.0 # type: ignore[attr-defined] if (self.detect_process is not None) and self.detect_process.is_alive(): self.stop() @@ -389,17 +397,19 @@ class RemoteObjectDetector: self.detection_queue = detection_queue self.stop_event = stop_event self.shm = UntrackedSharedMemory(name=self.name, create=False) - self.np_shm = np.ndarray( + self.np_shm: np.ndarray = np.ndarray( (1, model_config.height, model_config.width, 3), dtype=np.uint8, buffer=self.shm.buf, ) self.out_shm = UntrackedSharedMemory(name=f"out-{self.name}", create=False) - self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf) + self.out_np_shm: np.ndarray = np.ndarray( + (20, 6), dtype=np.float32, buffer=self.out_shm.buf + ) self.detector_subscriber = ObjectDetectorSubscriber(name) - def detect(self, tensor_input, threshold=0.4): - detections = [] + def detect(self, tensor_input: np.ndarray, threshold: float = 0.4) -> list: + detections: list = [] if self.stop_event.is_set(): return detections @@ -431,7 +441,7 @@ class RemoteObjectDetector: self.fps.update() return detections - def cleanup(self): + def cleanup(self) -> None: self.detector_subscriber.stop() self.shm.unlink() self.out_shm.unlink() diff --git a/frigate/object_detection/util.py b/frigate/object_detection/util.py index ea8bd4226..4e351d66a 100644 --- a/frigate/object_detection/util.py +++ b/frigate/object_detection/util.py @@ -13,10 +13,10 @@ class RequestStore: A thread-safe hash-based response store that handles creating requests. """ - def __init__(self): + def __init__(self) -> None: self.request_counter = 0 self.request_counter_lock = threading.Lock() - self.input_queue = queue.Queue() + self.input_queue: queue.Queue[tuple[int, ndarray]] = queue.Queue() def __get_request_id(self) -> int: with self.request_counter_lock: @@ -45,17 +45,19 @@ class ResponseStore: their request's result appears. """ - def __init__(self): - self.responses = {} # Maps request_id -> (original_input, infer_results) + def __init__(self) -> None: + self.responses: dict[ + int, ndarray + ] = {} # Maps request_id -> (original_input, infer_results) self.lock = threading.Lock() self.cond = threading.Condition(self.lock) - def put(self, request_id: int, response: ndarray): + def put(self, request_id: int, response: ndarray) -> None: with self.cond: self.responses[request_id] = response self.cond.notify_all() - def get(self, request_id: int, timeout=None) -> ndarray: + def get(self, request_id: int, timeout: float | None = None) -> ndarray: with self.cond: if not self.cond.wait_for( lambda: request_id in self.responses, timeout=timeout @@ -65,7 +67,9 @@ class ResponseStore: return self.responses.pop(request_id) -def tensor_transform(desired_shape: InputTensorEnum): +def tensor_transform( + desired_shape: InputTensorEnum, +) -> tuple[int, int, int, int] | None: # Currently this function only supports BHWC permutations if desired_shape == InputTensorEnum.nhwc: return None diff --git a/frigate/output/birdseye.py b/frigate/output/birdseye.py index 5d80de33c..8b0fea6d7 100644 --- a/frigate/output/birdseye.py +++ b/frigate/output/birdseye.py @@ -4,13 +4,13 @@ import datetime import glob import logging import math -import multiprocessing as mp import os import queue import subprocess as sp import threading import time import traceback +from multiprocessing.synchronize import Event as MpEvent from typing import Any, Optional import cv2 @@ -74,25 +74,25 @@ class Canvas: self, canvas_width: int, canvas_height: int, - scaling_factor: int, + scaling_factor: float, ) -> None: self.scaling_factor = scaling_factor gcd = math.gcd(canvas_width, canvas_height) self.aspect = get_standard_aspect_ratio( - (canvas_width / gcd), (canvas_height / gcd) + int(canvas_width / gcd), int(canvas_height / gcd) ) self.width = canvas_width - self.height = (self.width * self.aspect[1]) / self.aspect[0] - self.coefficient_cache: dict[int, int] = {} + self.height: float = (self.width * self.aspect[1]) / self.aspect[0] + self.coefficient_cache: dict[int, float] = {} self.aspect_cache: dict[str, tuple[int, int]] = {} - def get_aspect(self, coefficient: int) -> tuple[int, int]: + def get_aspect(self, coefficient: float) -> tuple[float, float]: return (self.aspect[0] * coefficient, self.aspect[1] * coefficient) - def get_coefficient(self, camera_count: int) -> int: + def get_coefficient(self, camera_count: int) -> float: return self.coefficient_cache.get(camera_count, self.scaling_factor) - def set_coefficient(self, camera_count: int, coefficient: int) -> None: + def set_coefficient(self, camera_count: int, coefficient: float) -> None: self.coefficient_cache[camera_count] = coefficient def get_camera_aspect( @@ -105,7 +105,7 @@ class Canvas: gcd = math.gcd(camera_width, camera_height) camera_aspect = get_standard_aspect_ratio( - camera_width / gcd, camera_height / gcd + int(camera_width / gcd), int(camera_height / gcd) ) self.aspect_cache[cam_name] = camera_aspect return camera_aspect @@ -116,7 +116,7 @@ class FFMpegConverter(threading.Thread): self, ffmpeg: FfmpegConfig, input_queue: queue.Queue, - stop_event: mp.Event, + stop_event: MpEvent, in_width: int, in_height: int, out_width: int, @@ -128,7 +128,7 @@ class FFMpegConverter(threading.Thread): self.camera = "birdseye" self.input_queue = input_queue self.stop_event = stop_event - self.bd_pipe = None + self.bd_pipe: int | None = None if birdseye_rtsp: self.recreate_birdseye_pipe() @@ -181,7 +181,8 @@ class FFMpegConverter(threading.Thread): os.close(stdin) self.reading_birdseye = False - def __write(self, b) -> None: + def __write(self, b: bytes) -> None: + assert self.process.stdin is not None self.process.stdin.write(b) if self.bd_pipe: @@ -200,13 +201,13 @@ class FFMpegConverter(threading.Thread): return - def read(self, length): + def read(self, length: int) -> Any: try: - return self.process.stdout.read1(length) + return self.process.stdout.read1(length) # type: ignore[union-attr] except ValueError: return False - def exit(self): + def exit(self) -> None: if self.bd_pipe: os.close(self.bd_pipe) @@ -233,8 +234,8 @@ class BroadcastThread(threading.Thread): self, camera: str, converter: FFMpegConverter, - websocket_server, - stop_event: mp.Event, + websocket_server: Any, + stop_event: MpEvent, ): super().__init__() self.camera = camera @@ -242,7 +243,7 @@ class BroadcastThread(threading.Thread): self.websocket_server = websocket_server self.stop_event = stop_event - def run(self): + def run(self) -> None: while not self.stop_event.is_set(): buf = self.converter.read(65536) if buf: @@ -270,16 +271,16 @@ class BirdsEyeFrameManager: def __init__( self, config: FrigateConfig, - stop_event: mp.Event, + stop_event: MpEvent, ): self.config = config width, height = get_canvas_shape(config.birdseye.width, config.birdseye.height) self.frame_shape = (height, width) self.yuv_shape = (height * 3 // 2, width) - self.frame = np.ndarray(self.yuv_shape, dtype=np.uint8) + self.frame: np.ndarray = np.ndarray(self.yuv_shape, dtype=np.uint8) self.canvas = Canvas(width, height, config.birdseye.layout.scaling_factor) self.stop_event = stop_event - self.last_refresh_time = 0 + self.last_refresh_time: float = 0 # initialize the frame as black and with the Frigate logo self.blank_frame = np.zeros(self.yuv_shape, np.uint8) @@ -323,15 +324,15 @@ class BirdsEyeFrameManager: self.frame[:] = self.blank_frame - self.cameras = {} + self.cameras: dict[str, Any] = {} for camera in self.config.cameras.keys(): self.add_camera(camera) - self.camera_layout = [] - self.active_cameras = set() + self.camera_layout: list[Any] = [] + self.active_cameras: set[str] = set() self.last_output_time = 0.0 - def add_camera(self, cam: str): + def add_camera(self, cam: str) -> None: """Add a camera to self.cameras with the correct structure.""" settings = self.config.cameras[cam] # precalculate the coordinates for all the channels @@ -361,16 +362,21 @@ class BirdsEyeFrameManager: }, } - def remove_camera(self, cam: str): + def remove_camera(self, cam: str) -> None: """Remove a camera from self.cameras.""" if cam in self.cameras: del self.cameras[cam] - def clear_frame(self): + def clear_frame(self) -> None: logger.debug("Clearing the birdseye frame") self.frame[:] = self.blank_frame - def copy_to_position(self, position, camera=None, frame: np.ndarray = None): + def copy_to_position( + self, + position: Any, + camera: Optional[str] = None, + frame: Optional[np.ndarray] = None, + ) -> None: if camera is None: frame = None channel_dims = None @@ -389,7 +395,9 @@ class BirdsEyeFrameManager: channel_dims, ) - def camera_active(self, mode, object_box_count, motion_box_count): + def camera_active( + self, mode: Any, object_box_count: int, motion_box_count: int + ) -> bool: if mode == BirdseyeModeEnum.continuous: return True @@ -399,6 +407,8 @@ class BirdsEyeFrameManager: if mode == BirdseyeModeEnum.objects and object_box_count > 0: return True + return False + def get_camera_coordinates(self) -> dict[str, dict[str, int]]: """Return the coordinates of each camera in the current layout.""" coordinates = {} @@ -451,7 +461,7 @@ class BirdsEyeFrameManager: - self.cameras[active_camera]["last_active_frame"] ), ) - active_cameras = limited_active_cameras[:max_cameras] + active_cameras = set(limited_active_cameras[:max_cameras]) max_camera_refresh = True self.last_refresh_time = now @@ -510,7 +520,7 @@ class BirdsEyeFrameManager: # center camera view in canvas and ensure that it fits if scaled_width < self.canvas.width: - coefficient = 1 + coefficient: float = 1 x_offset = int((self.canvas.width - scaled_width) / 2) else: coefficient = self.canvas.width / scaled_width @@ -557,7 +567,7 @@ class BirdsEyeFrameManager: calculating = False self.canvas.set_coefficient(len(active_cameras), coefficient) - self.camera_layout = layout_candidate + self.camera_layout = layout_candidate or [] frame_changed = True # Draw the layout @@ -577,10 +587,12 @@ class BirdsEyeFrameManager: self, cameras_to_add: list[str], coefficient: float, - ) -> tuple[Any]: + ) -> Optional[list[list[Any]]]: """Calculate the optimal layout for 2+ cameras.""" - def map_layout(camera_layout: list[list[Any]], row_height: int): + def map_layout( + camera_layout: list[list[Any]], row_height: int + ) -> tuple[int, int, Optional[list[list[Any]]]]: """Map the calculated layout.""" candidate_layout = [] starting_x = 0 @@ -777,11 +789,11 @@ class Birdseye: def __init__( self, config: FrigateConfig, - stop_event: mp.Event, - websocket_server, + stop_event: MpEvent, + websocket_server: Any, ) -> None: self.config = config - self.input = queue.Queue(maxsize=10) + self.input: queue.Queue[bytes] = queue.Queue(maxsize=10) self.converter = FFMpegConverter( config.ffmpeg, self.input, @@ -806,7 +818,7 @@ class Birdseye: ) if config.birdseye.restream: - self.birdseye_buffer = self.frame_manager.create( + self.birdseye_buffer: Any = self.frame_manager.create( "birdseye", self.birdseye_manager.yuv_shape[0] * self.birdseye_manager.yuv_shape[1], ) diff --git a/frigate/output/camera.py b/frigate/output/camera.py index 2311ec659..917e38dd1 100644 --- a/frigate/output/camera.py +++ b/frigate/output/camera.py @@ -1,10 +1,11 @@ """Handle outputting individual cameras via jsmpeg.""" import logging -import multiprocessing as mp import queue import subprocess as sp import threading +from multiprocessing.synchronize import Event as MpEvent +from typing import Any from frigate.config import CameraConfig, FfmpegConfig @@ -17,7 +18,7 @@ class FFMpegConverter(threading.Thread): camera: str, ffmpeg: FfmpegConfig, input_queue: queue.Queue, - stop_event: mp.Event, + stop_event: MpEvent, in_width: int, in_height: int, out_width: int, @@ -64,16 +65,17 @@ class FFMpegConverter(threading.Thread): start_new_session=True, ) - def __write(self, b) -> None: + def __write(self, b: bytes) -> None: + assert self.process.stdin is not None self.process.stdin.write(b) - def read(self, length): + def read(self, length: int) -> Any: try: - return self.process.stdout.read1(length) + return self.process.stdout.read1(length) # type: ignore[union-attr] except ValueError: return False - def exit(self): + def exit(self) -> None: self.process.terminate() try: @@ -98,8 +100,8 @@ class BroadcastThread(threading.Thread): self, camera: str, converter: FFMpegConverter, - websocket_server, - stop_event: mp.Event, + websocket_server: Any, + stop_event: MpEvent, ): super().__init__() self.camera = camera @@ -107,7 +109,7 @@ class BroadcastThread(threading.Thread): self.websocket_server = websocket_server self.stop_event = stop_event - def run(self): + def run(self) -> None: while not self.stop_event.is_set(): buf = self.converter.read(65536) if buf: @@ -133,15 +135,15 @@ class BroadcastThread(threading.Thread): class JsmpegCamera: def __init__( - self, config: CameraConfig, stop_event: mp.Event, websocket_server + self, config: CameraConfig, stop_event: MpEvent, websocket_server: Any ) -> None: self.config = config - self.input = queue.Queue(maxsize=config.detect.fps) + self.input: queue.Queue[bytes] = queue.Queue(maxsize=config.detect.fps) width = int( config.live.height * (config.frame_shape[1] / config.frame_shape[0]) ) self.converter = FFMpegConverter( - config.name, + config.name or "", config.ffmpeg, self.input, stop_event, @@ -152,13 +154,13 @@ class JsmpegCamera: config.live.quality, ) self.broadcaster = BroadcastThread( - config.name, self.converter, websocket_server, stop_event + config.name or "", self.converter, websocket_server, stop_event ) self.converter.start() self.broadcaster.start() - def write_frame(self, frame_bytes) -> None: + def write_frame(self, frame_bytes: bytes) -> None: try: self.input.put_nowait(frame_bytes) except queue.Full: diff --git a/frigate/output/output.py b/frigate/output/output.py index 83962e1c9..22bcbb31f 100644 --- a/frigate/output/output.py +++ b/frigate/output/output.py @@ -61,6 +61,12 @@ def check_disabled_camera_update( # last camera update was more than 1 second ago # need to send empty data to birdseye because current # frame is now out of date + cam_width = config.cameras[camera].detect.width + cam_height = config.cameras[camera].detect.height + + if cam_width is None or cam_height is None: + raise ValueError(f"Camera {camera} detect dimensions not configured") + if birdseye and offline_time < 10: # we only need to send blank frames to birdseye at the beginning of a camera being offline birdseye.write_data( @@ -68,10 +74,7 @@ def check_disabled_camera_update( [], [], now, - get_blank_yuv_frame( - config.cameras[camera].detect.width, - config.cameras[camera].detect.height, - ), + get_blank_yuv_frame(cam_width, cam_height), ) if not has_enabled_camera and birdseye: @@ -173,7 +176,7 @@ class OutputProcess(FrigateProcess): birdseye_config_subscriber.check_for_update() ) - if update_topic is not None: + if update_topic is not None and birdseye_config is not None: previous_global_mode = self.config.birdseye.mode self.config.birdseye = birdseye_config @@ -198,7 +201,10 @@ class OutputProcess(FrigateProcess): birdseye, ) - (topic, data) = detection_subscriber.check_for_update(timeout=1) + _result = detection_subscriber.check_for_update(timeout=1) + if _result is None: + continue + (topic, data) = _result now = datetime.datetime.now().timestamp() if now - last_disabled_cam_check > 5: @@ -208,7 +214,7 @@ class OutputProcess(FrigateProcess): self.config, birdseye, preview_recorders, preview_write_times ) - if not topic: + if not topic or data is None: continue ( @@ -262,11 +268,15 @@ class OutputProcess(FrigateProcess): jsmpeg_cameras[camera].write_frame(frame.tobytes()) # send output data to birdseye if websocket is connected or restreaming - if self.config.birdseye.enabled and ( - self.config.birdseye.restream - or any( - ws.environ["PATH_INFO"].endswith("birdseye") - for ws in websocket_server.manager + if ( + self.config.birdseye.enabled + and birdseye is not None + and ( + self.config.birdseye.restream + or any( + ws.environ["PATH_INFO"].endswith("birdseye") + for ws in websocket_server.manager + ) ) ): birdseye.write_data( @@ -282,9 +292,12 @@ class OutputProcess(FrigateProcess): move_preview_frames("clips") while True: - (topic, data) = detection_subscriber.check_for_update(timeout=0) + _cleanup_result = detection_subscriber.check_for_update(timeout=0) + if _cleanup_result is None: + break + (topic, data) = _cleanup_result - if not topic: + if not topic or data is None: break ( @@ -322,7 +335,7 @@ class OutputProcess(FrigateProcess): logger.info("exiting output process...") -def move_preview_frames(loc: str): +def move_preview_frames(loc: str) -> None: preview_holdover = os.path.join(CLIPS_DIR, "preview_restart_cache") preview_cache = os.path.join(CACHE_DIR, "preview_frames") diff --git a/frigate/output/preview.py b/frigate/output/preview.py index 2c439038a..041d952d9 100644 --- a/frigate/output/preview.py +++ b/frigate/output/preview.py @@ -22,7 +22,6 @@ from frigate.ffmpeg_presets import ( parse_preset_hardware_acceleration_encode, ) from frigate.models import Previews -from frigate.track.object_processing import TrackedObject from frigate.util.image import copy_yuv_to_position, get_blank_yuv_frame, get_yuv_crop logger = logging.getLogger(__name__) @@ -66,7 +65,9 @@ def get_cache_image_name(camera: str, frame_time: float) -> str: ) -def get_most_recent_preview_frame(camera: str, before: float = None) -> str | None: +def get_most_recent_preview_frame( + camera: str, before: float | None = None +) -> str | None: """Get the most recent preview frame for a camera.""" if not os.path.exists(PREVIEW_CACHE_DIR): return None @@ -147,12 +148,12 @@ class FFMpegConverter(threading.Thread): if t_idx == item_count - 1: # last frame does not get a duration playlist.append( - f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'" + f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'" # type: ignore[arg-type] ) continue playlist.append( - f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'" + f"file '{get_cache_image_name(self.config.name, self.frame_times[t_idx])}'" # type: ignore[arg-type] ) playlist.append( f"duration {self.frame_times[t_idx + 1] - self.frame_times[t_idx]}" @@ -199,30 +200,33 @@ class FFMpegConverter(threading.Thread): # unlink files from cache # don't delete last frame as it will be used as first frame in next segment for t in self.frame_times[0:-1]: - Path(get_cache_image_name(self.config.name, t)).unlink(missing_ok=True) + Path(get_cache_image_name(self.config.name, t)).unlink(missing_ok=True) # type: ignore[arg-type] class PreviewRecorder: def __init__(self, config: CameraConfig) -> None: self.config = config - self.start_time = 0 - self.last_output_time = 0 + self.camera_name: str = config.name or "" + self.start_time: float = 0 + self.last_output_time: float = 0 self.offline = False - self.output_frames = [] + self.output_frames: list[float] = [] - if config.detect.width > config.detect.height: + if config.detect.width is None or config.detect.height is None: + raise ValueError("Detect width and height must be set for previews.") + + self.detect_width: int = config.detect.width + self.detect_height: int = config.detect.height + + if self.detect_width > self.detect_height: self.out_height = PREVIEW_HEIGHT self.out_width = ( - int((config.detect.width / config.detect.height) * self.out_height) - // 4 - * 4 + int((self.detect_width / self.detect_height) * self.out_height) // 4 * 4 ) else: self.out_width = PREVIEW_HEIGHT self.out_height = ( - int((config.detect.height / config.detect.width) * self.out_width) - // 4 - * 4 + int((self.detect_height / self.detect_width) * self.out_width) // 4 * 4 ) # create communication for finished previews @@ -302,7 +306,7 @@ class PreviewRecorder: ) self.start_time = frame_time self.last_output_time = frame_time - self.output_frames: list[float] = [] + self.output_frames = [] def should_write_frame( self, @@ -342,7 +346,9 @@ class PreviewRecorder: def write_frame_to_cache(self, frame_time: float, frame: np.ndarray) -> None: # resize yuv frame - small_frame = np.zeros((self.out_height * 3 // 2, self.out_width), np.uint8) + small_frame: np.ndarray = np.zeros( + (self.out_height * 3 // 2, self.out_width), np.uint8 + ) copy_yuv_to_position( small_frame, (0, 0), @@ -356,7 +362,7 @@ class PreviewRecorder: cv2.COLOR_YUV2BGR_I420, ) cv2.imwrite( - get_cache_image_name(self.config.name, frame_time), + get_cache_image_name(self.camera_name, frame_time), small_frame, [ int(cv2.IMWRITE_WEBP_QUALITY), @@ -396,7 +402,7 @@ class PreviewRecorder: ).start() else: logger.debug( - f"Not saving preview for {self.config.name} because there are no saved frames." + f"Not saving preview for {self.camera_name} because there are no saved frames." ) self.reset_frame_cache(frame_time) @@ -416,9 +422,7 @@ class PreviewRecorder: if not self.offline: self.write_frame_to_cache( frame_time, - get_blank_yuv_frame( - self.config.detect.width, self.config.detect.height - ), + get_blank_yuv_frame(self.detect_width, self.detect_height), ) self.offline = True @@ -431,9 +435,9 @@ class PreviewRecorder: return old_frame_path = get_cache_image_name( - self.config.name, self.output_frames[-1] + self.camera_name, self.output_frames[-1] ) - new_frame_path = get_cache_image_name(self.config.name, frame_time) + new_frame_path = get_cache_image_name(self.camera_name, frame_time) shutil.copy(old_frame_path, new_frame_path) # save last frame to ensure consistent duration @@ -447,13 +451,12 @@ class PreviewRecorder: self.reset_frame_cache(frame_time) def stop(self) -> None: - self.config_subscriber.stop() self.requestor.stop() def get_active_objects( - frame_time: float, camera_config: CameraConfig, all_objects: list[TrackedObject] -) -> list[TrackedObject]: + frame_time: float, camera_config: CameraConfig, all_objects: list[dict[str, Any]] +) -> list[dict[str, Any]]: """get active objects for detection.""" return [ o