diff --git a/kmqtt-client/src/commonMain/kotlin/io/github/davidepianca98/MQTTClient.kt b/kmqtt-client/src/commonMain/kotlin/io/github/davidepianca98/MQTTClient.kt index 5492e3b023a21cb74f6c0695916c8bcd8d8967e4..4dce2ec4f14a7cad948b7c5802274201f032fa8e 100644 --- a/kmqtt-client/src/commonMain/kotlin/io/github/davidepianca98/MQTTClient.kt +++ b/kmqtt-client/src/commonMain/kotlin/io/github/davidepianca98/MQTTClient.kt @@ -41,6 +41,7 @@ import io.github.davidepianca98.mqtt.packets.mqttv4.MQTT4Pubcomp import io.github.davidepianca98.mqtt.packets.mqttv4.MQTT4Publish import io.github.davidepianca98.mqtt.packets.mqttv4.MQTT4Suback import io.github.davidepianca98.mqtt.packets.mqttv4.MQTT4Subscribe +import io.github.davidepianca98.mqtt.packets.mqttv4.toReasonCode import io.github.davidepianca98.mqtt.packets.mqttv5.MQTT5Auth import io.github.davidepianca98.mqtt.packets.mqttv5.MQTT5Connack import io.github.davidepianca98.mqtt.packets.mqttv5.MQTT5Connect @@ -61,10 +62,13 @@ import io.github.davidepianca98.socket.SocketInterface import io.github.davidepianca98.socket.streams.EOFException import io.github.davidepianca98.socket.tls.TLSClientSettings import kotlinx.coroutines.CoroutineExceptionHandler +import kotlinx.coroutines.Job +import kotlinx.coroutines.yield /** * MQTT 3.1.1 and 5 client * + * @param autoInit if true then the Connection will be automatically initialized after creation, otherwise you should call MQTTClient init manually, default value is true * @param mqttVersion sets the version of MQTT for this client MQTTVersion.MQTT3_1_1 or MQTTVersion.MQTT5 * @param address the URL of the server without ws/wss/mqtt/mqtts * @param port the port of the server @@ -87,10 +91,12 @@ import kotlinx.coroutines.CoroutineExceptionHandler * @param onConnected called when the CONNACK packet has been received and the connection has been established * @param onDisconnected called when a DISCONNECT packet has been received or if the connection has been terminated * @param onSubscribed called when a SUBACK packet has been received + * @param onUnsubscribed called when a UNSUBACK packet has been received * @param debugLog set to print the hex packets sent and received * @param publishReceived called when a PUBLISH packet has been received */ public class MQTTClient( + private val autoInit: Boolean = true, private val mqttVersion: MQTTVersion, private val address: String, private val port: Int, @@ -113,10 +119,13 @@ public class MQTTClient( private val onConnected: (connack: MQTTConnack) -> Unit = {}, private val onDisconnected: (disconnect: MQTTDisconnect?) -> Unit = {}, private val onSubscribed: (suback: MQTTSuback) -> Unit = {}, + private val onUnsubscribed: (unsuback: MQTTUnsuback) -> Unit = {}, private val debugLog: Boolean = false, private val publishReceived: (publish: MQTTPublish) -> Unit ) { + private val initialized: AtomicBoolean = atomic(false) + private val maximumPacketSize = properties.maximumPacketSize?.toInt() ?: (1024 * 1024) private var socket: SocketInterface? = null private val running: AtomicBoolean = atomic(false) @@ -164,9 +173,18 @@ public class MQTTClient( throw IllegalArgumentException("Cannot set password without username") } - running.getAndSet(true) + if (autoInit) { + init() + } + } + + public fun init() { + if (!initialized.value) { + initialized.getAndSet(true) + running.getAndSet(true) - connectSocket(250, connectTimeout * 1000) + connectSocket(250, connectTimeout * 1000) + } } private fun connectSocket(readTimeout: Int, connectTimeout: Int) { @@ -184,6 +202,8 @@ public class MQTTClient( } } + public fun isInitialized(): Boolean = initialized.value + public fun isRunning(): Boolean = running.value public fun isConnackReceived(): Boolean = connackReceived.value @@ -320,17 +340,20 @@ public class MQTTClient( * * @param subscriptions the list of topic filters and relative settings (many settings are used only in MQTT5) * @param properties the properties to be included in the message (used only in MQTT5) + * @return the packet Id */ - public fun subscribe(subscriptions: List<Subscription>, properties: MQTT5Properties = MQTT5Properties()) { + public fun subscribe(subscriptions: List<Subscription>, properties: MQTT5Properties = MQTT5Properties()): UInt { if (!connackReceived.value && properties.authenticationData != null) { throw Exception("Not sending until connection complete") } + val packetId = generatePacketId() val subscribe = if (mqttVersion == MQTTVersion.MQTT3_1_1) { - MQTT4Subscribe(generatePacketId(), subscriptions) + MQTT4Subscribe(packetId, subscriptions) } else { - MQTT5Subscribe(generatePacketId(), subscriptions, properties) + MQTT5Subscribe(packetId, subscriptions, properties) } send(subscribe.toByteArray()) + return packetId } /** @@ -338,17 +361,20 @@ public class MQTTClient( * * @param topics the list of topic filters * @param properties the properties to be included in the message (used only in MQTT5) + * @return the packet Id */ - public fun unsubscribe(topics: List<String>, properties: MQTT5Properties = MQTT5Properties()) { + public fun unsubscribe(topics: List<String>, properties: MQTT5Properties = MQTT5Properties()): UInt { if (!connackReceived.value && properties.authenticationData != null) { throw Exception("Not sending until connection complete") } + val packetId = generatePacketId() val unsubscribe = if (mqttVersion == MQTTVersion.MQTT3_1_1) { - MQTT4Unsubscribe(generatePacketId(), topics) + MQTT4Unsubscribe(packetId, topics) } else { - MQTT5Unsubscribe(generatePacketId(), topics, properties) + MQTT5Unsubscribe(packetId, topics, properties) } send(unsubscribe.toByteArray()) + return packetId } /** @@ -404,7 +430,6 @@ public class MQTTClient( } catch (e: MQTTException) { lastException = e disconnect(e.reasonCode) - close() onDisconnected(null) throw e } catch (e: EOFException) { @@ -415,13 +440,11 @@ public class MQTTClient( } catch (e: IOException) { lastException = e disconnect(ReasonCode.UNSPECIFIED_ERROR) - close() onDisconnected(null) throw e } catch (e: Exception) { lastException = e disconnect(ReasonCode.IMPLEMENTATION_SPECIFIC_ERROR) - close() onDisconnected(null) throw e } @@ -492,6 +515,32 @@ public class MQTTClient( } } + /** + * Init and run the client with the ability to cancel + * This function runs the thread on the specified dispatcher until the client stops + * @param dispatcher the dispatcher on which to run the client + * @param exceptionHandler the exception handler for the coroutine scope + */ + public fun initAndRunSuspend( + dispatcher: CoroutineDispatcher = Dispatchers.Default, + exceptionHandler: CoroutineExceptionHandler = CoroutineExceptionHandler { _, throwable ->} + ): Job { + return CoroutineScope(dispatcher).launch(exceptionHandler) { + try { + init() + yield() + while (running.value) { + step() + yield() + } + } finally { + if (running.value) { + disconnect(ReasonCode.IMPLEMENTATION_SPECIFIC_ERROR) + } + } + } + } + private fun handlePacket(packet: MQTTPacket) { when (packet) { is MQTTConnack -> handleConnack(packet) @@ -539,7 +588,8 @@ public class MQTTClient( enhancedAuthCallback(packet.properties.authenticationData) } else if (packet is MQTT4Connack) { if (packet.connectReturnCode != ConnectReturnCode.CONNECTION_ACCEPTED) { - throw IOException("Connection failed with code: ${packet.connectReturnCode}") + throw MQTTException(packet.connectReturnCode.toReasonCode()) +// throw IOException("Connection failed with code: ${packet.connectReturnCode}") } } @@ -704,6 +754,7 @@ public class MQTTClient( if (packet is MQTT5Unsuback && properties.requestProblemInformation == 0u && (packet.properties.reasonString != null || packet.properties.userProperty.isNotEmpty())) { throw MQTTException(ReasonCode.PROTOCOL_ERROR) } + onUnsubscribed(packet) } private fun handlePingresp() { diff --git a/kmqtt-common/src/commonMain/kotlin/io/github/davidepianca98/mqtt/packets/mqttv4/ConnectReturnCode.kt b/kmqtt-common/src/commonMain/kotlin/io/github/davidepianca98/mqtt/packets/mqttv4/ConnectReturnCode.kt index 8346c226a76091a54e5da7d80e50462f9212ab3f..09ad45dea9403abf059039495a48e6f83dacca3c 100644 --- a/kmqtt-common/src/commonMain/kotlin/io/github/davidepianca98/mqtt/packets/mqttv4/ConnectReturnCode.kt +++ b/kmqtt-common/src/commonMain/kotlin/io/github/davidepianca98/mqtt/packets/mqttv4/ConnectReturnCode.kt @@ -1,5 +1,8 @@ package io.github.davidepianca98.mqtt.packets.mqttv4 +import io.github.davidepianca98.mqtt.packets.mqttv4.ConnectReturnCode.* +import io.github.davidepianca98.mqtt.packets.mqttv5.ReasonCode + public enum class ConnectReturnCode(public val value: Int) { CONNECTION_ACCEPTED(0), UNACCEPTABLE_PROTOCOL_VERSION(1), @@ -9,6 +12,15 @@ public enum class ConnectReturnCode(public val value: Int) { NOT_AUTHORIZED(5); public companion object { - public fun valueOf(value: Int): ConnectReturnCode? = values().firstOrNull { it.value == value } + public fun valueOf(value: Int): ConnectReturnCode? = entries.firstOrNull { it.value == value } } } + +public fun ConnectReturnCode.toReasonCode() : ReasonCode = when (this) { + CONNECTION_ACCEPTED -> ReasonCode.SUCCESS + UNACCEPTABLE_PROTOCOL_VERSION -> ReasonCode.UNSUPPORTED_PROTOCOL_VERSION + IDENTIFIER_REJECTED -> ReasonCode.CLIENT_IDENTIFIER_NOT_VALID + SERVER_UNAVAILABLE -> ReasonCode.SERVER_UNAVAILABLE + BAD_USERNAME_PASSWORD -> ReasonCode.BAD_USER_NAME_OR_PASSWORD + NOT_AUTHORIZED -> ReasonCode.NOT_AUTHORIZED +} diff --git a/kmqtt-common/src/commonMain/kotlin/io/github/davidepianca98/mqtt/packets/mqttv4/SubackReturnCode.kt b/kmqtt-common/src/commonMain/kotlin/io/github/davidepianca98/mqtt/packets/mqttv4/SubackReturnCode.kt index 0c8331fabc7ed530871afde4ba5b4a3e3748b9b0..797248372600fbd1e6dc5b40bf2cd01460a84015 100644 --- a/kmqtt-common/src/commonMain/kotlin/io/github/davidepianca98/mqtt/packets/mqttv4/SubackReturnCode.kt +++ b/kmqtt-common/src/commonMain/kotlin/io/github/davidepianca98/mqtt/packets/mqttv4/SubackReturnCode.kt @@ -9,7 +9,7 @@ public enum class SubackReturnCode(public val value: Int) { FAILURE(128); public companion object { - public fun valueOf(value: Int): SubackReturnCode? = values().firstOrNull { it.value == value } + public fun valueOf(value: Int): SubackReturnCode? = entries.firstOrNull { it.value == value } } } diff --git a/kmqtt-common/src/commonMain/kotlin/io/github/davidepianca98/mqtt/packets/mqttv5/MQTT5Connack.kt b/kmqtt-common/src/commonMain/kotlin/io/github/davidepianca98/mqtt/packets/mqttv5/MQTT5Connack.kt index b27e785db3ba1618db8949dae7357c67b2ea1f33..6ee2e81d845703a4fa43f6d6d3afdd5c21f4d80e 100644 --- a/kmqtt-common/src/commonMain/kotlin/io/github/davidepianca98/mqtt/packets/mqttv5/MQTT5Connack.kt +++ b/kmqtt-common/src/commonMain/kotlin/io/github/davidepianca98/mqtt/packets/mqttv5/MQTT5Connack.kt @@ -74,6 +74,11 @@ public class MQTT5Connack( ReasonCode.valueOf(inStream.readByte().toInt()) ?: throw MQTTException( ReasonCode.PROTOCOL_ERROR ) + + if (connectReasonCode !in validReasonCodes) throw MQTTException( + ReasonCode.PROTOCOL_ERROR + ) + val properties = inStream.deserializeProperties(validProperties) return MQTT5Connack( diff --git a/kmqtt-common/src/commonMain/kotlin/io/github/davidepianca98/mqtt/packets/mqttv5/ReasonCode.kt b/kmqtt-common/src/commonMain/kotlin/io/github/davidepianca98/mqtt/packets/mqttv5/ReasonCode.kt index dfba46b5feba0b3bead849013486f754d6396288..2b61f538b50abde89a1d9ddd8b91b2c28b0c2d00 100644 --- a/kmqtt-common/src/commonMain/kotlin/io/github/davidepianca98/mqtt/packets/mqttv5/ReasonCode.kt +++ b/kmqtt-common/src/commonMain/kotlin/io/github/davidepianca98/mqtt/packets/mqttv5/ReasonCode.kt @@ -46,6 +46,6 @@ public enum class ReasonCode(public val value: Int) { WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED(162); public companion object { - public fun valueOf(value: Int): ReasonCode? = values().firstOrNull { it.value == value } + public fun valueOf(value: Int): ReasonCode? = entries.firstOrNull { it.value == value } } }