Compare commits

...

8 Commits

Author SHA1 Message Date
Daniel
88991ca16a
Merge f68ff53bd9 into e4eac4ac81 2025-11-11 15:45:59 +07:00
Daniel
f68ff53bd9 fix MQTT disconnect reason code comparison bug
The mypy fixes introduced a bug where disconnect callback was comparing
reason_code == 0 instead of reason_value == 0. Since reason_code is a
ReasonCode object, it never equals integer 0, causing clean disconnects
to never be detected and infinite retry loops to occur.

Fixed by using reason_value (extracted from reason_code.value) for
numeric comparison, consistent with how we extract the value for logging.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-14 20:07:22 -04:00
Daniel
b0f189056a fix MQTT retry loop race condition with cleanup disconnect
Previously, the reconnection loop was calling client.disconnect() during cleanup,
which triggered the disconnect callback with code 0 ("Normal disconnection").
The callback would then exit early, preventing further reconnection attempts.

This creates a cleanup flag that prevents the disconnect callback from
stopping reconnection when we're intentionally cleaning up the old client
during retry attempts.

Fixes issue where MQTT would get stuck in retry loop without actually
attempting fresh connections.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-14 19:03:34 -04:00
Dan
91d3a7b245 format MQTT code with ruff 2025-09-01 18:12:51 -04:00
Dan
44c07aac12 fix mypy errors with improved MQTT reliability implementation
- Add _reason_info helper method to extract reason names safely
- Update connect/disconnect callbacks to use string comparison instead of numeric
- Fix type annotations with proper Optional[Client] typing
- Resolve unreachable code warnings with type ignore comments for threading
- Improve error handling with robust try/finally blocks in stop method
- Maintain fresh client creation approach for reliable reconnection

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-01 18:08:01 -04:00
Dan
3952035579 format MQTT code with ruff 2025-09-01 15:54:48 -04:00
Dan
a45915517f improve MQTT reconnection with fresh client creation approach
- Replace client.reconnect() with fresh client creation for each retry attempt
- Add proper cleanup of old client before creating new one
- Enhanced logging with detailed debugging info for disconnect reasons
- Use proven aiomqtt-style retry pattern for better reliability
- Each reconnection attempt now creates completely new MQTT client instance

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-01 15:32:04 -04:00
Dan
f95a49b81d implement automatic MQTT reconnection with 10s retry interval
- Fix connection state management: only set connected=True on successful connection
- Add automatic reconnection loop that retries every 10 seconds indefinitely
- Proper cleanup of reconnection thread on stop
- Enhanced disconnect logging with reason codes
- Thread-safe reconnection handling to avoid blocking main MQTT thread

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-01 12:57:02 -04:00

View File

