From 0ca8219fbdaac31e2ccba28ab8836454adeb2f02 Mon Sep 17 00:00:00 2001 From: Nick Mowen Date: Tue, 22 Nov 2022 13:29:28 -0700 Subject: [PATCH] Make ws client conform to communicator --- frigate/communication/ws.py | 106 +++++++++++++++++++----------------- 1 file changed, 55 insertions(+), 51 deletions(-) diff --git a/frigate/communication/ws.py b/frigate/communication/ws.py index 494c2c942..8a184f443 100644 --- a/frigate/communication/ws.py +++ b/frigate/communication/ws.py @@ -1,42 +1,53 @@ """Websocket communicator.""" +import json +import logging +import threading + +from wsgiref.simple_server import make_server from ws4py.server.wsgirefserver import ( WebSocketWSGIHandler, WebSocketWSGIRequestHandler, WSGIServer, ) +from ws4py.server.wsgiutils import WebSocketWSGIApplication +from ws4py.websocket import WebSocket from frigate.communication.dispatcher import Communicator -class MqttSocketRelay(Communicator): - def __init__(self, topic_prefix: str): - self.topic_prefix = topic_prefix +logger = logging.getLogger(__name__) + + +class WebSocketHandler(WebSocket): + def received_message(self, message): + try: + json_message = json.loads(message.data.decode("utf-8")) + json_message = { + "topic": f"{self.topic_prefix}/{json_message['topic']}", + "payload": json_message.get("payload"), + "retain": json_message.get("retain", False), + } + except Exception as e: + logger.warning("Unable to parse websocket message as valid json.") + return + + logger.debug( + f"Publishing mqtt message from websockets at {json_message['topic']}." + ) + self.publish( + json_message["topic"], + json_message["payload"], + retain=json_message["retain"], + ) + + +class WebSocketClient(Communicator): + """Frigate wrapper for ws client.""" + + def __init__(self) -> None: + pass def start(self): - class MqttWebSocket(WebSocket): - topic_prefix = self.topic_prefix - mqtt_client = self.mqtt_client - - def received_message(self, message): - try: - json_message = json.loads(message.data.decode("utf-8")) - json_message = { - "topic": f"{self.topic_prefix}/{json_message['topic']}", - "payload": json_message.get("payload"), - "retain": json_message.get("retain", False), - } - except Exception as e: - logger.warning("Unable to parse websocket message as valid json.") - return - - logger.debug( - f"Publishing mqtt message from websockets at {json_message['topic']}." - ) - self.mqtt_client.publish( - json_message["topic"], - json_message["payload"], - retain=json_message["retain"], - ) # start a websocket server on 5002 WebSocketWSGIHandler.http_version = "1.1" @@ -45,39 +56,32 @@ class MqttSocketRelay(Communicator): 5002, server_class=WSGIServer, handler_class=WebSocketWSGIRequestHandler, - app=WebSocketWSGIApplication(handler_cls=MqttWebSocket), + app=WebSocketWSGIApplication(handler_cls=WebSocketHandler), ) self.websocket_server.initialize_websockets_manager() self.websocket_thread = threading.Thread( target=self.websocket_server.serve_forever ) - - def send(client, userdata, message): - """Sends mqtt messages to clients.""" - try: - logger.debug(f"Received mqtt message on {message.topic}.") - ws_message = json.dumps( - { - "topic": message.topic.replace(f"{self.topic_prefix}/", ""), - "payload": message.payload.decode(), - } - ) - except Exception as e: - # if the payload can't be decoded don't relay to clients - logger.debug( - f"MQTT payload for {message.topic} wasn't text. Skipping..." - ) - return - - self.websocket_server.manager.broadcast(ws_message) - - self.mqtt_client.add_topic_callback(f"{self.topic_prefix}/#", send) - self.websocket_thread.start() + def publish(self, topic: str, payload: str) -> None: + try: + ws_message = json.dumps( + { + "topic": topic, + "payload": payload, + } + ) + except Exception as e: + # if the payload can't be decoded don't relay to clients + logger.debug(f"payload for {topic} wasn't text. Skipping...") + return + + self.websocket_server.manager.broadcast(ws_message) + def stop(self): self.websocket_server.manager.close_all() self.websocket_server.manager.stop() self.websocket_server.manager.join() self.websocket_server.shutdown() - self.websocket_thread.join() \ No newline at end of file + self.websocket_thread.join()