Skip to content
Snippets Groups Projects
Commit 7a354767 authored by NB Grey's avatar NB Grey
Browse files

Adding the ability to track responses to subscriptions and unsubscriptions

parent 20649e9b
No related branches found
No related tags found
No related merge requests found
......@@ -91,6 +91,7 @@ import kotlinx.coroutines.yield
* @param onConnected called when the CONNACK packet has been received and the connection has been established
* @param onDisconnected called when a DISCONNECT packet has been received or if the connection has been terminated
* @param onSubscribed called when a SUBACK packet has been received
* @param onUnsubscribed called when a UNSUBACK packet has been received
* @param debugLog set to print the hex packets sent and received
* @param publishReceived called when a PUBLISH packet has been received
*/
......@@ -118,6 +119,7 @@ public class MQTTClient(
private val onConnected: (connack: MQTTConnack) -> Unit = {},
private val onDisconnected: (disconnect: MQTTDisconnect?) -> Unit = {},
private val onSubscribed: (suback: MQTTSuback) -> Unit = {},
private val onUnsubscribed: (unsuback: MQTTUnsuback) -> Unit = {},
private val debugLog: Boolean = false,
private val publishReceived: (publish: MQTTPublish) -> Unit
) {
......@@ -338,17 +340,20 @@ public class MQTTClient(
*
* @param subscriptions the list of topic filters and relative settings (many settings are used only in MQTT5)
* @param properties the properties to be included in the message (used only in MQTT5)
* @return the packet Id
*/
public fun subscribe(subscriptions: List<Subscription>, properties: MQTT5Properties = MQTT5Properties()) {
public fun subscribe(subscriptions: List<Subscription>, properties: MQTT5Properties = MQTT5Properties()): UInt {
if (!connackReceived.value && properties.authenticationData != null) {
throw Exception("Not sending until connection complete")
}
val packetId = generatePacketId()
val subscribe = if (mqttVersion == MQTTVersion.MQTT3_1_1) {
MQTT4Subscribe(generatePacketId(), subscriptions)
MQTT4Subscribe(packetId, subscriptions)
} else {
MQTT5Subscribe(generatePacketId(), subscriptions, properties)
MQTT5Subscribe(packetId, subscriptions, properties)
}
send(subscribe.toByteArray())
return packetId
}
/**
......@@ -356,17 +361,20 @@ public class MQTTClient(
*
* @param topics the list of topic filters
* @param properties the properties to be included in the message (used only in MQTT5)
* @return the packet Id
*/
public fun unsubscribe(topics: List<String>, properties: MQTT5Properties = MQTT5Properties()) {
public fun unsubscribe(topics: List<String>, properties: MQTT5Properties = MQTT5Properties()): UInt {
if (!connackReceived.value && properties.authenticationData != null) {
throw Exception("Not sending until connection complete")
}
val packetId = generatePacketId()
val unsubscribe = if (mqttVersion == MQTTVersion.MQTT3_1_1) {
MQTT4Unsubscribe(generatePacketId(), topics)
MQTT4Unsubscribe(packetId, topics)
} else {
MQTT5Unsubscribe(generatePacketId(), topics, properties)
MQTT5Unsubscribe(packetId, topics, properties)
}
send(unsubscribe.toByteArray())
return packetId
}
/**
......@@ -746,6 +754,7 @@ public class MQTTClient(
if (packet is MQTT5Unsuback && properties.requestProblemInformation == 0u && (packet.properties.reasonString != null || packet.properties.userProperty.isNotEmpty())) {
throw MQTTException(ReasonCode.PROTOCOL_ERROR)
}
onUnsubscribed(packet)
}
private fun handlePingresp() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment