diff --git a/frigate/mqtt.py b/frigate/mqtt.py index 9e3308583..79fb0713f 100644 --- a/frigate/mqtt.py +++ b/frigate/mqtt.py @@ -1,3 +1,4 @@ +import datetime import json import logging import threading @@ -29,9 +30,10 @@ class FrigateMqttClient: self.mqtt_config = config.mqtt self.camera_metrics: dict[str, CameraMetricsTypes] = camera_metrics self.client: mqtt.Client = None - self.__start() + self.connected: bool = False + self._start() - def set_initial_topics(self) -> None: + def _set_initial_topics(self) -> None: """Set initial state topics.""" for camera_name, camera in self.config.cameras.items(): self.publish( @@ -75,6 +77,10 @@ class FrigateMqttClient: retain=False, ) + self.publish( + self.mqtt_config.topic_prefix + "/available", "online", retain=True + ) + def on_recordings_command( self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage ) -> None: @@ -278,8 +284,9 @@ class FrigateMqttClient: """Callback to restart frigate.""" restart_frigate() - def on_connect(self, client: mqtt.Client, userdata, flags, rc) -> None: + def _on_connect(self, client: mqtt.Client, userdata, flags, rc) -> None: """Mqtt connection callback.""" + self.connected = True threading.current_thread().name = "mqtt" if rc != 0: if rc == 3: @@ -300,14 +307,17 @@ class FrigateMqttClient: logger.debug("MQTT connected") client.subscribe(f"{self.mqtt_config.topic_prefix}/#") - self.publish( - self.mqtt_config.topic_prefix + "/available", "online", retain=True - ) + self._set_initial_topics() - def __start(self) -> None: + def _on_disconnect(self, client: mqtt.Client, userdata, flags, rc) -> None: + """Mqtt disconnection callback.""" + self.connected = False + logger.error("MQTT disconnected") + + def _start(self) -> None: """Start mqtt client.""" self.client = mqtt.Client(client_id=self.mqtt_config.client_id) - self.client.on_connect = self.on_connect + self.client.on_connect = self._on_connect self.client.will_set( self.mqtt_config.topic_prefix + "/available", payload="offline", @@ -369,27 +379,23 @@ class FrigateMqttClient: self.mqtt_config.user, password=self.mqtt_config.password ) try: - self.client.connect(self.mqtt_config.host, self.mqtt_config.port, 60) + # https://stackoverflow.com/a/55390477 + # with connect_async, retries are handled automatically + self.client.connect_async(self.mqtt_config.host, self.mqtt_config.port, 60) + self.client.loop_start() except Exception as e: logger.error(f"Unable to connect to MQTT server: {e}") - self.client = None return - self.client.loop_start() - self.set_initial_topics() - def publish(self, topic: str, payload, retain: bool = False) -> None: """Wrapper for publishing when client is in valid state.""" - if not self.client: + if not self.connected: logger.error(f"Unable to publish to {topic}: client is not connected") return self.client.publish(topic, payload, retain=retain) def add_topic_callback(self, topic: str, callback) -> None: - if not self.client: - return - self.client.message_callback_add(topic, callback)