diff --git a/frigate/app.py b/frigate/app.py index 9ccc3a0f9..0084c61a7 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -177,7 +177,6 @@ class FrigateApp: comms.append(MqttClient(self.config)) self.ws_client = WebSocketClient(self.config) - self.ws_client.start() comms.append(self.ws_client) self.dispatcher = Dispatcher(self.config, self.camera_metrics, comms) @@ -354,7 +353,7 @@ class FrigateApp: self.set_log_levels() self.init_queues() self.init_database() - self.init_dispatcher + self.init_dispatcher() except Exception as e: print(e) self.log_process.terminate() diff --git a/frigate/communication/mqtt.py b/frigate/communication/mqtt.py index 37f89b729..502054d14 100644 --- a/frigate/communication/mqtt.py +++ b/frigate/communication/mqtt.py @@ -56,7 +56,7 @@ class MqttClient(Communicator): ) self.client.message_callback_add( - f"{self.mqtt_config.topic_prefix}/restart", self.on_restart_command + f"{self.mqtt_config.topic_prefix}/restart", self.on_mqtt_command ) def publish(self, topic: str, payload, retain: bool = False) -> None: diff --git a/frigate/communication/ws.py b/frigate/communication/ws.py index 3c5c0f99a..561460fff 100644 --- a/frigate/communication/ws.py +++ b/frigate/communication/ws.py @@ -19,36 +19,42 @@ from frigate.config import FrigateConfig logger = logging.getLogger(__name__) -class _WebSocketHandler(WebSocket): - def received_message(self, message): - try: - json_message = json.loads(message.data.decode("utf-8")) - json_message = { - "topic": f"{self.topic_prefix}/{json_message['topic']}", - "payload": json_message.get("payload"), - "retain": json_message.get("retain", False), - } - except Exception as e: - logger.warning("Unable to parse websocket message as valid json.") - return - - logger.debug( - f"Publishing mqtt message from websockets at {json_message['topic']}." - ) - self.publish( - json_message["topic"], - json_message["payload"], - retain=json_message["retain"], - ) - - class WebSocketClient(Communicator): """Frigate wrapper for ws client.""" def __init__(self, config: FrigateConfig) -> None: self.config = config + def subscribe(self, receiver) -> None: + self._dispatcher = receiver + self.start() + def start(self): + """Start the websocket client.""" + + class _WebSocketHandler(WebSocket): + receiver = self._dispatcher + + def received_message(self, message): + try: + json_message = json.loads(message.data.decode("utf-8")) + json_message = { + "topic": f"{self.topic_prefix}/{json_message['topic']}", + "payload": json_message.get("payload"), + "retain": json_message.get("retain", False), + } + except Exception as e: + logger.warning("Unable to parse websocket message as valid json.") + return + + logger.debug( + f"Publishing mqtt message from websockets at {json_message['topic']}." + ) + self.receiver( + json_message["topic"], + json_message["payload"], + retain=json_message["retain"], + ) # start a websocket server on 5002 WebSocketWSGIHandler.http_version = "1.1" @@ -65,7 +71,7 @@ class WebSocketClient(Communicator): ) self.websocket_thread.start() - def publish(self, topic: str, payload: str) -> None: + def publish(self, topic: str, payload: str, _) -> None: try: ws_message = json.dumps( {