Skip to content
Snippets Groups Projects
Commit 88d11999 authored by Davide Pianca's avatar Davide Pianca
Browse files

Avoid sending MQTT messages before CONNACK has been received

This fixes the problem with AWS broker which doesn't accept SUBSCRIBE packets before the CONNACK packet has been received, instead returning UNSPECIFIED_ERROR in SUBACK. This is not standard compliant which allows the client to send control packets right after CONNECT, before CONNACK has been received. But with the new behaviour we increase compatibility.
This problem manifests itself mainly with the Android Emulator.
parent 6cbcb823
No related branches found
No related tags found
No related merge requests found
...@@ -79,6 +79,9 @@ public class MQTTClient( ...@@ -79,6 +79,9 @@ public class MQTTClient(
// QoS 2 messages which have been received from the Server, but have not been completely acknowledged // QoS 2 messages which have been received from the Server, but have not been completely acknowledged
private val qos2ListReceived = mutableListOf<UInt>() private val qos2ListReceived = mutableListOf<UInt>()
// List of messages to be sent after CONNACK has been received
private val pendingSendMessages = atomic(mutableListOf<UByteArray>())
// Connection // Connection
private val topicAliasesClient = mutableMapOf<UInt, String>() // TODO reset all these on reconnection private val topicAliasesClient = mutableMapOf<UInt, String>() // TODO reset all these on reconnection
private val maximumQos = atomic(Qos.EXACTLY_ONCE) private val maximumQos = atomic(Qos.EXACTLY_ONCE)
...@@ -128,9 +131,13 @@ public class MQTTClient( ...@@ -128,9 +131,13 @@ public class MQTTClient(
public fun isConnackReceived(): Boolean = connackReceived.value public fun isConnackReceived(): Boolean = connackReceived.value
private fun send(data: UByteArray) { private fun send(data: UByteArray, force: Boolean = false) {
if (connackReceived.value || force) {
socket?.send(data) ?: throw SocketClosedException("MQTT send failed") socket?.send(data) ?: throw SocketClosedException("MQTT send failed")
lastActiveTimestamp.getAndSet(currentTimeMillis()) lastActiveTimestamp.getAndSet(currentTimeMillis())
} else {
pendingSendMessages.value += data
}
} }
private fun sendConnect() { private fun sendConnect() {
...@@ -159,7 +166,7 @@ public class MQTTClient( ...@@ -159,7 +166,7 @@ public class MQTTClient(
password password
) )
} }
send(connect.toByteArray()) send(connect.toByteArray(), true)
} }
private fun generatePacketId(): UInt { private fun generatePacketId(): UInt {
...@@ -289,6 +296,14 @@ public class MQTTClient( ...@@ -289,6 +296,14 @@ public class MQTTClient(
throw lastException ?: SocketClosedException("") throw lastException ?: SocketClosedException("")
} }
socket?.sendRemaining() socket?.sendRemaining()
if (connackReceived.value) {
val pending = pendingSendMessages.value
for (data in pending) {
send(data)
}
pendingSendMessages.value.clear()
}
val data = socket?.read() val data = socket?.read()
if (data != null) { if (data != null) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment