diff --git a/kmqtt-client/src/commonMain/kotlin/MQTTClient.kt b/kmqtt-client/src/commonMain/kotlin/MQTTClient.kt index 3060b3efb21735c705a8c915d66dc37074a89876..634c95a2b544ab3a89bba4b05cdfe973208d9203 100644 --- a/kmqtt-client/src/commonMain/kotlin/MQTTClient.kt +++ b/kmqtt-client/src/commonMain/kotlin/MQTTClient.kt @@ -79,6 +79,9 @@ public class MQTTClient( // QoS 2 messages which have been received from the Server, but have not been completely acknowledged private val qos2ListReceived = mutableListOf<UInt>() + // List of messages to be sent after CONNACK has been received + private val pendingSendMessages = atomic(mutableListOf<UByteArray>()) + // Connection private val topicAliasesClient = mutableMapOf<UInt, String>() // TODO reset all these on reconnection private val maximumQos = atomic(Qos.EXACTLY_ONCE) @@ -128,9 +131,13 @@ public class MQTTClient( public fun isConnackReceived(): Boolean = connackReceived.value - private fun send(data: UByteArray) { - socket?.send(data) ?: throw SocketClosedException("MQTT send failed") - lastActiveTimestamp.getAndSet(currentTimeMillis()) + private fun send(data: UByteArray, force: Boolean = false) { + if (connackReceived.value || force) { + socket?.send(data) ?: throw SocketClosedException("MQTT send failed") + lastActiveTimestamp.getAndSet(currentTimeMillis()) + } else { + pendingSendMessages.value += data + } } private fun sendConnect() { @@ -159,7 +166,7 @@ public class MQTTClient( password ) } - send(connect.toByteArray()) + send(connect.toByteArray(), true) } private fun generatePacketId(): UInt { @@ -289,6 +296,14 @@ public class MQTTClient( throw lastException ?: SocketClosedException("") } socket?.sendRemaining() + if (connackReceived.value) { + val pending = pendingSendMessages.value + for (data in pending) { + send(data) + } + pendingSendMessages.value.clear() + } + val data = socket?.read() if (data != null) { @@ -629,4 +644,4 @@ public class MQTTClient( connackReceived.getAndSet(false) socket = null } -} \ No newline at end of file +}