diff --git a/frigate/app.py b/frigate/app.py index d3bae31b6..9ccc3a0f9 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -13,14 +13,16 @@ from peewee_migrate import Router from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqliteq import SqliteQueueDatabase -from frigate.config import DetectorTypeEnum, FrigateConfig +from frigate.communication.dispatcher import Communicator, Dispatcher +from frigate.communication.mqtt import MqttClient +from frigate.communication.ws import WebSocketClient +from frigate.config import FrigateConfig from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR from frigate.object_detection import ObjectDetectProcess from frigate.events import EventCleanup, EventProcessor from frigate.http import create_app from frigate.log import log_process, root_configurer from frigate.models import Event, Recordings -from frigate.mqtt import FrigateMqttClient, MqttSocketRelay from frigate.object_processing import TrackedObjectProcessor from frigate.output import output_frames from frigate.plus import PlusApi @@ -168,14 +170,16 @@ class FrigateApp: self.restream = RestreamApi(self.config) self.restream.add_cameras() - def init_mqtt(self) -> None: - self.mqtt_client = FrigateMqttClient(self.config, self.camera_metrics) + def init_dispatcher(self) -> None: + comms: list[Communicator] = [] - def start_mqtt_relay(self) -> None: - self.mqtt_relay = MqttSocketRelay( - self.mqtt_client, self.config.mqtt.topic_prefix - ) - self.mqtt_relay.start() + if self.config.mqtt.enabled: + comms.append(MqttClient(self.config)) + + self.ws_client = WebSocketClient(self.config) + self.ws_client.start() + comms.append(self.ws_client) + self.dispatcher = Dispatcher(self.config, self.camera_metrics, comms) def start_detectors(self) -> None: for name in self.config.cameras.keys(): @@ -214,7 +218,7 @@ class FrigateApp: def start_detected_frames_processor(self) -> None: self.detected_frames_processor = TrackedObjectProcessor( self.config, - self.mqtt_client, + self.dispatcher, self.config.mqtt.topic_prefix, self.detected_frames_queue, self.event_queue, @@ -312,7 +316,7 @@ class FrigateApp: self.stats_emitter = StatsEmitter( self.config, self.stats_tracking, - self.mqtt_client, + self.dispatcher, self.config.mqtt.topic_prefix, self.stop_event, ) @@ -350,7 +354,7 @@ class FrigateApp: self.set_log_levels() self.init_queues() self.init_database() - self.init_mqtt() + self.init_dispatcher except Exception as e: print(e) self.log_process.terminate() @@ -363,7 +367,6 @@ class FrigateApp: self.start_camera_capture_processes() self.init_stats() self.init_web_server() - self.start_mqtt_relay() self.start_event_processor() self.start_event_cleanup() self.start_recording_maintainer() @@ -390,7 +393,7 @@ class FrigateApp: logger.info(f"Stopping...") self.stop_event.set() - self.mqtt_relay.stop() + self.ws_client.stop() self.detected_frames_processor.join() self.event_processor.join() self.event_cleanup.join() diff --git a/frigate/communication/mqtt.py b/frigate/communication/mqtt.py index 1a979b4d2..37f89b729 100644 --- a/frigate/communication/mqtt.py +++ b/frigate/communication/mqtt.py @@ -11,15 +11,12 @@ from frigate.types import CameraMetricsTypes logger = logging.getLogger(__name__) -class FrigateMqttClient(Communicator): +class MqttClient(Communicator): """Frigate wrapper for mqtt client.""" - def __init__( - self, config: FrigateConfig, camera_metrics: dict[str, CameraMetricsTypes] - ) -> None: + def __init__(self, config: FrigateConfig) -> None: self.config = config self.mqtt_config = config.mqtt - self.camera_metrics = camera_metrics self.connected: bool = False self._start() diff --git a/frigate/communication/ws.py b/frigate/communication/ws.py index 8a184f443..3c5c0f99a 100644 --- a/frigate/communication/ws.py +++ b/frigate/communication/ws.py @@ -14,11 +14,12 @@ from ws4py.server.wsgiutils import WebSocketWSGIApplication from ws4py.websocket import WebSocket from frigate.communication.dispatcher import Communicator +from frigate.config import FrigateConfig logger = logging.getLogger(__name__) -class WebSocketHandler(WebSocket): +class _WebSocketHandler(WebSocket): def received_message(self, message): try: json_message = json.loads(message.data.decode("utf-8")) @@ -44,8 +45,8 @@ class WebSocketHandler(WebSocket): class WebSocketClient(Communicator): """Frigate wrapper for ws client.""" - def __init__(self) -> None: - pass + def __init__(self, config: FrigateConfig) -> None: + self.config = config def start(self): @@ -56,7 +57,7 @@ class WebSocketClient(Communicator): 5002, server_class=WSGIServer, handler_class=WebSocketWSGIRequestHandler, - app=WebSocketWSGIApplication(handler_cls=WebSocketHandler), + app=WebSocketWSGIApplication(handler_cls=_WebSocketHandler), ) self.websocket_server.initialize_websockets_manager() self.websocket_thread = threading.Thread( diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 575e697e5..5fe5e387b 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -12,6 +12,7 @@ from typing import Callable import cv2 import numpy as np +from frigate.communication.dispatcher import Dispatcher from frigate.config import ( CameraConfig, MqttConfig, @@ -20,7 +21,6 @@ from frigate.config import ( FrigateConfig, ) from frigate.const import CLIPS_DIR -from frigate.mqtt import FrigateMqttClient from frigate.util import ( SharedMemoryFrameManager, calculate_region, @@ -633,7 +633,7 @@ class TrackedObjectProcessor(threading.Thread): def __init__( self, config: FrigateConfig, - client: FrigateMqttClient, + dispatcher: Dispatcher, topic_prefix, tracked_objects_queue, event_queue, @@ -645,7 +645,7 @@ class TrackedObjectProcessor(threading.Thread): threading.Thread.__init__(self) self.name = "detected_frames_processor" self.config = config - self.client = client + self.dispatcher = dispatcher self.topic_prefix = topic_prefix self.tracked_objects_queue = tracked_objects_queue self.event_queue = event_queue @@ -669,7 +669,7 @@ class TrackedObjectProcessor(threading.Thread): "after": after, "type": "new" if obj.previous["false_positive"] else "update", } - self.client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/events", json.dumps(message), retain=False ) obj.previous = after @@ -724,7 +724,7 @@ class TrackedObjectProcessor(threading.Thread): "after": obj.to_dict(), "type": "end", } - self.client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/events", json.dumps(message), retain=False ) @@ -746,14 +746,14 @@ class TrackedObjectProcessor(threading.Thread): f"Unable to send mqtt snapshot for {obj.obj_data['id']}." ) else: - self.client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/{camera}/{obj.obj_data['label']}/snapshot", jpg_bytes, retain=True, ) def object_status(camera, object_name, status): - self.client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/{camera}/{object_name}", status, retain=False ) @@ -853,7 +853,7 @@ class TrackedObjectProcessor(threading.Thread): if motion_boxes: # only send ON if motion isn't already active if self.last_motion_detected.get(camera, 0) == 0: - self.client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/{camera}/motion", "ON", retain=False, @@ -866,7 +866,7 @@ class TrackedObjectProcessor(threading.Thread): # If no motion, make sure the off_delay has passed if frame_time - self.last_motion_detected.get(camera, 0) >= mqtt_delay: - self.client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/{camera}/motion", "OFF", retain=False, @@ -962,7 +962,7 @@ class TrackedObjectProcessor(threading.Thread): ) new_count = sum(zone_label.values()) if new_count != current_count: - self.client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/{zone}/{label}", new_count, retain=False, @@ -975,7 +975,7 @@ class TrackedObjectProcessor(threading.Thread): else: if label in obj_counter: zone_label[camera] = obj_counter[label] - self.client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/{zone}/{label}", obj_counter[label], retain=False, @@ -992,7 +992,7 @@ class TrackedObjectProcessor(threading.Thread): new_count = sum(zone_label.values()) if new_count != current_count: - self.client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/{zone}/all", new_count, retain=False, @@ -1000,7 +1000,7 @@ class TrackedObjectProcessor(threading.Thread): # if this is a new zone all label for this camera else: zone_label[camera] = total_label_count - self.client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/{zone}/all", total_label_count, retain=False, diff --git a/frigate/stats.py b/frigate/stats.py index e8b278e27..e45cbf72d 100644 --- a/frigate/stats.py +++ b/frigate/stats.py @@ -9,9 +9,9 @@ import requests from typing import Optional, Any from multiprocessing.synchronize import Event as MpEvent +from frigate.communication.dispatcher import Dispatcher from frigate.config import FrigateConfig from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR -from frigate.mqtt import FrigateMqttClient from frigate.types import StatsTrackingTypes, CameraMetricsTypes from frigate.version import VERSION from frigate.util import get_cpu_stats @@ -146,7 +146,7 @@ class StatsEmitter(threading.Thread): self, config: FrigateConfig, stats_tracking: StatsTrackingTypes, - mqtt_client: FrigateMqttClient, + dispatcher: Dispatcher, topic_prefix: str, stop_event: MpEvent, ): @@ -154,7 +154,7 @@ class StatsEmitter(threading.Thread): self.name = "frigate_stats_emitter" self.config = config self.stats_tracking = stats_tracking - self.mqtt_client = mqtt_client + self.dispatcher = dispatcher self.topic_prefix = topic_prefix self.stop_event = stop_event @@ -162,7 +162,7 @@ class StatsEmitter(threading.Thread): time.sleep(10) while not self.stop_event.wait(self.config.mqtt.stats_interval): stats = stats_snapshot(self.stats_tracking) - self.mqtt_client.publish( + self.dispatcher.publish( f"{self.topic_prefix}/stats", json.dumps(stats), retain=False ) logger.info(f"Exiting watchdog...")