diff --git a/kmqtt-client/src/commonMain/kotlin/io/github/davidepianca98/MQTTClient.kt b/kmqtt-client/src/commonMain/kotlin/io/github/davidepianca98/MQTTClient.kt index dd25daeaa4e902470fdb85b485e88127a3602352..0fa58d8c76e6287f886a3e1251fe5b49b96d2db5 100644 --- a/kmqtt-client/src/commonMain/kotlin/io/github/davidepianca98/MQTTClient.kt +++ b/kmqtt-client/src/commonMain/kotlin/io/github/davidepianca98/MQTTClient.kt @@ -61,11 +61,15 @@ import io.github.davidepianca98.socket.SocketClosedException import io.github.davidepianca98.socket.SocketInterface import io.github.davidepianca98.socket.streams.EOFException import io.github.davidepianca98.socket.tls.TLSClientSettings +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineExceptionHandler +import kotlinx.coroutines.Job +import kotlinx.coroutines.yield /** * MQTT 3.1.1 and 5 client * + * @param autoInit if true then the Connection will be automatically initialized after creation, otherwise you should call MQTTClient init manually, default value is true * @param mqttVersion sets the version of MQTT for this client MQTTVersion.MQTT3_1_1 or MQTTVersion.MQTT5 * @param address the URL of the server without ws/wss/mqtt/mqtts * @param port the port of the server @@ -92,6 +96,7 @@ import kotlinx.coroutines.CoroutineExceptionHandler * @param publishReceived called when a PUBLISH packet has been received */ public class MQTTClient( + private val autoInit: Boolean = true, private val mqttVersion: MQTTVersion, private val address: String, private val port: Int, @@ -118,6 +123,8 @@ public class MQTTClient( private val publishReceived: (publish: MQTTPublish) -> Unit ) { + private val initialized: AtomicBoolean = atomic(false) + private val maximumPacketSize = properties.maximumPacketSize?.toInt() ?: (1024 * 1024) private var socket: SocketInterface? = null private val running: AtomicBoolean = atomic(false) @@ -165,9 +172,18 @@ public class MQTTClient( throw IllegalArgumentException("Cannot set password without username") } - running.getAndSet(true) + if (autoInit) { + init() + } + } + + public fun init() { + if (!initialized.value) { + initialized.getAndSet(true) + running.getAndSet(true) - connectSocket(250, connectTimeout * 1000) + connectSocket(250, connectTimeout * 1000) + } } private fun connectSocket(readTimeout: Int, connectTimeout: Int) { @@ -185,6 +201,8 @@ public class MQTTClient( } } + public fun isInitialized(): Boolean = initialized.value + public fun isRunning(): Boolean = running.value public fun isConnackReceived(): Boolean = connackReceived.value @@ -493,6 +511,32 @@ public class MQTTClient( } } + /** + * Init and run the client with the ability to cancel + * This function runs the thread on the specified dispatcher until the client stops + * @param dispatcher the dispatcher on which to run the client + * @param exceptionHandler the exception handler for the coroutine scope + */ + public fun initAndRunSuspend( + dispatcher: CoroutineDispatcher = Dispatchers.Default, + exceptionHandler: CoroutineExceptionHandler = CoroutineExceptionHandler { _, throwable ->} + ): Job { + return CoroutineScope(dispatcher).launch(exceptionHandler) { + try { + init() + yield() + while (running.value) { + step() + yield() + } + } catch (e: CancellationException) { + disconnect(ReasonCode.IMPLEMENTATION_SPECIFIC_ERROR) + close() + throw e + } + } + } + private fun handlePacket(packet: MQTTPacket) { when (packet) { is MQTTConnack -> handleConnack(packet)