diff --git a/frigate/data_processing/common/audio_transcription/model.py b/frigate/data_processing/common/audio_transcription/model.py
index 82472ad62..a610ca9e9 100644
--- a/frigate/data_processing/common/audio_transcription/model.py
+++ b/frigate/data_processing/common/audio_transcription/model.py
@@ -53,7 +53,7 @@ class AudioTranscriptionModelRunner:
         self.downloader = ModelDownloader(
             model_name="sherpa-onnx",
             download_path=download_path,
-            file_names=self.model_files.keys(),
+            file_names=list(self.model_files.keys()),
             download_func=self.__download_models,
         )
         self.downloader.ensure_model_files()
diff --git a/frigate/data_processing/common/face/model.py b/frigate/data_processing/common/face/model.py
index 51ee64938..45e8b8939 100644
--- a/frigate/data_processing/common/face/model.py
+++ b/frigate/data_processing/common/face/model.py
@@ -21,7 +21,7 @@ class FaceRecognizer(ABC):
 
     def __init__(self, config: FrigateConfig) -> None:
         self.config = config
-        self.landmark_detector: cv2.face.FacemarkLBF = None
+        self.landmark_detector: cv2.face.Facemark | None = None
         self.init_landmark_detector()
 
     @abstractmethod
@@ -38,13 +38,14 @@ class FaceRecognizer(ABC):
     def classify(self, face_image: np.ndarray) -> tuple[str, float] | None:
         pass
 
-    @redirect_output_to_logger(logger, logging.DEBUG)
+    @redirect_output_to_logger(logger, logging.DEBUG)  # type: ignore[misc]
     def init_landmark_detector(self) -> None:
         landmark_model = os.path.join(MODEL_CACHE_DIR, "facedet/landmarkdet.yaml")
 
         if os.path.exists(landmark_model):
-            self.landmark_detector = cv2.face.createFacemarkLBF()
-            self.landmark_detector.loadModel(landmark_model)
+            landmark_detector = cv2.face.createFacemarkLBF()
+            landmark_detector.loadModel(landmark_model)
+            self.landmark_detector = landmark_detector
 
     def align_face(
         self,
@@ -52,8 +53,10 @@ class FaceRecognizer(ABC):
         output_width: int,
         output_height: int,
     ) -> np.ndarray:
-        # landmark is run on grayscale images
+        if not self.landmark_detector:
+            raise ValueError("Landmark detector not initialized")
 
+        # landmark is run on grayscale images
         if image.ndim == 3:
             land_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
         else:
@@ -131,8 +134,11 @@ class FaceRecognizer(ABC):
 
 
 def similarity_to_confidence(
-    cosine_similarity: float, median=0.3, range_width=0.6, slope_factor=12
-):
+    cosine_similarity: float,
+    median: float = 0.3,
+    range_width: float = 0.6,
+    slope_factor: float = 12,
+) -> float:
     """
     Default sigmoid function to map cosine similarity to confidence.
@@ -151,14 +157,14 @@ def similarity_to_confidence(
     bias = median
 
     # Calculate confidence
-    confidence = 1 / (1 + np.exp(-slope * (cosine_similarity - bias)))
+    confidence: float = 1 / (1 + np.exp(-slope * (cosine_similarity - bias)))
     return confidence
 
 
 class FaceNetRecognizer(FaceRecognizer):
     def __init__(self, config: FrigateConfig):
         super().__init__(config)
-        self.mean_embs: dict[int, np.ndarray] = {}
+        self.mean_embs: dict[str, np.ndarray] = {}
         self.face_embedder: FaceNetEmbedding = FaceNetEmbedding()
         self.model_builder_queue: queue.Queue | None = None
 
@@ -168,7 +174,7 @@ class FaceNetRecognizer(FaceRecognizer):
     def run_build_task(self) -> None:
         self.model_builder_queue = queue.Queue()
 
-        def build_model():
+        def build_model() -> None:
             face_embeddings_map: dict[str, list[np.ndarray]] = {}
             idx = 0
 
@@ -187,7 +193,7 @@ class FaceNetRecognizer(FaceRecognizer):
                 img = cv2.imread(os.path.join(face_folder, image))
 
                 if img is None:
-                    continue
+                    continue  # type: ignore[unreachable]
 
                 img = self.align_face(img, img.shape[1], img.shape[0])
                 emb = self.face_embedder([img])[0].squeeze()
@@ -195,12 +201,13 @@ class FaceNetRecognizer(FaceRecognizer):
 
                 idx += 1
 
+            assert self.model_builder_queue is not None
             self.model_builder_queue.put(face_embeddings_map)
 
         thread = threading.Thread(target=build_model, daemon=True)
         thread.start()
 
-    def build(self):
+    def build(self) -> None:
         if not self.landmark_detector:
             self.init_landmark_detector()
             return None
@@ -226,7 +233,7 @@ class FaceNetRecognizer(FaceRecognizer):
 
         logger.debug("Finished building ArcFace model")
 
-    def classify(self, face_image):
+    def classify(self, face_image: np.ndarray) -> tuple[str, float] | None:
         if not self.landmark_detector:
             return None
 
@@ -245,7 +252,7 @@ class FaceNetRecognizer(FaceRecognizer):
         img = self.align_face(face_image, face_image.shape[1], face_image.shape[0])
         embedding = self.face_embedder([img])[0].squeeze()
 
-        score = 0
+        score: float = 0
         label = ""
 
         for name, mean_emb in self.mean_embs.items():
@@ -268,7 +275,7 @@ class FaceNetRecognizer(FaceRecognizer):
 class ArcFaceRecognizer(FaceRecognizer):
     def __init__(self, config: FrigateConfig):
         super().__init__(config)
-        self.mean_embs: dict[int, np.ndarray] = {}
+        self.mean_embs: dict[str, np.ndarray] = {}
         self.face_embedder: ArcfaceEmbedding = ArcfaceEmbedding(config.face_recognition)
         self.model_builder_queue: queue.Queue | None = None
 
@@ -278,7 +285,7 @@ class ArcFaceRecognizer(FaceRecognizer):
     def run_build_task(self) -> None:
         self.model_builder_queue = queue.Queue()
 
-        def build_model():
+        def build_model() -> None:
             face_embeddings_map: dict[str, list[np.ndarray]] = {}
             idx = 0
 
@@ -297,20 +304,21 @@ class ArcFaceRecognizer(FaceRecognizer):
                 img = cv2.imread(os.path.join(face_folder, image))
 
                 if img is None:
-                    continue
+                    continue  # type: ignore[unreachable]
 
                 img = self.align_face(img, img.shape[1], img.shape[0])
-                emb = self.face_embedder([img])[0].squeeze()
+                emb = self.face_embedder([img])[0].squeeze()  # type: ignore[arg-type]
                 face_embeddings_map[name].append(emb)
 
                 idx += 1
 
+            assert self.model_builder_queue is not None
             self.model_builder_queue.put(face_embeddings_map)
 
         thread = threading.Thread(target=build_model, daemon=True)
         thread.start()
 
-    def build(self):
+    def build(self) -> None:
         if not self.landmark_detector:
             self.init_landmark_detector()
             return None
@@ -336,7 +344,7 @@ class ArcFaceRecognizer(FaceRecognizer):
 
         logger.debug("Finished building ArcFace model")
 
-    def classify(self, face_image):
+    def classify(self, face_image: np.ndarray) -> tuple[str, float] | None:
         if not self.landmark_detector:
             return None
 
@@ -353,9 +361,9 @@ class ArcFaceRecognizer(FaceRecognizer):
 
         # align face and run recognition
         img = self.align_face(face_image, face_image.shape[1], face_image.shape[0])
-        embedding = self.face_embedder([img])[0].squeeze()
+        embedding = self.face_embedder([img])[0].squeeze()  # type: ignore[arg-type]
 
-        score = 0
+        score: float = 0
         label = ""
 
         for name, mean_emb in self.mean_embs.items():
diff --git a/frigate/data_processing/common/license_plate/mixin.py b/frigate/data_processing/common/license_plate/mixin.py
index e4fbd1172..462a314f2 100644
--- a/frigate/data_processing/common/license_plate/mixin.py
+++ b/frigate/data_processing/common/license_plate/mixin.py
@@ -10,7 +10,7 @@ import random
 import re
 import string
 from pathlib import Path
-from typing import Any, List, Optional, Tuple
+from typing import Any, List, Tuple
 
 import cv2
 import numpy as np
@@ -22,19 +22,35 @@ from frigate.comms.event_metadata_updater import (
     EventMetadataPublisher,
     EventMetadataTypeEnum,
 )
+from frigate.comms.inter_process import InterProcessRequestor
+from frigate.config import FrigateConfig
+from frigate.config.classification import LicensePlateRecognitionConfig
 from frigate.const import CLIPS_DIR, MODEL_CACHE_DIR
+from frigate.data_processing.common.license_plate.model import LicensePlateModelRunner
 from frigate.embeddings.onnx.lpr_embedding import LPR_EMBEDDING_SIZE
 from frigate.types import TrackedObjectUpdateTypesEnum
 from frigate.util.builtin import EventsPerSecond, InferenceSpeed
 from frigate.util.image import area
 
+from ...types import DataProcessorMetrics
+
 logger = logging.getLogger(__name__)
 
 WRITE_DEBUG_IMAGES = False
 
 
 class LicensePlateProcessingMixin:
-    def __init__(self, *args, **kwargs):
+    # Attributes expected from consuming classes (set before super().__init__)
+    config: FrigateConfig
+    metrics: DataProcessorMetrics
+    model_runner: LicensePlateModelRunner
+    lpr_config: LicensePlateRecognitionConfig
+    requestor: InterProcessRequestor
+    detected_license_plates: dict[str, dict[str, Any]]
+    camera_current_cars: dict[str, list[str]]
+    sub_label_publisher: EventMetadataPublisher
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
         super().__init__(*args, **kwargs)
         self.plate_rec_speed = InferenceSpeed(self.metrics.alpr_speed)
         self.plates_rec_second = EventsPerSecond()
@@ -97,7 +113,7 @@ class LicensePlateProcessingMixin:
         )
 
         try:
-            outputs = self.model_runner.detection_model([normalized_image])[0]
+            outputs = self.model_runner.detection_model([normalized_image])[0]  # type: ignore[arg-type]
         except Exception as e:
             logger.warning(f"Error running LPR box detection model: {e}")
             return []
@@ -105,18 +121,18 @@ class LicensePlateProcessingMixin:
         outputs = outputs[0, :, :]
 
         if False:
-            current_time = int(datetime.datetime.now().timestamp())
+            current_time = int(datetime.datetime.now().timestamp())  # type: ignore[unreachable]
             cv2.imwrite(
                 f"debug/frames/probability_map_{current_time}.jpg",
                 (outputs * 255).astype(np.uint8),
             )
 
         boxes, _ = self._boxes_from_bitmap(outputs, outputs > self.mask_thresh, w, h)
-        return self._filter_polygon(boxes, (h, w))
+        return self._filter_polygon(boxes, (h, w))  # type: ignore[return-value,arg-type]
 
     def _classify(
         self, images: List[np.ndarray]
-    ) -> Tuple[List[np.ndarray], List[Tuple[str, float]]]:
+    ) -> Tuple[List[np.ndarray], List[Tuple[str, float]]] | None:
         """
         Classify the orientation or category of each detected license plate.
@@ -138,15 +154,15 @@ class LicensePlateProcessingMixin:
             norm_images.append(norm_img)
 
         try:
-            outputs = self.model_runner.classification_model(norm_images)
+            outputs = self.model_runner.classification_model(norm_images)  # type: ignore[arg-type]
         except Exception as e:
             logger.warning(f"Error running LPR classification model: {e}")
-            return
+            return None
 
         return self._process_classification_output(images, outputs)
 
     def _recognize(
-        self, camera: string, images: List[np.ndarray]
+        self, camera: str, images: List[np.ndarray]
     ) -> Tuple[List[str], List[List[float]]]:
         """
         Recognize the characters on the detected license plates using the recognition model.
@@ -179,7 +195,7 @@ class LicensePlateProcessingMixin:
             norm_images.append(norm_image)
 
         try:
-            outputs = self.model_runner.recognition_model(norm_images)
+            outputs = self.model_runner.recognition_model(norm_images)  # type: ignore[arg-type]
         except Exception as e:
             logger.warning(f"Error running LPR recognition model: {e}")
             return [], []
@@ -410,7 +426,8 @@ class LicensePlateProcessingMixin:
         )
 
         if sorted_data:
-            return map(list, zip(*sorted_data))
+            plates, confs, areas_list = zip(*sorted_data)
+            return list(plates), list(confs), list(areas_list)
 
         return [], [], []
 
@@ -532,7 +549,7 @@ class LicensePlateProcessingMixin:
             # Add the last box
             merged_boxes.append(current_box)
 
-        return np.array(merged_boxes, dtype=np.int32)
+        return np.array(merged_boxes, dtype=np.int32)  # type: ignore[return-value]
 
     def _boxes_from_bitmap(
         self, output: np.ndarray, mask: np.ndarray, dest_width: int, dest_height: int
@@ -560,38 +577,42 @@ class LicensePlateProcessingMixin:
         boxes = []
         scores = []
 
-        for index in range(len(contours)):
-            contour = contours[index]
+        for index in range(len(contours)):  # type: ignore[arg-type]
+            contour = contours[index]  # type: ignore[index]
 
             # get minimum bounding box (rotated rectangle) around the contour and the smallest side length.
             points, sside = self._get_min_boxes(contour)
 
             if sside < self.min_size:
                 continue
 
-            points = np.array(points, dtype=np.float32)
+            points = np.array(points, dtype=np.float32)  # type: ignore[assignment]
 
             score = self._box_score(output, contour)
             if self.box_thresh > score:
                 continue
 
-            points = self._expand_box(points)
+            points = self._expand_box(points)  # type: ignore[assignment]
 
             # Get the minimum area rectangle again after expansion
-            points, sside = self._get_min_boxes(points.reshape(-1, 1, 2))
+            points, sside = self._get_min_boxes(points.reshape(-1, 1, 2))  # type: ignore[attr-defined]
 
             if sside < self.min_size + 2:
                 continue
 
-            points = np.array(points, dtype=np.float32)
+            points = np.array(points, dtype=np.float32)  # type: ignore[assignment]
 
             # normalize and clip box coordinates to fit within the destination image size.
-            points[:, 0] = np.clip(
-                np.round(points[:, 0] / width * dest_width), 0, dest_width
+            points[:, 0] = np.clip(  # type: ignore[call-overload]
+                np.round(points[:, 0] / width * dest_width),  # type: ignore[call-overload]
+                0,
+                dest_width,
             )
-            points[:, 1] = np.clip(
-                np.round(points[:, 1] / height * dest_height), 0, dest_height
+            points[:, 1] = np.clip(  # type: ignore[call-overload]
+                np.round(points[:, 1] / height * dest_height),  # type: ignore[call-overload]
+                0,
+                dest_height,
             )
 
-            boxes.append(points.astype("int32"))
+            boxes.append(points.astype("int32"))  # type: ignore[attr-defined]
             scores.append(score)
 
         return np.array(boxes, dtype="int32"), scores
@@ -632,7 +653,7 @@ class LicensePlateProcessingMixin:
         x1, y1 = np.clip(contour.min(axis=0), 0, [w - 1, h - 1])
         x2, y2 = np.clip(contour.max(axis=0), 0, [w - 1, h - 1])
         mask = np.zeros((y2 - y1 + 1, x2 - x1 + 1), dtype=np.uint8)
-        cv2.fillPoly(mask, [contour - [x1, y1]], 1)
+        cv2.fillPoly(mask, [contour - [x1, y1]], 1)  # type: ignore[call-overload]
        return cv2.mean(bitmap[y1 : y2 + 1, x1 : x2 + 1], mask)[0]
 
     @staticmethod
@@ -690,7 +711,7 @@ class LicensePlateProcessingMixin:
         Returns:
             bool: Whether the polygon is valid or not.
         """
-        return (
+        return bool(
             point[:, 0].min() >= 0
             and point[:, 0].max() < width
             and point[:, 1].min() >= 0
@@ -735,7 +756,7 @@ class LicensePlateProcessingMixin:
         return np.array([tl, tr, br, bl])
 
     @staticmethod
-    def _sort_boxes(boxes):
+    def _sort_boxes(boxes: list[np.ndarray]) -> list[np.ndarray]:
         """
         Sort polygons based on their position in the image. If boxes are close in vertical position
         (within 5 pixels), sort them by horizontal position.
@@ -837,16 +858,16 @@ class LicensePlateProcessingMixin:
         results = [["", 0.0]] * len(images)
         indices = np.argsort(np.array([x.shape[1] / x.shape[0] for x in images]))
 
-        outputs = np.stack(outputs)
+        stacked_outputs = np.stack(outputs)
 
-        outputs = [
-            (labels[idx], outputs[i, idx])
-            for i, idx in enumerate(outputs.argmax(axis=1))
+        stacked_outputs = [
+            (labels[idx], stacked_outputs[i, idx])
+            for i, idx in enumerate(stacked_outputs.argmax(axis=1))
         ]
 
         for i in range(0, len(images), self.batch_size):
-            for j in range(len(outputs)):
-                label, score = outputs[j]
+            for j in range(len(stacked_outputs)):
+                label, score = stacked_outputs[j]
                 results[indices[i + j]] = [label, score]
                 # make sure we have high confidence if we need to flip a box
                 if "180" in label and score >= 0.7:
@@ -854,10 +875,10 @@ class LicensePlateProcessingMixin:
                         images[indices[i + j]], cv2.ROTATE_180
                     )
 
-        return images, results
+        return images, results  # type: ignore[return-value]
 
     def _preprocess_recognition_image(
-        self, camera: string, image: np.ndarray, max_wh_ratio: float
+        self, camera: str, image: np.ndarray, max_wh_ratio: float
     ) -> np.ndarray:
         """
         Preprocess an image for recognition by dynamically adjusting its width.
@@ -925,7 +946,7 @@ class LicensePlateProcessingMixin:
         input_w = int(input_h * max_wh_ratio)
 
         # check for model-specific input width
-        model_input_w = self.model_runner.recognition_model.runner.get_input_width()
+        model_input_w = self.model_runner.recognition_model.runner.get_input_width()  # type: ignore[union-attr]
         if isinstance(model_input_w, int) and model_input_w > 0:
             input_w = model_input_w
 
@@ -945,7 +966,7 @@ class LicensePlateProcessingMixin:
         padded_image[:, :, :resized_w] = resized_image
 
         if False:
-            current_time = int(datetime.datetime.now().timestamp() * 1000)
+            current_time = int(datetime.datetime.now().timestamp() * 1000)  # type: ignore[unreachable]
             cv2.imwrite(
                 f"debug/frames/preprocessed_recognition_{current_time}.jpg",
                 image,
@@ -983,8 +1004,9 @@ class LicensePlateProcessingMixin:
                 np.linalg.norm(points[1] - points[2]),
             )
         )
-        pts_std = np.float32(
-            [[0, 0], [crop_width, 0], [crop_width, crop_height], [0, crop_height]]
+        pts_std = np.array(
+            [[0, 0], [crop_width, 0], [crop_width, crop_height], [0, crop_height]],
+            dtype=np.float32,
         )
         matrix = cv2.getPerspectiveTransform(points, pts_std)
         image = cv2.warpPerspective(
@@ -1000,15 +1022,15 @@ class LicensePlateProcessingMixin:
         return image
 
     def _detect_license_plate(
-        self, camera: string, input: np.ndarray
-    ) -> tuple[int, int, int, int]:
+        self, camera: str, input: np.ndarray
+    ) -> tuple[int, int, int, int] | None:
         """
         Use a lightweight YOLOv9 model to detect license plates for users without Frigate+
 
         Return the dimensions of the detected plate as [x1, y1, x2, y2].
         """
         try:
-            predictions = self.model_runner.yolov9_detection_model(input)
+            predictions = self.model_runner.yolov9_detection_model(input)  # type: ignore[arg-type]
         except Exception as e:
             logger.warning(f"Error running YOLOv9 license plate detection model: {e}")
             return None
@@ -1073,7 +1095,7 @@ class LicensePlateProcessingMixin:
             logger.debug(
                 f"{camera}: Found license plate. Bounding box: {expanded_box.astype(int)}"
             )
-            return tuple(expanded_box.astype(int))
+            return tuple(expanded_box.astype(int))  # type: ignore[return-value]
         else:
             return None  # No detection above the threshold
 
@@ -1097,7 +1119,7 @@ class LicensePlateProcessingMixin:
                     f"  Variant {i + 1}: '{p['plate']}' (conf: {p['conf']:.3f}, area: {p['area']})"
                 )
 
-        clusters = []
+        clusters: list[list[dict[str, Any]]] = []
         for i, plate in enumerate(plates):
             merged = False
             for j, cluster in enumerate(clusters):
@@ -1132,7 +1154,7 @@ class LicensePlateProcessingMixin:
         )
 
         # Best cluster: largest size, tiebroken by max conf
-        def cluster_score(c):
+        def cluster_score(c: list[dict[str, Any]]) -> tuple[int, float]:
             return (len(c), max(v["conf"] for v in c))
 
         best_cluster_idx = max(
@@ -1178,7 +1200,7 @@ class LicensePlateProcessingMixin:
 
     def lpr_process(
         self, obj_data: dict[str, Any], frame: np.ndarray, dedicated_lpr: bool = False
-    ):
+    ) -> None:
         """Look for license plates in image."""
         self.metrics.alpr_pps.value = self.plates_rec_second.eps()
         self.metrics.yolov9_lpr_pps.value = self.plates_det_second.eps()
@@ -1195,7 +1217,7 @@ class LicensePlateProcessingMixin:
             rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
 
             # apply motion mask
-            rgb[self.config.cameras[obj_data].motion.rasterized_mask == 0] = [0, 0, 0]
+            rgb[self.config.cameras[camera].motion.rasterized_mask == 0] = [0, 0, 0]  # type: ignore[attr-defined]
 
             if WRITE_DEBUG_IMAGES:
                 cv2.imwrite(
@@ -1261,7 +1283,7 @@ class LicensePlateProcessingMixin:
                 "stationary", False
             ):
                 logger.debug(
-                    f"{camera}: Skipping LPR for non-stationary {obj_data['label']} object {id} with no position changes. (Detected in {self.config.cameras[camera].detect.min_initialized + 1} concurrent frames, threshold to run is {self.config.cameras[camera].detect.min_initialized + 2} frames)"
+                    f"{camera}: Skipping LPR for non-stationary {obj_data['label']} object {id} with no position changes. (Detected in {self.config.cameras[camera].detect.min_initialized + 1} concurrent frames, threshold to run is {self.config.cameras[camera].detect.min_initialized + 2} frames)"  # type: ignore[operator]
                 )
                 return
 
@@ -1288,7 +1310,7 @@ class LicensePlateProcessingMixin:
             if time_since_stationary > self.stationary_scan_duration:
                 return
 
-        license_plate: Optional[dict[str, Any]] = None
+        license_plate = None
 
         if "license_plate" not in self.config.cameras[camera].objects.track:
             logger.debug(f"{camera}: Running manual license_plate detection.")
@@ -1301,7 +1323,7 @@ class LicensePlateProcessingMixin:
             rgb = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
 
             # apply motion mask
-            rgb[self.config.cameras[camera].motion.rasterized_mask == 0] = [0, 0, 0]
+            rgb[self.config.cameras[camera].motion.rasterized_mask == 0] = [0, 0, 0]  # type: ignore[attr-defined]
 
             left, top, right, bottom = car_box
             car = rgb[top:bottom, left:right]
@@ -1378,10 +1400,10 @@ class LicensePlateProcessingMixin:
                 if attr.get("label") != "license_plate":
                     continue
 
-                if license_plate is None or attr.get(
+                if license_plate is None or attr.get(  # type: ignore[unreachable]
                     "score", 0.0
                 ) > license_plate.get("score", 0.0):
-                    license_plate = attr
+                    license_plate = attr  # type: ignore[assignment]
 
             # no license plates detected in this frame
             if not license_plate:
@@ -1389,9 +1411,9 @@ class LicensePlateProcessingMixin:
 
         # we are using dedicated lpr with frigate+
         if obj_data.get("label") == "license_plate":
-            license_plate = obj_data
+            license_plate = obj_data  # type: ignore[assignment]
 
-        license_plate_box = license_plate.get("box")
+        license_plate_box = license_plate.get("box")  # type: ignore[attr-defined]
 
         # check that license plate is valid
         if (
@@ -1420,7 +1442,7 @@ class LicensePlateProcessingMixin:
             0, [license_plate_frame.shape[1], license_plate_frame.shape[0]] * 2
         )
 
-        plate_box = tuple(int(x) for x in expanded_box)
+        plate_box = tuple(int(x) for x in expanded_box)  # type: ignore[assignment]
 
         # Crop using the expanded box
         license_plate_frame = license_plate_frame[
@@ -1596,7 +1618,7 @@ class LicensePlateProcessingMixin:
         sub_label = next(
             (
                 label
-                for label, plates_list in self.lpr_config.known_plates.items()
+                for label, plates_list in self.lpr_config.known_plates.items()  # type: ignore[union-attr]
                 if any(
                     re.match(f"^{plate}$", rep_plate)
                     or Levenshtein.distance(plate, rep_plate)
@@ -1649,14 +1671,16 @@ class LicensePlateProcessingMixin:
         frame_bgr = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
         _, encoded_img = cv2.imencode(".jpg", frame_bgr)
         self.sub_label_publisher.publish(
-            (base64.b64encode(encoded_img).decode("ASCII"), id, camera),
+            (base64.b64encode(encoded_img.tobytes()).decode("ASCII"), id, camera),
             EventMetadataTypeEnum.save_lpr_snapshot.value,
         )
 
-    def handle_request(self, topic, request_data) -> dict[str, Any] | None:
-        return
+    def handle_request(
+        self, topic: str, request_data: dict[str, Any]
+    ) -> dict[str, Any] | None:
+        return None
 
-    def lpr_expire(self, object_id: str, camera: str):
+    def lpr_expire(self, object_id: str, camera: str) -> None:
         if object_id in self.detected_license_plates:
             self.detected_license_plates.pop(object_id)
 
@@ -1673,7 +1697,7 @@ class CTCDecoder:
    for each decoded character sequence.
    """
 
-    def __init__(self, character_dict_path=None):
+    def __init__(self, character_dict_path: str | None = None) -> None:
         """
         Initializes the CTCDecoder.
         :param character_dict_path: Path to the character dictionary file.
diff --git a/frigate/data_processing/common/license_plate/model.py b/frigate/data_processing/common/license_plate/model.py
index f53ed7d95..f7121e65d 100644
--- a/frigate/data_processing/common/license_plate/model.py
+++ b/frigate/data_processing/common/license_plate/model.py
@@ -1,3 +1,4 @@
+from frigate.comms.inter_process import InterProcessRequestor
 from frigate.embeddings.onnx.lpr_embedding import (
     LicensePlateDetector,
     PaddleOCRClassification,
@@ -9,7 +10,12 @@ from ...types import DataProcessorModelRunner
 
 
 class LicensePlateModelRunner(DataProcessorModelRunner):
-    def __init__(self, requestor, device: str = "CPU", model_size: str = "small"):
+    def __init__(
+        self,
+        requestor: InterProcessRequestor,
+        device: str = "CPU",
+        model_size: str = "small",
+    ):
         super().__init__(requestor, device, model_size)
         self.detection_model = PaddleOCRDetection(
             model_size=model_size, requestor=requestor, device=device
diff --git a/frigate/data_processing/post/api.py b/frigate/data_processing/post/api.py
index 2c1359d96..044e5d245 100644
--- a/frigate/data_processing/post/api.py
+++ b/frigate/data_processing/post/api.py
@@ -17,7 +17,7 @@ class PostProcessorApi(ABC):
         self,
         config: FrigateConfig,
         metrics: DataProcessorMetrics,
-        model_runner: DataProcessorModelRunner,
+        model_runner: DataProcessorModelRunner | None,
     ) -> None:
         self.config = config
         self.metrics = metrics
@@ -41,7 +41,7 @@ class PostProcessorApi(ABC):
     @abstractmethod
     def handle_request(
         self, topic: str, request_data: dict[str, Any]
-    ) -> dict[str, Any] | None:
+    ) -> dict[str, Any] | str | None:
         """Handle metadata requests.
         Args:
             request_data (dict): containing data about requested change to process.
diff --git a/frigate/data_processing/post/audio_transcription.py b/frigate/data_processing/post/audio_transcription.py
index 558ab433e..dbeb21028 100644
--- a/frigate/data_processing/post/audio_transcription.py
+++ b/frigate/data_processing/post/audio_transcription.py
@@ -4,7 +4,7 @@ import logging
 import os
 import threading
 import time
-from typing import Optional
+from typing import Any, Optional
 
 from peewee import DoesNotExist
 
@@ -17,6 +17,7 @@ from frigate.const import (
     UPDATE_EVENT_DESCRIPTION,
 )
 from frigate.data_processing.types import PostProcessDataEnum
+from frigate.embeddings.embeddings import Embeddings
 from frigate.types import TrackedObjectUpdateTypesEnum
 from frigate.util.audio import get_audio_from_recording
 
@@ -31,7 +32,7 @@ class AudioTranscriptionPostProcessor(PostProcessorApi):
         self,
         config: FrigateConfig,
         requestor: InterProcessRequestor,
-        embeddings,
+        embeddings: Embeddings,
         metrics: DataProcessorMetrics,
     ):
         super().__init__(config, metrics, None)
@@ -40,7 +41,7 @@ class AudioTranscriptionPostProcessor(PostProcessorApi):
         self.embeddings = embeddings
         self.recognizer = None
         self.transcription_lock = threading.Lock()
-        self.transcription_thread = None
+        self.transcription_thread: threading.Thread | None = None
         self.transcription_running = False
 
         # faster-whisper handles model downloading automatically
@@ -69,7 +70,7 @@ class AudioTranscriptionPostProcessor(PostProcessorApi):
             self.recognizer = None
 
     def process_data(
-        self, data: dict[str, any], data_type: PostProcessDataEnum
+        self, data: dict[str, Any], data_type: PostProcessDataEnum
     ) -> None:
         """Transcribe audio from a recording.
@@ -141,13 +142,13 @@ class AudioTranscriptionPostProcessor(PostProcessorApi):
         except Exception as e:
             logger.error(f"Error in audio transcription post-processing: {e}")
 
-    def __transcribe_audio(self, audio_data: bytes) -> Optional[tuple[str, float]]:
+    def __transcribe_audio(self, audio_data: bytes) -> Optional[str]:
         """Transcribe WAV audio data using faster-whisper."""
         if not self.recognizer:
             logger.debug("Recognizer not initialized")
             return None
 
-        try:
+        try:  # type: ignore[unreachable]
             # Save audio data to a temporary wav (faster-whisper expects a file)
             temp_wav = os.path.join(CACHE_DIR, f"temp_audio_{int(time.time())}.wav")
             with open(temp_wav, "wb") as f:
@@ -176,7 +177,7 @@ class AudioTranscriptionPostProcessor(PostProcessorApi):
             logger.error(f"Error transcribing audio: {e}")
             return None
 
-    def _transcription_wrapper(self, event: dict[str, any]) -> None:
+    def _transcription_wrapper(self, event: dict[str, Any]) -> None:
         """Wrapper to run transcription and reset running flag when done."""
         try:
             self.process_data(
@@ -194,7 +195,7 @@ class AudioTranscriptionPostProcessor(PostProcessorApi):
 
         self.requestor.send_data(UPDATE_AUDIO_TRANSCRIPTION_STATE, "idle")
 
-    def handle_request(self, topic: str, request_data: dict[str, any]) -> str | None:
+    def handle_request(self, topic: str, request_data: dict[str, Any]) -> str | None:
         if topic == "transcribe_audio":
             event = request_data["event"]
 
diff --git a/frigate/data_processing/post/license_plate.py b/frigate/data_processing/post/license_plate.py
index 6f5149b9f..aa89aeb12 100644
--- a/frigate/data_processing/post/license_plate.py
+++ b/frigate/data_processing/post/license_plate.py
@@ -29,7 +29,7 @@ from .api import PostProcessorApi
 logger = logging.getLogger(__name__)
 
 
-class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi):
+class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi):  # type: ignore[misc]
     def __init__(
         self,
         config: FrigateConfig,
@@ -71,7 +71,7 @@ class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi):
             # don't run LPR post processing for now
             return
 
-        event_id = data["event_id"]
+        event_id = data["event_id"]  # type: ignore[unreachable]
         camera_name = data["camera"]
 
         if data_type == PostProcessDataEnum.recording:
@@ -225,7 +225,7 @@ class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi):
             logger.debug(f"Post processing plate: {event_id}, {frame_time}")
         self.lpr_process(keyframe_obj_data, frame)
 
-    def handle_request(self, topic, request_data) -> dict[str, Any] | None:
+    def handle_request(self, topic: str, request_data: dict) -> dict[str, Any] | None:
         if topic == EmbeddingsRequestEnum.reprocess_plate.value:
             event = request_data["event"]
 
@@ -242,3 +242,5 @@ class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi):
                 "message": "Successfully requested reprocessing of license plate.",
                 "success": True,
             }
+
+        return None
diff --git a/frigate/data_processing/post/object_descriptions.py b/frigate/data_processing/post/object_descriptions.py
index 65ab6f7c3..ccb7cc023 100644
--- a/frigate/data_processing/post/object_descriptions.py
+++ b/frigate/data_processing/post/object_descriptions.py
@@ -24,7 +24,7 @@ from frigate.util.file import get_event_thumbnail_bytes, load_event_snapshot_ima
 from frigate.util.image import create_thumbnail, ensure_jpeg_bytes
 
 if TYPE_CHECKING:
-    from frigate.embeddings import Embeddings
+    from frigate.embeddings.embeddings import Embeddings
 
 from ..post.api import PostProcessorApi
 from ..types import DataProcessorMetrics
@@ -139,7 +139,7 @@ class ObjectDescriptionProcessor(PostProcessorApi):
         ):
             self._process_genai_description(event, camera_config, thumbnail)
         else:
-            self.cleanup_event(event.id)
+            self.cleanup_event(str(event.id))
 
     def __regenerate_description(self, event_id: str, source: str, force: bool) -> None:
         """Regenerate the description for an event."""
@@ -149,17 +149,17 @@ class ObjectDescriptionProcessor(PostProcessorApi):
             logger.error(f"Event {event_id} not found for description regeneration")
             return
 
-        if self.genai_client is None:
-            logger.error("GenAI not enabled")
-            return
-
-        camera_config = self.config.cameras[event.camera]
+        camera_config = self.config.cameras[str(event.camera)]
 
         if not camera_config.objects.genai.enabled and not force:
             logger.error(f"GenAI not enabled for camera {event.camera}")
             return
 
         thumbnail = get_event_thumbnail_bytes(event)
 
+        if thumbnail is None:
+            logger.error("No thumbnail available for %s", event.id)
+            return
+
         # ensure we have a jpeg to pass to the model
         thumbnail = ensure_jpeg_bytes(thumbnail)
 
@@ -187,7 +187,9 @@ class ObjectDescriptionProcessor(PostProcessorApi):
                 )
             )
 
-        self._genai_embed_description(event, embed_image)
+        self._genai_embed_description(
+            event, [img for img in embed_image if img is not None]
+        )
 
     def process_data(self, frame_data: dict, data_type: PostProcessDataEnum) -> None:
         """Process a frame update."""
@@ -241,7 +243,7 @@ class ObjectDescriptionProcessor(PostProcessorApi):
         # Crop snapshot based on region
         # provide full image if region doesn't exist (manual events)
         height, width = img.shape[:2]
-        x1_rel, y1_rel, width_rel, height_rel = event.data.get(
+        x1_rel, y1_rel, width_rel, height_rel = event.data.get(  # type: ignore[attr-defined]
             "region", [0, 0, 1, 1]
         )
         x1, y1 = int(x1_rel * width), int(y1_rel * height)
@@ -258,14 +260,16 @@ class ObjectDescriptionProcessor(PostProcessorApi):
         return None
 
     def _process_genai_description(
-        self, event: Event, camera_config: CameraConfig, thumbnail
+        self, event: Event, camera_config: CameraConfig, thumbnail: bytes
     ) -> None:
+        event_id = str(event.id)
+
         if event.has_snapshot and camera_config.objects.genai.use_snapshot:
             snapshot_image = self._read_and_crop_snapshot(event)
             if not snapshot_image:
                 return
 
-        num_thumbnails = len(self.tracked_events.get(event.id, []))
+        num_thumbnails = len(self.tracked_events.get(event_id, []))
 
         # ensure we have a jpeg to pass to the model
         thumbnail = ensure_jpeg_bytes(thumbnail)
@@ -277,7 +281,7 @@ class ObjectDescriptionProcessor(PostProcessorApi):
             else (
                 [
                     data["thumbnail"][:] if data.get("thumbnail") else None
-                    for data in self.tracked_events[event.id]
+                    for data in self.tracked_events[event_id]
                     if data.get("thumbnail")
                 ]
                 if num_thumbnails > 0
@@ -286,22 +290,22 @@ class ObjectDescriptionProcessor(PostProcessorApi):
         )
 
         if camera_config.objects.genai.debug_save_thumbnails and num_thumbnails > 0:
-            logger.debug(f"Saving {num_thumbnails} thumbnails for event {event.id}")
+            logger.debug(f"Saving {num_thumbnails} thumbnails for event {event_id}")
 
-            Path(os.path.join(CLIPS_DIR, f"genai-requests/{event.id}")).mkdir(
+            Path(os.path.join(CLIPS_DIR, f"genai-requests/{event_id}")).mkdir(
                 parents=True, exist_ok=True
             )
 
-            for idx, data in enumerate(self.tracked_events[event.id], 1):
+            for idx, data in enumerate(self.tracked_events[event_id], 1):
                 jpg_bytes: bytes | None = data["thumbnail"]
 
                 if jpg_bytes is None:
-                    logger.warning(f"Unable to save thumbnail {idx} for {event.id}.")
+                    logger.warning(f"Unable to save thumbnail {idx} for {event_id}.")
                 else:
                     with open(
                         os.path.join(
                             CLIPS_DIR,
-                            f"genai-requests/{event.id}/{idx}.jpg",
+                            f"genai-requests/{event_id}/{idx}.jpg",
                         ),
                         "wb",
                     ) as j:
@@ -310,7 +314,7 @@ class ObjectDescriptionProcessor(PostProcessorApi):
         # Generate the description. Call happens in a thread since it is network bound.
         threading.Thread(
             target=self._genai_embed_description,
-            name=f"_genai_embed_description_{event.id}",
+            name=f"_genai_embed_description_{event_id}",
             daemon=True,
             args=(
                 event,
@@ -319,12 +323,12 @@ class ObjectDescriptionProcessor(PostProcessorApi):
         ).start()
 
         # Clean up tracked events and early request state
-        self.cleanup_event(event.id)
+        self.cleanup_event(event_id)
 
     def _genai_embed_description(self, event: Event, thumbnails: list[bytes]) -> None:
         """Embed the description for an event."""
         start = datetime.datetime.now().timestamp()
-        camera_config = self.config.cameras[event.camera]
+        camera_config = self.config.cameras[str(event.camera)]
         description = self.genai_client.generate_object_description(
             camera_config, thumbnails, event
         )
@@ -346,7 +350,7 @@ class ObjectDescriptionProcessor(PostProcessorApi):
 
         # Embed the description
         if self.config.semantic_search.enabled:
-            self.embeddings.embed_description(event.id, description)
+            self.embeddings.embed_description(str(event.id), description)
 
         # Check semantic trigger for this description
         if self.semantic_trigger_processor is not None:
diff --git a/frigate/data_processing/post/review_descriptions.py b/frigate/data_processing/post/review_descriptions.py
index 57bf0f7d1..1eda8ee8f 100644
--- a/frigate/data_processing/post/review_descriptions.py
+++ b/frigate/data_processing/post/review_descriptions.py
@@ -48,8 +48,8 @@ class ReviewDescriptionProcessor(PostProcessorApi):
         self.metrics = metrics
         self.genai_client = client
         self.review_desc_speed = InferenceSpeed(self.metrics.review_desc_speed)
-        self.review_descs_dps = EventsPerSecond()
-        self.review_descs_dps.start()
+        self.review_desc_dps = EventsPerSecond()
+        self.review_desc_dps.start()
 
     def calculate_frame_count(
         self,
@@ -59,7 +59,7 @@ class ReviewDescriptionProcessor(PostProcessorApi):
     ) -> int:
         """Calculate optimal number of frames based on context size, image source, and resolution.
 
-        Token usage varies by resolution: larger images (ultrawide aspect ratios) use more tokens.
+        Token usage varies by resolution: larger images (ultra-wide aspect ratios) use more tokens.
         Estimates ~1 token per 1250 pixels. Targets 98% context utilization with safety margin.
         Capped at 20 frames.
""" @@ -68,7 +68,11 @@ class ReviewDescriptionProcessor(PostProcessorApi): detect_width = camera_config.detect.width detect_height = camera_config.detect.height - aspect_ratio = detect_width / detect_height + + if not detect_width or not detect_height: + aspect_ratio = 16 / 9 + else: + aspect_ratio = detect_width / detect_height if image_source == ImageSourceEnum.recordings: if aspect_ratio >= 1: @@ -99,8 +103,10 @@ class ReviewDescriptionProcessor(PostProcessorApi): return min(max(max_frames, 3), 20) - def process_data(self, data, data_type): - self.metrics.review_desc_dps.value = self.review_descs_dps.eps() + def process_data( + self, data: dict[str, Any], data_type: PostProcessDataEnum + ) -> None: + self.metrics.review_desc_dps.value = self.review_desc_dps.eps() if data_type != PostProcessDataEnum.review: return @@ -186,7 +192,7 @@ class ReviewDescriptionProcessor(PostProcessorApi): ) # kickoff analysis - self.review_descs_dps.update() + self.review_desc_dps.update() threading.Thread( target=run_analysis, args=( @@ -202,7 +208,7 @@ class ReviewDescriptionProcessor(PostProcessorApi): ), ).start() - def handle_request(self, topic, request_data): + def handle_request(self, topic: str, request_data: dict[str, Any]) -> str | None: if topic == EmbeddingsRequestEnum.summarize_review.value: start_ts = request_data["start_ts"] end_ts = request_data["end_ts"] @@ -327,7 +333,7 @@ class ReviewDescriptionProcessor(PostProcessorApi): file_start = f"preview_{camera}-" start_file = f"{file_start}{start_time}.webp" end_file = f"{file_start}{end_time}.webp" - all_frames = [] + all_frames: list[str] = [] for file in sorted(os.listdir(preview_dir)): if not file.startswith(file_start): @@ -465,7 +471,7 @@ class ReviewDescriptionProcessor(PostProcessorApi): thumb_data = cv2.imread(thumb_path) if thumb_data is None: - logger.warning( + logger.warning( # type: ignore[unreachable] "Could not read preview frame at %s, skipping", thumb_path ) continue @@ -488,13 +494,12 @@ class ReviewDescriptionProcessor(PostProcessorApi): return thumbs -@staticmethod def run_analysis( requestor: InterProcessRequestor, genai_client: GenAIClient, review_inference_speed: InferenceSpeed, camera_config: CameraConfig, - final_data: dict[str, str], + final_data: dict[str, Any], thumbs: list[bytes], genai_config: GenAIReviewConfig, labelmap_objects: list[str], diff --git a/frigate/data_processing/post/semantic_trigger.py b/frigate/data_processing/post/semantic_trigger.py index ec9e5d220..08f8a2e76 100644 --- a/frigate/data_processing/post/semantic_trigger.py +++ b/frigate/data_processing/post/semantic_trigger.py @@ -19,6 +19,7 @@ from frigate.config import FrigateConfig from frigate.const import CONFIG_DIR from frigate.data_processing.types import PostProcessDataEnum from frigate.db.sqlitevecq import SqliteVecQueueDatabase +from frigate.embeddings.embeddings import Embeddings from frigate.embeddings.util import ZScoreNormalization from frigate.models import Event, Trigger from frigate.util.builtin import cosine_distance @@ -40,8 +41,8 @@ class SemanticTriggerProcessor(PostProcessorApi): requestor: InterProcessRequestor, sub_label_publisher: EventMetadataPublisher, metrics: DataProcessorMetrics, - embeddings, - ): + embeddings: Embeddings, + ) -> None: super().__init__(config, metrics, None) self.db = db self.embeddings = embeddings @@ -236,11 +237,14 @@ class SemanticTriggerProcessor(PostProcessorApi): return # Skip the event if not an object - if event.data.get("type") != "object": + if event.data.get("type") != "object": # type: 
             return
 
         thumbnail_bytes = get_event_thumbnail_bytes(event)
 
+        if thumbnail_bytes is None:
+            return
+
         nparr = np.frombuffer(thumbnail_bytes, np.uint8)
         thumbnail = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
 
@@ -262,8 +266,10 @@ class SemanticTriggerProcessor(PostProcessorApi):
             thumbnail,
         )
 
-    def handle_request(self, topic, request_data):
+    def handle_request(
+        self, topic: str, request_data: dict[str, Any]
+    ) -> dict[str, Any] | str | None:
         return None
 
-    def expire_object(self, object_id, camera):
+    def expire_object(self, object_id: str, camera: str) -> None:
         pass
diff --git a/frigate/data_processing/real_time/audio_transcription.py b/frigate/data_processing/real_time/audio_transcription.py
index 2e6d599eb..3d1536f73 100644
--- a/frigate/data_processing/real_time/audio_transcription.py
+++ b/frigate/data_processing/real_time/audio_transcription.py
@@ -4,7 +4,7 @@ import logging
 import os
 import queue
 import threading
-from typing import Optional
+from typing import Any, Optional
 
 import numpy as np
 
@@ -39,11 +39,11 @@ class AudioTranscriptionRealTimeProcessor(RealTimeProcessorApi):
         self.config = config
         self.camera_config = camera_config
         self.requestor = requestor
-        self.stream = None
-        self.whisper_model = None
+        self.stream: Any = None
+        self.whisper_model: FasterWhisperASR | None = None
         self.model_runner = model_runner
-        self.transcription_segments = []
-        self.audio_queue = queue.Queue()
+        self.transcription_segments: list[str] = []
+        self.audio_queue: queue.Queue[tuple[dict[str, Any], np.ndarray]] = queue.Queue()
         self.stop_event = stop_event
 
     def __build_recognizer(self) -> None:
@@ -142,10 +142,10 @@ class AudioTranscriptionRealTimeProcessor(RealTimeProcessorApi):
             logger.error(f"Error processing audio stream: {e}")
             return None
 
-    def process_frame(self, obj_data: dict[str, any], frame: np.ndarray) -> None:
+    def process_frame(self, obj_data: dict[str, Any], frame: np.ndarray) -> None:
         pass
 
-    def process_audio(self, obj_data: dict[str, any], audio: np.ndarray) -> bool | None:
+    def process_audio(self, obj_data: dict[str, Any], audio: np.ndarray) -> bool | None:
         if audio is None or audio.size == 0:
             logger.debug("No audio data provided for transcription")
             return None
@@ -269,13 +269,13 @@ class AudioTranscriptionRealTimeProcessor(RealTimeProcessorApi):
         )
 
     def handle_request(
-        self, topic: str, request_data: dict[str, any]
-    ) -> dict[str, any] | None:
+        self, topic: str, request_data: dict[str, Any]
+    ) -> dict[str, Any] | None:
         if topic == "clear_audio_recognizer":
             self.stream = None
             self.__build_recognizer()
             return {"message": "Audio recognizer cleared and rebuilt", "success": True}
         return None
 
-    def expire_object(self, object_id: str) -> None:
+    def expire_object(self, object_id: str, camera: str) -> None:
         pass
diff --git a/frigate/data_processing/real_time/bird.py b/frigate/data_processing/real_time/bird.py
index 38ff1a950..48663f971 100644
--- a/frigate/data_processing/real_time/bird.py
+++ b/frigate/data_processing/real_time/bird.py
@@ -14,7 +14,7 @@ from frigate.comms.event_metadata_updater import (
 from frigate.config import FrigateConfig
 from frigate.const import MODEL_CACHE_DIR
 from frigate.log import suppress_stderr_during
-from frigate.util.object import calculate_region
+from frigate.util.image import calculate_region
 
 from ..types import DataProcessorMetrics
 from .api import RealTimeProcessorApi
@@ -35,10 +35,10 @@ class BirdRealTimeProcessor(RealTimeProcessorApi):
         metrics: DataProcessorMetrics,
     ):
         super().__init__(config, metrics)
-        self.interpreter: Interpreter = None
+        self.interpreter: Interpreter | None = None
         self.sub_label_publisher = sub_label_publisher
-        self.tensor_input_details: dict[str, Any] = None
-        self.tensor_output_details: dict[str, Any] = None
+        self.tensor_input_details: list[dict[str, Any]] | None = None
+        self.tensor_output_details: list[dict[str, Any]] | None = None
         self.detected_birds: dict[str, float] = {}
         self.labelmap: dict[int, str] = {}
 
@@ -61,7 +61,7 @@ class BirdRealTimeProcessor(RealTimeProcessorApi):
         self.downloader = ModelDownloader(
             model_name="bird",
             download_path=download_path,
-            file_names=self.model_files.keys(),
+            file_names=list(self.model_files.keys()),
             download_func=self.__download_models,
             complete_func=self.__build_detector,
         )
@@ -102,8 +102,12 @@ class BirdRealTimeProcessor(RealTimeProcessorApi):
                 i += 1
                 line = f.readline()
 
-    def process_frame(self, obj_data, frame):
-        if not self.interpreter:
+    def process_frame(self, obj_data: dict[str, Any], frame: np.ndarray) -> None:
+        if (
+            not self.interpreter
+            or not self.tensor_input_details
+            or not self.tensor_output_details
+        ):
             return
 
         if obj_data["label"] != "bird":
@@ -145,7 +149,7 @@ class BirdRealTimeProcessor(RealTimeProcessorApi):
             self.tensor_output_details[0]["index"]
         )[0]
         probs = res / res.sum(axis=0)
-        best_id = np.argmax(probs)
+        best_id = int(np.argmax(probs))
 
         if best_id == 964:
             logger.debug("No bird classification was detected.")
@@ -179,9 +183,11 @@ class BirdRealTimeProcessor(RealTimeProcessorApi):
             self.config.classification = payload
             logger.debug("Bird classification config updated dynamically")
 
-    def handle_request(self, topic, request_data):
+    def handle_request(
+        self, topic: str, request_data: dict[str, Any]
+    ) -> dict[str, Any] | None:
         return None
 
-    def expire_object(self, object_id, camera):
+    def expire_object(self, object_id: str, camera: str) -> None:
         if object_id in self.detected_birds:
             self.detected_birds.pop(object_id)
diff --git a/frigate/data_processing/real_time/custom_classification.py b/frigate/data_processing/real_time/custom_classification.py
index 1a2512e43..1dcf59052 100644
--- a/frigate/data_processing/real_time/custom_classification.py
+++ b/frigate/data_processing/real_time/custom_classification.py
@@ -24,7 +24,8 @@ from frigate.const import CLIPS_DIR, MODEL_CACHE_DIR
 from frigate.log import suppress_stderr_during
 from frigate.types import TrackedObjectUpdateTypesEnum
 from frigate.util.builtin import EventsPerSecond, InferenceSpeed, load_labels
-from frigate.util.object import box_overlaps, calculate_region
+from frigate.util.image import calculate_region
+from frigate.util.object import box_overlaps
 
 from ..types import DataProcessorMetrics
 from .api import RealTimeProcessorApi
@@ -49,12 +50,16 @@ class CustomStateClassificationProcessor(RealTimeProcessorApi):
     ):
         super().__init__(config, metrics)
         self.model_config = model_config
+
+        if not self.model_config.name:
+            raise ValueError("Custom classification model name must be set.")
+
         self.requestor = requestor
         self.model_dir = os.path.join(MODEL_CACHE_DIR, self.model_config.name)
         self.train_dir = os.path.join(CLIPS_DIR, self.model_config.name, "train")
-        self.interpreter: Interpreter = None
-        self.tensor_input_details: dict[str, Any] | None = None
-        self.tensor_output_details: dict[str, Any] | None = None
+        self.interpreter: Interpreter | None = None
+        self.tensor_input_details: list[dict[str, Any]] | None = None
+        self.tensor_output_details: list[dict[str, Any]] | None = None
         self.labelmap: dict[int, str] = {}
         self.classifications_per_second = EventsPerSecond()
         self.state_history: dict[str, dict[str, Any]] = {}
@@ -63,7 +68,7 @@ class CustomStateClassificationProcessor(RealTimeProcessorApi):
             self.metrics
             and self.model_config.name in self.metrics.classification_speeds
         ):
-            self.inference_speed = InferenceSpeed(
+            self.inference_speed: InferenceSpeed | None = InferenceSpeed(
                 self.metrics.classification_speeds[self.model_config.name]
             )
         else:
@@ -172,12 +177,20 @@ class CustomStateClassificationProcessor(RealTimeProcessorApi):
 
         return None
 
-    def process_frame(self, frame_data: dict[str, Any], frame: np.ndarray):
+    def process_frame(self, frame_data: dict[str, Any], frame: np.ndarray) -> None:
+        if (
+            not self.model_config.name
+            or not self.model_config.state_config
+            or not self.tensor_input_details
+            or not self.tensor_output_details
+        ):
+            return
+
         if self.metrics and self.model_config.name in self.metrics.classification_cps:
             self.metrics.classification_cps[
                 self.model_config.name
             ].value = self.classifications_per_second.eps()
 
-        camera = frame_data.get("camera")
+        camera = str(frame_data.get("camera"))
 
         if camera not in self.model_config.state_config.cameras:
             return
@@ -283,7 +296,7 @@ class CustomStateClassificationProcessor(RealTimeProcessorApi):
         logger.debug(
             f"{self.model_config.name} Ran state classification with probabilities: {probs}"
         )
-        best_id = np.argmax(probs)
+        best_id = int(np.argmax(probs))
         score = round(probs[best_id], 2)
         self.__update_metrics(datetime.datetime.now().timestamp() - now)
 
@@ -319,7 +332,9 @@ class CustomStateClassificationProcessor(RealTimeProcessorApi):
                 verified_state,
             )
 
-    def handle_request(self, topic, request_data):
+    def handle_request(
+        self, topic: str, request_data: dict[str, Any]
+    ) -> dict[str, Any] | None:
         if topic == EmbeddingsRequestEnum.reload_classification_model.value:
             if request_data.get("model_name") == self.model_config.name:
                 self.__build_detector()
@@ -335,7 +350,7 @@ class CustomStateClassificationProcessor(RealTimeProcessorApi):
         else:
             return None
 
-    def expire_object(self, object_id, camera):
+    def expire_object(self, object_id: str, camera: str) -> None:
         pass
 
 
@@ -350,13 +365,17 @@ class CustomObjectClassificationProcessor(RealTimeProcessorApi):
     ):
         super().__init__(config, metrics)
         self.model_config = model_config
+
+        if not self.model_config.name:
+            raise ValueError("Custom classification model name must be set.")
+
         self.model_dir = os.path.join(MODEL_CACHE_DIR, self.model_config.name)
         self.train_dir = os.path.join(CLIPS_DIR, self.model_config.name, "train")
-        self.interpreter: Interpreter = None
+        self.interpreter: Interpreter | None = None
         self.sub_label_publisher = sub_label_publisher
         self.requestor = requestor
-        self.tensor_input_details: dict[str, Any] | None = None
-        self.tensor_output_details: dict[str, Any] | None = None
+        self.tensor_input_details: list[dict[str, Any]] | None = None
+        self.tensor_output_details: list[dict[str, Any]] | None = None
         self.classification_history: dict[str, list[tuple[str, float, float]]] = {}
         self.labelmap: dict[int, str] = {}
         self.classifications_per_second = EventsPerSecond()
@@ -365,7 +384,7 @@ class CustomObjectClassificationProcessor(RealTimeProcessorApi):
             self.metrics
             and self.model_config.name in self.metrics.classification_speeds
         ):
-            self.inference_speed = InferenceSpeed(
+            self.inference_speed: InferenceSpeed | None = InferenceSpeed(
                 self.metrics.classification_speeds[self.model_config.name]
             )
         else:
@@ -431,8 +450,8 @@ class CustomObjectClassificationProcessor(RealTimeProcessorApi):
             )
             return None, 0.0
 
-        label_counts = {}
-        label_scores = {}
+        label_counts: dict[str, int] = {}
+        label_scores: dict[str, list[float]] = {}
         total_attempts = len(history)
 
         for label, score, timestamp in history:
@@ -443,7 +462,7 @@ class CustomObjectClassificationProcessor(RealTimeProcessorApi):
             label_counts[label] += 1
             label_scores[label].append(score)
 
-        best_label = max(label_counts, key=label_counts.get)
+        best_label = max(label_counts, key=lambda k: label_counts[k])
         best_count = label_counts[best_label]
 
         consensus_threshold = total_attempts * 0.6
@@ -470,7 +489,15 @@ class CustomObjectClassificationProcessor(RealTimeProcessorApi):
             )
             return best_label, avg_score
 
-    def process_frame(self, obj_data, frame):
+    def process_frame(self, obj_data: dict[str, Any], frame: np.ndarray) -> None:
+        if (
+            not self.model_config.name
+            or not self.model_config.object_config
+            or not self.tensor_input_details
+            or not self.tensor_output_details
+        ):
+            return
+
         if self.metrics and self.model_config.name in self.metrics.classification_cps:
             self.metrics.classification_cps[
                 self.model_config.name
@@ -555,7 +582,7 @@ class CustomObjectClassificationProcessor(RealTimeProcessorApi):
         logger.debug(
             f"{self.model_config.name} Ran object classification with probabilities: {probs}"
         )
-        best_id = np.argmax(probs)
+        best_id = int(np.argmax(probs))
         score = round(probs[best_id], 2)
         self.__update_metrics(datetime.datetime.now().timestamp() - now)
 
@@ -650,7 +677,7 @@ class CustomObjectClassificationProcessor(RealTimeProcessorApi):
             ),
         )
 
-    def handle_request(self, topic, request_data):
+    def handle_request(self, topic: str, request_data: dict) -> dict | None:
         if topic == EmbeddingsRequestEnum.reload_classification_model.value:
             if request_data.get("model_name") == self.model_config.name:
                 self.__build_detector()
@@ -666,12 +693,11 @@ class CustomObjectClassificationProcessor(RealTimeProcessorApi):
         else:
             return None
 
-    def expire_object(self, object_id, camera):
+    def expire_object(self, object_id: str, camera: str) -> None:
         if object_id in self.classification_history:
             self.classification_history.pop(object_id)
 
 
-@staticmethod
 def write_classification_attempt(
     folder: str,
     frame: np.ndarray,
diff --git a/frigate/data_processing/real_time/face.py b/frigate/data_processing/real_time/face.py
index d886a86e5..c6b6346b5 100644
--- a/frigate/data_processing/real_time/face.py
+++ b/frigate/data_processing/real_time/face.py
@@ -52,11 +52,11 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
         self.face_config = config.face_recognition
         self.requestor = requestor
         self.sub_label_publisher = sub_label_publisher
-        self.face_detector: cv2.FaceDetectorYN = None
+        self.face_detector: cv2.FaceDetectorYN | None = None
         self.requires_face_detection = "face" not in self.config.objects.all_objects
         self.person_face_history: dict[str, list[tuple[str, float, int]]] = {}
         self.camera_current_people: dict[str, list[str]] = {}
-        self.recognizer: FaceRecognizer | None = None
+        self.recognizer: FaceRecognizer
 
         self.faces_per_second = EventsPerSecond()
         self.inference_speed = InferenceSpeed(self.metrics.face_rec_speed)
@@ -78,7 +78,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
         self.downloader = ModelDownloader(
             model_name="facedet",
             download_path=download_path,
-            file_names=self.model_files.keys(),
+            file_names=list(self.model_files.keys()),
             download_func=self.__download_models,
             complete_func=self.__build_detector,
         )
@@ -134,7 +134,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
 
     def __detect_face(
         self, input: np.ndarray, threshold: float
-    ) -> tuple[int, int, int, int]:
+    ) -> tuple[int, int, int, int] | None:
         """Detect faces in input image."""
         if not self.face_detector:
             return None
@@ -153,7 +153,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
         faces = self.face_detector.detect(input)
 
         if faces is None or faces[1] is None:
-            return None
+            return None  # type: ignore[unreachable]
 
         face = None
 
@@ -168,7 +168,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
             h: int = int(raw_bbox[3] / scale_factor)
             bbox = (x, y, x + w, y + h)
 
-            if face is None or area(bbox) > area(face):
+            if face is None or area(bbox) > area(face):  # type: ignore[unreachable]
                 face = bbox
 
         return face
@@ -177,7 +177,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
         self.faces_per_second.update()
         self.inference_speed.update(duration)
 
-    def process_frame(self, obj_data: dict[str, Any], frame: np.ndarray):
+    def process_frame(self, obj_data: dict[str, Any], frame: np.ndarray) -> None:
         """Look for faces in image."""
         self.metrics.face_rec_fps.value = self.faces_per_second.eps()
         camera = obj_data["camera"]
@@ -349,7 +349,9 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
 
         self.__update_metrics(datetime.datetime.now().timestamp() - start)
 
-    def handle_request(self, topic, request_data) -> dict[str, Any] | None:
+    def handle_request(
+        self, topic: str, request_data: dict[str, Any]
+    ) -> dict[str, Any] | None:
         if topic == EmbeddingsRequestEnum.clear_face_classifier.value:
             self.recognizer.clear()
             return {"success": True, "message": "Face classifier cleared."}
@@ -432,7 +434,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
             img = cv2.imread(current_file)
 
             if img is None:
-                return {
+                return {  # type: ignore[unreachable]
                     "message": "Invalid image file.",
                     "success": False,
                 }
@@ -469,7 +471,9 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
                 "score": score,
             }
 
-    def expire_object(self, object_id: str, camera: str):
+        return None
+
+    def expire_object(self, object_id: str, camera: str) -> None:
         if object_id in self.person_face_history:
             self.person_face_history.pop(object_id)
 
@@ -478,7 +482,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
 
     def weighted_average(
         self, results_list: list[tuple[str, float, int]], max_weight: int = 4000
-    ):
+    ) -> tuple[str | None, float]:
         """
         Calculates a robust weighted average, capping the area weight
         and giving more weight to higher scores.
@@ -493,8 +497,8 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
             return None, 0.0
 
         counts: dict[str, int] = {}
-        weighted_scores: dict[str, int] = {}
-        total_weights: dict[str, int] = {}
+        weighted_scores: dict[str, float] = {}
+        total_weights: dict[str, float] = {}
 
         for name, score, face_area in results_list:
             if name == "unknown":
@@ -509,7 +513,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
             counts[name] += 1
 
             # Capped weight based on face area
-            weight = min(face_area, max_weight)
+            weight: float = min(face_area, max_weight)
 
             # Score-based weighting (higher scores get more weight)
             weight *= (score - self.face_config.unknown_score) * 10
@@ -519,7 +523,7 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
         if not weighted_scores:
             return None, 0.0
 
-        best_name = max(weighted_scores, key=weighted_scores.get)
+        best_name = max(weighted_scores, key=lambda k: weighted_scores[k])
 
         # If the number of faces for this person < min_faces, we are not confident it is a correct result
         if counts[best_name] < self.face_config.min_faces:
diff --git a/frigate/data_processing/real_time/license_plate.py b/frigate/data_processing/real_time/license_plate.py
index 298989c82..c2ea28b23 100644
--- a/frigate/data_processing/real_time/license_plate.py
+++ b/frigate/data_processing/real_time/license_plate.py
@@ -61,14 +61,16 @@ class LicensePlateRealTimeProcessor(LicensePlateProcessingMixin, RealTimeProcess
         self,
         obj_data: dict[str, Any],
         frame: np.ndarray,
-        dedicated_lpr: bool | None = False,
-    ):
+        dedicated_lpr: bool = False,
+    ) -> None:
         """Look for license plates in image."""
         self.lpr_process(obj_data, frame, dedicated_lpr)
 
-    def handle_request(self, topic, request_data) -> dict[str, Any] | None:
-        return
+    def handle_request(
+        self, topic: str, request_data: dict[str, Any]
+    ) -> dict[str, Any] | None:
+        return None
 
-    def expire_object(self, object_id: str, camera: str):
+    def expire_object(self, object_id: str, camera: str) -> None:
         """Expire lpr objects."""
         self.lpr_expire(object_id, camera)
diff --git a/frigate/data_processing/types.py b/frigate/data_processing/types.py
index 263a8b987..5cd1f5008 100644
--- a/frigate/data_processing/types.py
+++ b/frigate/data_processing/types.py
@@ -1,8 +1,10 @@
 """Embeddings types."""
 
+from __future__ import annotations
+
 from enum import Enum
-from multiprocessing.managers import SyncManager
-from multiprocessing.sharedctypes import Synchronized
+from multiprocessing.managers import DictProxy, SyncManager, ValueProxy
+from typing import Any
 
 import sherpa_onnx
 
@@ -10,22 +12,22 @@ from frigate.data_processing.real_time.whisper_online import FasterWhisperASR
 
 
 class DataProcessorMetrics:
-    image_embeddings_speed: Synchronized
-    image_embeddings_eps: Synchronized
-    text_embeddings_speed: Synchronized
-    text_embeddings_eps: Synchronized
-    face_rec_speed: Synchronized
-    face_rec_fps: Synchronized
-    alpr_speed: Synchronized
-    alpr_pps: Synchronized
-    yolov9_lpr_speed: Synchronized
-    yolov9_lpr_pps: Synchronized
-    review_desc_speed: Synchronized
-    review_desc_dps: Synchronized
-    object_desc_speed: Synchronized
-    object_desc_dps: Synchronized
-    classification_speeds: dict[str, Synchronized]
-    classification_cps: dict[str, Synchronized]
+    image_embeddings_speed: ValueProxy[float]
+    image_embeddings_eps: ValueProxy[float]
+    text_embeddings_speed: ValueProxy[float]
+    text_embeddings_eps: ValueProxy[float]
+    face_rec_speed: ValueProxy[float]
+    face_rec_fps: ValueProxy[float]
+    alpr_speed: ValueProxy[float]
+    alpr_pps: ValueProxy[float]
+    yolov9_lpr_speed: ValueProxy[float]
diff --git a/frigate/data_processing/real_time/license_plate.py b/frigate/data_processing/real_time/license_plate.py
index 298989c82..c2ea28b23 100644
--- a/frigate/data_processing/real_time/license_plate.py
+++ b/frigate/data_processing/real_time/license_plate.py
@@ -61,14 +61,16 @@ class LicensePlateRealTimeProcessor(LicensePlateProcessingMixin, RealTimeProcessorApi):
         self,
         obj_data: dict[str, Any],
         frame: np.ndarray,
-        dedicated_lpr: bool | None = False,
-    ):
+        dedicated_lpr: bool = False,
+    ) -> None:
         """Look for license plates in image."""
         self.lpr_process(obj_data, frame, dedicated_lpr)

-    def handle_request(self, topic, request_data) -> dict[str, Any] | None:
-        return
+    def handle_request(
+        self, topic: str, request_data: dict[str, Any]
+    ) -> dict[str, Any] | None:
+        return None

-    def expire_object(self, object_id: str, camera: str):
+    def expire_object(self, object_id: str, camera: str) -> None:
         """Expire lpr objects."""
         self.lpr_expire(object_id, camera)
diff --git a/frigate/data_processing/types.py b/frigate/data_processing/types.py
index 263a8b987..5cd1f5008 100644
--- a/frigate/data_processing/types.py
+++ b/frigate/data_processing/types.py
@@ -1,8 +1,10 @@
 """Embeddings types."""

+from __future__ import annotations
+
 from enum import Enum
-from multiprocessing.managers import SyncManager
-from multiprocessing.sharedctypes import Synchronized
+from multiprocessing.managers import DictProxy, SyncManager, ValueProxy
+from typing import Any

 import sherpa_onnx

@@ -10,22 +12,22 @@ from frigate.data_processing.real_time.whisper_online import FasterWhisperASR


 class DataProcessorMetrics:
-    image_embeddings_speed: Synchronized
-    image_embeddings_eps: Synchronized
-    text_embeddings_speed: Synchronized
-    text_embeddings_eps: Synchronized
-    face_rec_speed: Synchronized
-    face_rec_fps: Synchronized
-    alpr_speed: Synchronized
-    alpr_pps: Synchronized
-    yolov9_lpr_speed: Synchronized
-    yolov9_lpr_pps: Synchronized
-    review_desc_speed: Synchronized
-    review_desc_dps: Synchronized
-    object_desc_speed: Synchronized
-    object_desc_dps: Synchronized
-    classification_speeds: dict[str, Synchronized]
-    classification_cps: dict[str, Synchronized]
+    image_embeddings_speed: ValueProxy[float]
+    image_embeddings_eps: ValueProxy[float]
+    text_embeddings_speed: ValueProxy[float]
+    text_embeddings_eps: ValueProxy[float]
+    face_rec_speed: ValueProxy[float]
+    face_rec_fps: ValueProxy[float]
+    alpr_speed: ValueProxy[float]
+    alpr_pps: ValueProxy[float]
+    yolov9_lpr_speed: ValueProxy[float]
+    yolov9_lpr_pps: ValueProxy[float]
+    review_desc_speed: ValueProxy[float]
+    review_desc_dps: ValueProxy[float]
+    object_desc_speed: ValueProxy[float]
+    object_desc_dps: ValueProxy[float]
+    classification_speeds: DictProxy[str, ValueProxy[float]]
+    classification_cps: DictProxy[str, ValueProxy[float]]

     def __init__(self, manager: SyncManager, custom_classification_models: list[str]):
         self.image_embeddings_speed = manager.Value("d", 0.0)
@@ -52,7 +54,7 @@ class DataProcessorMetrics:


 class DataProcessorModelRunner:
-    def __init__(self, requestor, device: str = "CPU", model_size: str = "large"):
+    def __init__(self, requestor: Any, device: str = "CPU", model_size: str = "large"):
         self.requestor = requestor
         self.device = device
         self.model_size = model_size
diff --git a/frigate/db/sqlitevecq.py b/frigate/db/sqlitevecq.py
index aa4928e84..a72e99b6a 100644
--- a/frigate/db/sqlitevecq.py
+++ b/frigate/db/sqlitevecq.py
@@ -1,18 +1,21 @@
 import re
 import sqlite3
+from typing import Any

 from playhouse.sqliteq import SqliteQueueDatabase


 class SqliteVecQueueDatabase(SqliteQueueDatabase):
-    def __init__(self, *args, load_vec_extension: bool = False, **kwargs) -> None:
+    def __init__(
+        self, *args: Any, load_vec_extension: bool = False, **kwargs: Any
+    ) -> None:
         self.load_vec_extension: bool = load_vec_extension
         # no extension necessary, sqlite will load correctly for each platform
         self.sqlite_vec_path = "/usr/local/lib/vec0"
         super().__init__(*args, **kwargs)

-    def _connect(self, *args, **kwargs) -> sqlite3.Connection:
-        conn: sqlite3.Connection = super()._connect(*args, **kwargs)
+    def _connect(self, *args: Any, **kwargs: Any) -> sqlite3.Connection:
+        conn: sqlite3.Connection = super()._connect(*args, **kwargs)  # type: ignore[misc]
         if self.load_vec_extension:
             self._load_vec_extension(conn)
@@ -27,7 +30,7 @@ class SqliteVecQueueDatabase(SqliteQueueDatabase):
         conn.enable_load_extension(False)

     def _register_regexp(self, conn: sqlite3.Connection) -> None:
-        def regexp(expr: str, item: str) -> bool:
+        def regexp(expr: str, item: str | None) -> bool:
             if item is None:
                 return False
             try:
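Aside on _register_regexp above: SQLite parses REGEXP syntax but ships no implementation, so Python code must register a two-argument "regexp" function on each connection; the new `item: str | None` annotation reflects that NULL column values reach that callback as None. A small sketch of the same pattern on a bare connection (the vec0 extension loading is Frigate-specific and omitted here; the except body below is an assumption, since the diff cuts off after `try:`):

    import re
    import sqlite3

    def register_regexp(conn: sqlite3.Connection) -> None:
        def regexp(expr: str, item: str | None) -> bool:
            # NULL values never match, mirroring the guard in the diff.
            if item is None:
                return False
            try:
                return re.search(expr, item) is not None
            except re.error:
                return False  # invalid pattern; assumed handling for this sketch

        conn.create_function("regexp", 2, regexp)

    conn = sqlite3.connect(":memory:")
    register_regexp(conn)
    print(conn.execute("SELECT 'frigate' REGEXP 'fri.'").fetchone())  # -> (1,)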
diff --git a/frigate/events/audio.py b/frigate/events/audio.py
index 505874469..f6c41fa30 100644
--- a/frigate/events/audio.py
+++ b/frigate/events/audio.py
@@ -2,17 +2,19 @@

 import datetime
 import logging
+import subprocess
 import threading
 import time
 from multiprocessing.managers import DictProxy
 from multiprocessing.synchronize import Event as MpEvent
-from typing import Tuple
+from typing import Any, Tuple

 import numpy as np

 from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
 from frigate.comms.inter_process import InterProcessRequestor
-from frigate.config import CameraConfig, CameraInput, FfmpegConfig, FrigateConfig
+from frigate.config import CameraConfig, CameraInput, FrigateConfig
+from frigate.config.camera.ffmpeg import CameraFfmpegConfig
 from frigate.config.camera.updater import (
     CameraConfigUpdateEnum,
     CameraConfigUpdateSubscriber,
@@ -35,8 +37,7 @@ from frigate.data_processing.real_time.audio_transcription import (
 )
 from frigate.ffmpeg_presets import parse_preset_input
 from frigate.log import LogPipe, suppress_stderr_during
-from frigate.object_detection.base import load_labels
-from frigate.util.builtin import get_ffmpeg_arg_list
+from frigate.util.builtin import get_ffmpeg_arg_list, load_labels
 from frigate.util.ffmpeg import start_or_restart_ffmpeg, stop_ffmpeg
 from frigate.util.process import FrigateProcess

@@ -49,7 +50,7 @@ except ModuleNotFoundError:
 logger = logging.getLogger(__name__)


-def get_ffmpeg_command(ffmpeg: FfmpegConfig) -> list[str]:
+def get_ffmpeg_command(ffmpeg: CameraFfmpegConfig) -> list[str]:
     ffmpeg_input: CameraInput = [i for i in ffmpeg.inputs if "audio" in i.roles][0]
     input_args = get_ffmpeg_arg_list(ffmpeg.global_args) + (
         parse_preset_input(ffmpeg_input.input_args, 1)
@@ -102,9 +103,11 @@ class AudioProcessor(FrigateProcess):
         threading.current_thread().name = "process:audio_manager"

         if self.config.audio_transcription.enabled:
-            self.transcription_model_runner = AudioTranscriptionModelRunner(
-                self.config.audio_transcription.device,
-                self.config.audio_transcription.model_size,
+            self.transcription_model_runner: AudioTranscriptionModelRunner | None = (
+                AudioTranscriptionModelRunner(
+                    self.config.audio_transcription.device or "AUTO",
+                    self.config.audio_transcription.model_size,
+                )
             )
         else:
             self.transcription_model_runner = None
@@ -118,7 +121,7 @@ class AudioProcessor(FrigateProcess):
                     self.config,
                     self.camera_metrics,
                     self.transcription_model_runner,
-                    self.stop_event,
+                    self.stop_event,  # type: ignore[arg-type]
                 )
                 audio_threads.append(audio_thread)
                 audio_thread.start()
@@ -162,7 +165,7 @@ class AudioEventMaintainer(threading.Thread):
         self.logger = logging.getLogger(f"audio.{self.camera_config.name}")
         self.ffmpeg_cmd = get_ffmpeg_command(self.camera_config.ffmpeg)
         self.logpipe = LogPipe(f"ffmpeg.{self.camera_config.name}.audio")
-        self.audio_listener = None
+        self.audio_listener: subprocess.Popen[Any] | None = None
         self.audio_transcription_model_runner = audio_transcription_model_runner
         self.transcription_processor = None
         self.transcription_thread = None
@@ -171,7 +174,7 @@ class AudioEventMaintainer(threading.Thread):
         self.requestor = InterProcessRequestor()
         self.config_subscriber = CameraConfigUpdateSubscriber(
             None,
-            {self.camera_config.name: self.camera_config},
+            {str(self.camera_config.name): self.camera_config},
             [
                 CameraConfigUpdateEnum.audio,
                 CameraConfigUpdateEnum.enabled,
@@ -180,7 +183,10 @@ class AudioEventMaintainer(threading.Thread):
         )
         self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio.value)

-        if self.config.audio_transcription.enabled:
+        if (
+            self.config.audio_transcription.enabled
+            and self.audio_transcription_model_runner is not None
+        ):
             # init the transcription processor for this camera
             self.transcription_processor = AudioTranscriptionRealTimeProcessor(
                 config=self.config,
@@ -200,11 +206,11 @@ class AudioEventMaintainer(threading.Thread):

         self.was_enabled = camera.enabled

-    def detect_audio(self, audio) -> None:
+    def detect_audio(self, audio: np.ndarray) -> None:
         if not self.camera_config.audio.enabled or self.stop_event.is_set():
             return

-        audio_as_float = audio.astype(np.float32)
+        audio_as_float: np.ndarray = audio.astype(np.float32)
         rms, dBFS = self.calculate_audio_levels(audio_as_float)

         self.camera_metrics[self.camera_config.name].audio_rms.value = rms
@@ -261,7 +267,7 @@ class AudioEventMaintainer(threading.Thread):
             else:
                 self.transcription_processor.check_unload_model()

-    def calculate_audio_levels(self, audio_as_float: np.float32) -> Tuple[float, float]:
+    def calculate_audio_levels(self, audio_as_float: np.ndarray) -> Tuple[float, float]:
         # Calculate RMS (Root-Mean-Square) which represents the average signal amplitude
         # Note: np.float32 isn't serializable, we must use np.float64 to publish the message
         rms = np.sqrt(np.mean(np.absolute(np.square(audio_as_float))))
@@ -296,6 +302,10 @@ class AudioEventMaintainer(threading.Thread):
             self.logpipe.dump()
             self.start_or_restart_ffmpeg()

+        if self.audio_listener is None or self.audio_listener.stdout is None:
+            log_and_restart()
+            return
+
         try:
             chunk = self.audio_listener.stdout.read(self.chunk_size)

@@ -341,7 +351,10 @@ class AudioEventMaintainer(threading.Thread):
                     self.requestor.send_data(
                         EXPIRE_AUDIO_ACTIVITY, self.camera_config.name
                     )
-                    stop_ffmpeg(self.audio_listener, self.logger)
+
+                    if self.audio_listener:
+                        stop_ffmpeg(self.audio_listener, self.logger)
+
                     self.audio_listener = None
                 self.was_enabled = enabled
                 continue
@@ -367,7 +380,7 @@ class AudioEventMaintainer(threading.Thread):


 class AudioTfl:
-    def __init__(self, stop_event: threading.Event, num_threads=2):
+    def __init__(self, stop_event: threading.Event, num_threads: int = 2) -> None:
         self.stop_event = stop_event
         self.num_threads = num_threads
         self.labels = load_labels("/audio-labelmap.txt", prefill=521)
@@ -382,7 +395,7 @@ class AudioTfl:
         self.tensor_input_details = self.interpreter.get_input_details()
         self.tensor_output_details = self.interpreter.get_output_details()

-    def _detect_raw(self, tensor_input):
+    def _detect_raw(self, tensor_input: np.ndarray) -> np.ndarray:
         self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input)
         self.interpreter.invoke()
         detections = np.zeros((20, 6), np.float32)
@@ -410,8 +423,10 @@ class AudioTfl:

         return detections

-    def detect(self, tensor_input, threshold=AUDIO_MIN_CONFIDENCE):
-        detections = []
+    def detect(
+        self, tensor_input: np.ndarray, threshold: float = AUDIO_MIN_CONFIDENCE
+    ) -> list[tuple[str, float, tuple[float, float, float, float]]]:
+        detections: list[tuple[str, float, tuple[float, float, float, float]]] = []

         if self.stop_event.is_set():
             return detections
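On calculate_audio_levels above: the parameter is a float32 sample array, RMS is the root of the mean squared amplitude, and dBFS expresses RMS relative to digital full scale. A self-contained version of that math (the int16 full-scale reference of 32768 and the epsilon guard are assumptions of this sketch):

    import numpy as np

    def calculate_audio_levels(audio_as_float: np.ndarray) -> tuple[float, float]:
        # RMS (root-mean-square): the average signal amplitude.
        rms = float(np.sqrt(np.mean(np.square(audio_as_float))))
        # dBFS relative to int16 full scale; 0 dBFS is the loudest possible signal.
        dBFS = float(20 * np.log10(max(rms, 1e-10) / 32768.0))
        # Return native floats: np.float32 isn't serializable for publishing.
        return rms, dBFS

    samples = (np.random.randn(16000) * 500).astype(np.float32)
    print(calculate_audio_levels(samples))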
diff --git a/frigate/events/cleanup.py b/frigate/events/cleanup.py
index 263c5f18e..b867bf947 100644
--- a/frigate/events/cleanup.py
+++ b/frigate/events/cleanup.py
@@ -29,7 +29,7 @@ class EventCleanup(threading.Thread):
         self.stop_event = stop_event
         self.db = db
         self.camera_keys = list(self.config.cameras.keys())
-        self.removed_camera_labels: list[str] = None
+        self.removed_camera_labels: list[Event] | None = None
         self.camera_labels: dict[str, dict[str, Any]] = {}

     def get_removed_camera_labels(self) -> list[Event]:
@@ -37,7 +37,7 @@ class EventCleanup(threading.Thread):
         if self.removed_camera_labels is None:
             self.removed_camera_labels = list(
                 Event.select(Event.label)
-                .where(Event.camera.not_in(self.camera_keys))
+                .where(Event.camera.not_in(self.camera_keys))  # type: ignore[arg-type,call-arg,misc]
                 .distinct()
                 .execute()
             )
@@ -61,7 +61,7 @@ class EventCleanup(threading.Thread):
             ),
         }

-        return self.camera_labels[camera]["labels"]
+        return self.camera_labels[camera]["labels"]  # type: ignore[no-any-return]

     def expire_snapshots(self) -> list[str]:
         ## Expire events from unlisted cameras based on the global config
@@ -74,7 +74,9 @@ class EventCleanup(threading.Thread):
         # loop over object types in db
         for event in distinct_labels:
             # get expiration time for this label
-            expire_days = retain_config.objects.get(event.label, retain_config.default)
+            expire_days = retain_config.objects.get(
+                str(event.label), retain_config.default
+            )

             expire_after = (
                 datetime.datetime.now() - datetime.timedelta(days=expire_days)
@@ -87,7 +89,7 @@ class EventCleanup(threading.Thread):
                     Event.thumbnail,
                 )
                 .where(
-                    Event.camera.not_in(self.camera_keys),
+                    Event.camera.not_in(self.camera_keys),  # type: ignore[arg-type,call-arg,misc]
                     Event.start_time < expire_after,
                     Event.label == event.label,
                     Event.retain_indefinitely == False,
@@ -109,16 +111,16 @@ class EventCleanup(threading.Thread):

             # update the clips attribute for the db entry
             query = Event.select(Event.id).where(
-                Event.camera.not_in(self.camera_keys),
+                Event.camera.not_in(self.camera_keys),  # type: ignore[arg-type,call-arg,misc]
                 Event.start_time < expire_after,
                 Event.label == event.label,
                 Event.retain_indefinitely == False,
             )

-            events_to_update = []
+            events_to_update: list[str] = []

             for event in query.iterator():
-                events_to_update.append(event.id)
+                events_to_update.append(str(event.id))
                 if len(events_to_update) >= CHUNK_SIZE:
                     logger.debug(
                         f"Updating {update_params} for {len(events_to_update)} events"
@@ -150,7 +152,7 @@ class EventCleanup(threading.Thread):
         for event in distinct_labels:
             # get expiration time for this label
             expire_days = retain_config.objects.get(
-                event.label, retain_config.default
+                str(event.label), retain_config.default
             )

             expire_after = (
@@ -177,7 +179,7 @@ class EventCleanup(threading.Thread):
             # only snapshots are stored in /clips
             # so no need to delete mp4 files
             for event in expired_events:
-                events_to_update.append(event.id)
+                events_to_update.append(str(event.id))

                 deleted = delete_event_snapshot(event)
                 if not deleted:
@@ -214,7 +216,7 @@ class EventCleanup(threading.Thread):
                 Event.camera,
             )
             .where(
-                Event.camera.not_in(self.camera_keys),
+                Event.camera.not_in(self.camera_keys),  # type: ignore[arg-type,call-arg,misc]
                 Event.start_time < expire_after,
                 Event.retain_indefinitely == False,
             )
@@ -245,7 +247,7 @@ class EventCleanup(threading.Thread):

         # update the clips attribute for the db entry
         query = Event.select(Event.id).where(
-            Event.camera.not_in(self.camera_keys),
+            Event.camera.not_in(self.camera_keys),  # type: ignore[arg-type,call-arg,misc]
             Event.start_time < expire_after,
             Event.retain_indefinitely == False,
         )
@@ -358,7 +360,7 @@ class EventCleanup(threading.Thread):
         logger.debug(f"Found {len(events_to_delete)} events that can be expired")

         if len(events_to_delete) > 0:
-            ids_to_delete = [e.id for e in events_to_delete]
+            ids_to_delete = [str(e.id) for e in events_to_delete]
             for i in range(0, len(ids_to_delete), CHUNK_SIZE):
                 chunk = ids_to_delete[i : i + CHUNK_SIZE]
                 logger.debug(f"Deleting {len(chunk)} events from the database")
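The EventCleanup hunks above preserve a chunking pattern worth calling out: ids accumulate in events_to_update and are flushed every CHUNK_SIZE rows, so no single UPDATE or DELETE carries an unbounded id list. The same idea reduced to a generic helper (the flush callback stands in for the peewee update/delete; names and the constant value are illustrative):

    from collections.abc import Callable, Iterable

    CHUNK_SIZE = 50  # illustrative; Frigate defines its own constant

    def flush_in_chunks(
        ids: Iterable[str],
        flush: Callable[[list[str]], None],
        chunk_size: int = CHUNK_SIZE,
    ) -> int:
        batch: list[str] = []
        total = 0
        for event_id in ids:
            batch.append(event_id)
            if len(batch) >= chunk_size:
                flush(batch)  # e.g. Event.update(...).where(Event.id << batch)
                total += len(batch)
                batch = []
        if batch:  # flush the remainder smaller than one chunk
            flush(batch)
            total += len(batch)
        return total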
diff --git a/frigate/events/maintainer.py b/frigate/events/maintainer.py
index 6a8da45b2..80bdaccd3 100644
--- a/frigate/events/maintainer.py
+++ b/frigate/events/maintainer.py
@@ -2,7 +2,7 @@ import logging
 import threading
 from multiprocessing import Queue
 from multiprocessing.synchronize import Event as MpEvent
-from typing import Dict
+from typing import Any, Dict

 from frigate.comms.events_updater import EventEndPublisher, EventUpdateSubscriber
 from frigate.config import FrigateConfig
@@ -15,7 +15,7 @@ from frigate.util.builtin import to_relative_box
 logger = logging.getLogger(__name__)


-def should_update_db(prev_event: Event, current_event: Event) -> bool:
+def should_update_db(prev_event: dict[str, Any], current_event: dict[str, Any]) -> bool:
     """If current_event has updated fields and (clip or snapshot)."""
     # If event is ending and was previously saved, always update to set end_time
     # This ensures events are properly ended even when alerts/detections are disabled
@@ -47,7 +47,9 @@ def should_update_db(prev_event: Event, current_event: Event) -> bool:
     return False


-def should_update_state(prev_event: Event, current_event: Event) -> bool:
+def should_update_state(
+    prev_event: dict[str, Any], current_event: dict[str, Any]
+) -> bool:
     """If current event should update state, but not necessarily update the db."""
     if prev_event["stationary"] != current_event["stationary"]:
         return True
@@ -74,7 +76,7 @@ class EventProcessor(threading.Thread):
         super().__init__(name="event_processor")
         self.config = config
         self.timeline_queue = timeline_queue
-        self.events_in_process: Dict[str, Event] = {}
+        self.events_in_process: Dict[str, dict[str, Any]] = {}
         self.stop_event = stop_event

         self.event_receiver = EventUpdateSubscriber()
@@ -92,7 +94,7 @@ class EventProcessor(threading.Thread):
             if update == None:
                 continue

-            source_type, event_type, camera, _, event_data = update
+            source_type, event_type, camera, _, event_data = update  # type: ignore[misc]

             logger.debug(
                 f"Event received: {source_type} {event_type} {camera} {event_data['id']}"
@@ -140,7 +142,7 @@ class EventProcessor(threading.Thread):
         self,
         event_type: str,
         camera: str,
-        event_data: Event,
+        event_data: dict[str, Any],
     ) -> None:
         """handle tracked object event updates."""
         updated_db = False
@@ -150,8 +152,13 @@ class EventProcessor(threading.Thread):
         camera_config = self.config.cameras.get(camera)
         if camera_config is None:
             return
+
         width = camera_config.detect.width
         height = camera_config.detect.height
+
+        if width is None or height is None:
+            return
+
         first_detector = list(self.config.detectors.values())[0]

         start_time = event_data["start_time"]
@@ -222,8 +229,12 @@ class EventProcessor(threading.Thread):
                 Event.thumbnail: event_data.get("thumbnail"),
                 Event.has_clip: event_data["has_clip"],
                 Event.has_snapshot: event_data["has_snapshot"],
-                Event.model_hash: first_detector.model.model_hash,
-                Event.model_type: first_detector.model.model_type,
+                Event.model_hash: first_detector.model.model_hash
+                if first_detector.model
+                else None,
+                Event.model_type: first_detector.model.model_type
+                if first_detector.model
+                else None,
                 Event.detector_type: first_detector.type,
                 Event.data: {
                     "box": box,
@@ -287,10 +298,10 @@ class EventProcessor(threading.Thread):

         if event_type == EventStateEnum.end:
             del self.events_in_process[event_data["id"]]
-            self.event_end_publisher.publish((event_data["id"], camera, updated_db))
+            self.event_end_publisher.publish((event_data["id"], camera, updated_db))  # type: ignore[arg-type]

     def handle_external_detection(
-        self, event_type: EventStateEnum, event_data: Event
+        self, event_type: EventStateEnum, event_data: dict[str, Any]
     ) -> None:
         # Skip replay cameras
         if event_data.get("camera", "").startswith(REPLAY_CAMERA_PREFIX):
diff --git a/frigate/mypy.ini b/frigate/mypy.ini
index cde4a3fe6..3c643236f 100644
--- a/frigate/mypy.ini
+++ b/frigate/mypy.ini
@@ -24,18 +24,18 @@ no_implicit_reexport = true
 [mypy-frigate.*]
 ignore_errors = false

+# Third-party code imported from https://github.com/ufal/whisper_streaming
+[mypy-frigate.data_processing.real_time.whisper_online]
+ignore_errors = true
+
+# TODO: Remove ignores for these modules as they are updated with type annotations.
+
 [mypy-frigate.api.*]
 ignore_errors = true

 [mypy-frigate.config.*]
 ignore_errors = true

-[mypy-frigate.data_processing.*]
-ignore_errors = true
-
-[mypy-frigate.db.*]
-ignore_errors = true
-
 [mypy-frigate.debug_replay]
 ignore_errors = true
@@ -45,9 +45,6 @@ ignore_errors = true
 [mypy-frigate.embeddings.*]
 ignore_errors = true

-[mypy-frigate.events.*]
-ignore_errors = true
-
 [mypy-frigate.http]
 ignore_errors = true
diff --git a/frigate/util/builtin.py b/frigate/util/builtin.py
index 42aa18c0a..bd45a4a1f 100644
--- a/frigate/util/builtin.py
+++ b/frigate/util/builtin.py
@@ -12,7 +12,7 @@
 import shlex
 import struct
 import urllib.parse
 from collections.abc import Mapping
-from multiprocessing.sharedctypes import Synchronized
+from multiprocessing.managers import ValueProxy
 from pathlib import Path
 from typing import Any, Dict, Optional, Tuple, Union
@@ -64,7 +64,7 @@ class EventsPerSecond:


 class InferenceSpeed:
-    def __init__(self, metric: Synchronized) -> None:
+    def __init__(self, metric: ValueProxy[float]) -> None:
         self.__metric = metric
         self.__initialized = False
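Last, the InferenceSpeed change swaps multiprocessing.sharedctypes.Synchronized for multiprocessing.managers.ValueProxy: both expose a .value attribute, so the class body is unchanged, but the proxy type matches how DataProcessorMetrics actually creates these metrics via manager.Value("d", 0.0). A usage sketch under those assumptions (the exponential smoothing is illustrative, not Frigate's exact update rule):

    from __future__ import annotations

    from multiprocessing.managers import SyncManager, ValueProxy

    class InferenceSpeed:
        def __init__(self, metric: ValueProxy[float]) -> None:
            self.__metric = metric
            self.__initialized = False

        def update(self, duration: float) -> None:
            if not self.__initialized:
                self.__metric.value = duration  # seed with the first sample
                self.__initialized = True
            else:
                # Smooth toward the new sample; the 0.9/0.1 blend is illustrative.
                self.__metric.value = self.__metric.value * 0.9 + duration * 0.1

    if __name__ == "__main__":
        with SyncManager() as manager:
            metric = manager.Value("d", 0.0)
            speed = InferenceSpeed(metric)
            speed.update(0.025)
            print(metric.value)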