@ -1,6 +1,7 @@
import logging import logging
import threading import threading
from typing import Any, Callable import time
from typing import Any, Callable, Optional
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
from paho.mqtt.enums import CallbackAPIVersion from paho.mqtt.enums import CallbackAPIVersion
@ -18,15 +19,20 @@ class MqttClient(Communicator):
self.config = config self.config = config
self.mqtt_config = config.mqtt self.mqtt_config = config.mqtt
self.connected = False self.connected = False
self.client: Optional[mqtt.Client] = None
self._dispatcher: Callable[[str, str], None] = lambda *_: None
self._reconnect_thread: Optional[threading.Thread] = None
self._reconnect_delay = 10 # Retry every 10 seconds
self._stop_reconnect: bool = False
def subscribe(self, receiver: Callable) -> None: def subscribe(self, receiver: Callable[[str, str], None]) -> None:
"""Wrapper for allowing dispatcher to subscribe.""" """Wrapper for allowing dispatcher to subscribe."""
self._dispatcher = receiver self._dispatcher = receiver
self._start() self._start()
def publish(self, topic: str, payload: Any, retain: bool = False) -> None: def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
"""Wrapper for publishing when client is in valid state.""" """Wrapper for publishing when client is in valid state."""
if not self.connected: if not self.connected or self.client is None:
logger.debug(f"Unable to publish to {topic}: client is not connected") logger.debug(f"Unable to publish to {topic}: client is not connected")
return return
@ -38,7 +44,17 @@ class MqttClient(Communicator):
) )
def stop(self) -> None: def stop(self) -> None:
self.client.disconnect() self._stop_reconnect = True
if self._reconnect_thread is not None and self._reconnect_thread.is_alive():
self._reconnect_thread.join(timeout=5)
if self.client is not None:
try:
self.client.disconnect()
finally:
try:
self.client.loop_stop()
except Exception:
pass
def _set_initial_topics(self) -> None: def _set_initial_topics(self) -> None:
"""Set initial state topics.""" """Set initial state topics."""
@ -142,6 +158,22 @@ class MqttClient(Communicator):
self.publish("available", "online", retain=True) self.publish("available", "online", retain=True)
@staticmethod
def _reason_info(reason_code: object) -> str:
"""Return human_readable_name for a Paho reason code."""
# Name string
if hasattr(reason_code, "getName") and callable(
getattr(reason_code, "getName")
):
try:
name = str(getattr(reason_code, "getName")())
except Exception:
name = str(reason_code)
else:
name = str(reason_code)
return name
def on_mqtt_command( def on_mqtt_command(
self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage
) -> None: ) -> None:
@ -155,27 +187,31 @@ class MqttClient(Communicator):
client: mqtt.Client, client: mqtt.Client,
userdata: Any, userdata: Any,
flags: Any, flags: Any,
reason_code: mqtt.ReasonCode, # type: ignore[name-defined] reason_code: object,
properties: Any, properties: Any,
) -> None: ) -> None:
"""Mqtt connection callback.""" """Mqtt connection callback."""
threading.current_thread().name = "mqtt" threading.current_thread().name = "mqtt"
if reason_code != 0: reason_name = self._reason_info(reason_code)
if reason_code == "Server unavailable":
# Check for connection failure by comparing reason name
if reason_name != "Success":
if reason_name == "Server unavailable":
logger.error( logger.error(
"Unable to connect to MQTT server: MQTT Server unavailable" "Unable to connect to MQTT server: MQTT Server unavailable"
) )
elif reason_code == "Bad user name or password": elif reason_name == "Bad user name or password":
logger.error( logger.error(
"Unable to connect to MQTT server: MQTT Bad username or password" "Unable to connect to MQTT server: MQTT Bad username or password"
) )
elif reason_code == "Not authorized": elif reason_name == "Not authorized":
logger.error("Unable to connect to MQTT server: MQTT Not authorized") logger.error("Unable to connect to MQTT server: MQTT Not authorized")
else: else:
logger.error( logger.error(
"Unable to connect to MQTT server: Connection refused. Error code: " f"Unable to connect to MQTT server: Connection refused. Error: {reason_name}"
+ reason_code.getName()
) )
# Don't set connected = True on connection failure
return
self.connected = True self.connected = True
logger.debug("MQTT connected") logger.debug("MQTT connected")
@ -192,7 +228,34 @@ class MqttClient(Communicator):
) -> None: ) -> None:
"""Mqtt disconnection callback.""" """Mqtt disconnection callback."""
self.connected = False self.connected = False
logger.error("MQTT disconnected") # Debug reason code thoroughly
reason_name = (
reason_code.getName()
if hasattr(reason_code, "getName")
else str(reason_code)
)
reason_value = getattr(reason_code, "value", reason_code)
logger.error(
f"MQTT disconnected - reason: '{reason_name}', code: {reason_value}, type: {type(reason_code)}"
)
# Don't attempt reconnection if we're stopping or if it was a clean disconnect
if self._stop_reconnect:
logger.error("MQTT not reconnecting - stop flag set")
return
if reason_value == 0:
logger.error("MQTT not reconnecting - clean disconnect (code 0)")
return
logger.error("MQTT will attempt reconnection...")
# Start reconnection in a separate thread to avoid blocking
if self._reconnect_thread is None or not self._reconnect_thread.is_alive():
self._reconnect_thread = threading.Thread(
target=self._reconnect_loop, name="mqtt-reconnect", daemon=True
)
self._reconnect_thread.start()
def _start(self) -> None: def _start(self) -> None:
"""Start mqtt client.""" """Start mqtt client."""
@ -281,3 +344,66 @@ class MqttClient(Communicator):
except Exception as e: except Exception as e:
logger.error(f"Unable to connect to MQTT server: {e}") logger.error(f"Unable to connect to MQTT server: {e}")
return return
def _reconnect_loop(self) -> None:
"""Handle MQTT reconnection using fresh client creation, retrying every 10 seconds indefinitely."""
logger.error("MQTT reconnection loop started")
attempt = 0
while not self._stop_reconnect and not self.connected:
attempt += 1
logger.error(
f"Will attempt MQTT reconnection in {self._reconnect_delay} seconds (attempt {attempt})"
)
# Wait with ability to exit early if stopping
delay_count = 0
while delay_count < self._reconnect_delay:
if self._stop_reconnect:
logger.error("MQTT reconnection stopped during delay") # type: ignore[unreachable]
return
time.sleep(1)
delay_count += 1
# Double-check stop flag after delay
if self._stop_reconnect:
logger.error("MQTT reconnection stopped after delay") # type: ignore[unreachable]
return
try:
logger.error(
f"Creating fresh MQTT client for reconnection attempt {attempt}..."
)
# Clean up old client if it exists
if self.client is not None:
try:
self.client.disconnect()
self.client.loop_stop()
except Exception:
pass # Ignore cleanup errors
# Create completely fresh client and attempt connection
self._start()
# Give the connection attempt some time to complete
wait_count = 0
while wait_count < 5: # Wait up to 5 seconds for connection
if self.connected:
logger.error( # type: ignore[unreachable]
f"MQTT fresh connection successful on attempt {attempt}!"
)
return
time.sleep(1)
wait_count += 1
logger.error(
f"MQTT fresh connection attempt {attempt} timed out, will retry"
)
# Continue the outer while loop to retry
except Exception as e:
logger.error(f"MQTT fresh connection attempt {attempt} failed: {e}")
# Continue the outer while loop to retry
logger.error("MQTT reconnection loop finished")