diff --git a/frigate/app.py b/frigate/app.py index 3417bea0f..8a2ad1c56 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -6,7 +6,7 @@ import os import signal import sys import threading -from typing import Optional +from typing import Optional, Any from types import FrameType import traceback @@ -23,7 +23,7 @@ from frigate.detectors import ObjectDetectProcess from frigate.events import EventCleanup, EventProcessor from frigate.http import create_app from frigate.log import log_process, root_configurer -from frigate.majordomo import QueueBroker +from frigate.majordomo import QueueBroker, BrokerWorker from frigate.models import Event, Recordings from frigate.object_processing import TrackedObjectProcessor from frigate.output import output_frames @@ -186,7 +186,9 @@ class FrigateApp: self.dispatcher = Dispatcher(self.config, self.camera_metrics, comms) def start_queue_broker(self) -> None: - def detect_no_shm(worker, service_name, body): + def detect_no_shm( + worker: BrokerWorker, service_name: bytes, body: list[bytes] + ) -> list[bytes]: in_shm = self.detection_shms[str(service_name, "ascii")] tensor_input = in_shm.buf body = body[0:2] + [tensor_input] diff --git a/frigate/config.py b/frigate/config.py index 56d5cead5..0234152a0 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -827,7 +827,7 @@ class FrigateConfig(FrigateBaseModel): server: ServerConfig = Field( default_factory=ServerConfig, title="Server configuration" ) - mqtt: Optional[MqttConfig] = Field(title="MQTT Configuration.") + mqtt: MqttConfig = Field(default={}, title="MQTT Configuration.") database: DatabaseConfig = Field( default_factory=DatabaseConfig, title="Database configuration." ) @@ -872,7 +872,7 @@ class FrigateConfig(FrigateBaseModel): detect: DetectConfig = Field( default_factory=DetectConfig, title="Global object tracking configuration." ) - cameras: Optional[Dict[str, CameraConfig]] = Field(title="Camera configuration.") + cameras: Dict[str, CameraConfig] = Field(default={}, title="Camera configuration.") timestamp_style: TimestampStyleConfig = Field( default_factory=TimestampStyleConfig, title="Global timestamp style configuration.", diff --git a/frigate/detectors/__init__.py b/frigate/detectors/__init__.py index 0015a8eab..eb520da6c 100644 --- a/frigate/detectors/__init__.py +++ b/frigate/detectors/__init__.py @@ -8,3 +8,17 @@ from .detector_config import ( from .detection_client import ObjectDetectionClient from .detector_types import DetectorTypeEnum, api_types, create_detector from .detection_worker import ObjectDetectionWorker, ObjectDetectProcess + +__all__ = [ + "DetectionApi", + "PixelFormatEnum", + "InputTensorEnum", + "ModelConfig", + "BaseDetectorConfig", + "ObjectDetectionClient", + "ObjectDetectionWorker", + "ObjectDetectProcess", + "DetectorTypeEnum", + "api_types", + "create_detector", +] diff --git a/frigate/detectors/detection_worker.py b/frigate/detectors/detection_worker.py index d806cca89..bd3d4f5f7 100644 --- a/frigate/detectors/detection_worker.py +++ b/frigate/detectors/detection_worker.py @@ -183,7 +183,7 @@ class ObjectDetectProcess: self.avg_inference_speed = mp.Value("d", 0.01) self.detection_start = mp.Value("d", 0.0) - self.detect_process = None + self.detect_process: mp.Process self.start_or_restart() diff --git a/frigate/majordomo.py b/frigate/majordomo.py index b842f28ac..813324f46 100644 --- a/frigate/majordomo.py +++ b/frigate/majordomo.py @@ -22,7 +22,7 @@ from majortomo import error, protocol from majortomo.config import DEFAULT_BIND_URL from majortomo.broker import ( Broker, - Worker as BrokerWorker, + Worker as MdpBrokerWorker, ServicesContainer, id_to_int, ) @@ -30,6 +30,17 @@ from majortomo.util import TextOrBytes, text_to_ascii_bytes from majortomo.worker import DEFAULT_ZMQ_LINGER, Worker as MdpWorker +class BrokerWorker(MdpBrokerWorker): + """Worker objects represent a connected / known MDP worker process""" + + def __init__( + self, worker_id: bytes, service: bytes, expire_at: float, next_heartbeat: float + ): + super().__init__(worker_id, service, expire_at, next_heartbeat) + self.request_handler: str + self.request_params: list[str] = [] + + class QueueServicesContainer(ServicesContainer): def __init__(self, busy_workers_timeout=protocol.DEFAULT_BUSY_WORKER_TIMEOUT): super().__init__(busy_workers_timeout) @@ -190,25 +201,6 @@ class QueueBroker(Broker): return body -class MultiBindableBroker: - def __init__( - self, - bind: Union[str, List[str]] = [DEFAULT_BIND_URL], - shms: dict[str, SharedMemory] = {}, - ): - super().__init__(bind) - self.shms = shms - - def on_worker_request( - self, worker: BrokerWorker, service_name: bytes, body: List[bytes] - ) -> List[bytes]: - if "DETECT_NO_SHM" == worker.request_handler: - in_shm = self.shms[str(service_name, "ascii")] - tensor_input = in_shm.buf - body = body[0:2] + [tensor_input] - return body - - class QueueWorker(MdpWorker): def __init__( self,