From 88d11999f8b2120d511424de18d7e31e4efd5b02 Mon Sep 17 00:00:00 2001
From: Davide Pianca <davidepianca98@gmail.com>
Date: Sun, 21 Apr 2024 10:45:39 +0200
Subject: [PATCH] Avoid sending MQTT messages before CONNACK has been received

This fixes the problem with AWS broker which doesn't accept SUBSCRIBE packets before the CONNACK packet has been received, instead returning UNSPECIFIED_ERROR in SUBACK. This is not standard compliant which allows the client to send control packets right after CONNECT, before CONNACK has been received. But with the new behaviour we increase compatibility.
This problem manifests itself mainly with the Android Emulator.
---
 .../src/commonMain/kotlin/MQTTClient.kt       | 25 +++++++++++++++----
 1 file changed, 20 insertions(+), 5 deletions(-)

diff --git a/kmqtt-client/src/commonMain/kotlin/MQTTClient.kt b/kmqtt-client/src/commonMain/kotlin/MQTTClient.kt
index 3060b3e..634c95a 100644
--- a/kmqtt-client/src/commonMain/kotlin/MQTTClient.kt
+++ b/kmqtt-client/src/commonMain/kotlin/MQTTClient.kt
@@ -79,6 +79,9 @@ public class MQTTClient(
     // QoS 2 messages which have been received from the Server, but have not been completely acknowledged
     private val qos2ListReceived = mutableListOf<UInt>()
 
+    // List of messages to be sent after CONNACK has been received
+    private val pendingSendMessages = atomic(mutableListOf<UByteArray>())
+
     // Connection
     private val topicAliasesClient = mutableMapOf<UInt, String>() // TODO reset all these on reconnection
     private val maximumQos = atomic(Qos.EXACTLY_ONCE)
@@ -128,9 +131,13 @@ public class MQTTClient(
 
     public fun isConnackReceived(): Boolean = connackReceived.value
 
-    private fun send(data: UByteArray) {
-        socket?.send(data) ?: throw SocketClosedException("MQTT send failed")
-        lastActiveTimestamp.getAndSet(currentTimeMillis())
+    private fun send(data: UByteArray, force: Boolean = false) {
+        if (connackReceived.value || force) {
+            socket?.send(data) ?: throw SocketClosedException("MQTT send failed")
+            lastActiveTimestamp.getAndSet(currentTimeMillis())
+        } else {
+            pendingSendMessages.value += data
+        }
     }
 
     private fun sendConnect() {
@@ -159,7 +166,7 @@ public class MQTTClient(
                 password
             )
         }
-        send(connect.toByteArray())
+        send(connect.toByteArray(), true)
     }
 
     private fun generatePacketId(): UInt {
@@ -289,6 +296,14 @@ public class MQTTClient(
             throw lastException ?: SocketClosedException("")
         }
         socket?.sendRemaining()
+        if (connackReceived.value) {
+            val pending = pendingSendMessages.value
+            for (data in pending) {
+                send(data)
+            }
+            pendingSendMessages.value.clear()
+        }
+
         val data = socket?.read()
 
         if (data != null) {
@@ -629,4 +644,4 @@ public class MQTTClient(
         connackReceived.getAndSet(false)
         socket = null
     }
-}
\ No newline at end of file
+}
-- 
GitLab