From 225f2a0d97e729fea544fe77997bc477877fb16d Mon Sep 17 00:00:00 2001 From: Davide Pianca <davidepianca98@gmail.com> Date: Thu, 13 Jun 2024 12:07:50 +0200 Subject: [PATCH] Add brokerStarted and brokerStopped callbacks in MQTT Broker --- .../commonMain/kotlin/mqtt/broker/Broker.kt | 28 ++++++++++++++++++- .../mqtt/broker/interfaces/MiscCallbacks.kt | 14 ++++++++++ .../kotlin/socket/ServerSocketLoop.kt | 4 +++ 3 files changed, 45 insertions(+), 1 deletion(-) create mode 100644 kmqtt-broker/src/commonMain/kotlin/mqtt/broker/interfaces/MiscCallbacks.kt diff --git a/kmqtt-broker/src/commonMain/kotlin/mqtt/broker/Broker.kt b/kmqtt-broker/src/commonMain/kotlin/mqtt/broker/Broker.kt index 180e181..3e2fe0c 100644 --- a/kmqtt-broker/src/commonMain/kotlin/mqtt/broker/Broker.kt +++ b/kmqtt-broker/src/commonMain/kotlin/mqtt/broker/Broker.kt @@ -43,7 +43,8 @@ public class Broker( public val persistence: Persistence? = null, public val cluster: ClusterSettings? = null, public val enableUdp: Boolean = false, - public val webSocketPort: Int? = null + public val webSocketPort: Int? = null, + private val miscCallbacks: MiscCallbacks? = null ) { private val server = ServerSocketLoop(this) @@ -55,6 +56,9 @@ public class Broker( internal val lock = reentrantLock() + private var startCallbackCalled = false + private var stopCallbackCalled = false + init { if (enableUdp && maximumPacketSize > 65535u) { throw IllegalArgumentException("When UDP is enabled the maximum packet size can't be bigger than the datagram maximum size") @@ -65,14 +69,32 @@ public class Broker( * Starts the broker (blocking run) */ public fun listen() { + if (!startCallbackCalled) { + miscCallbacks?.brokerStarted() + startCallbackCalled = true + } server.run() + if (!stopCallbackCalled) { + miscCallbacks?.brokerStopped() + stopCallbackCalled = true + } } /** * Run a single iteration of the broker (non blocking run) */ public fun step() { + if (!startCallbackCalled) { + miscCallbacks?.brokerStarted() + startCallbackCalled = true + } server.step() + if (!server.isRunning()) { + if (!stopCallbackCalled) { + miscCallbacks?.brokerStopped() + stopCallbackCalled = true + } + } } internal fun sendWill(session: Session?) { @@ -414,6 +436,10 @@ public class Broker( } server.stop() } + if (!stopCallbackCalled) { + miscCallbacks?.brokerStopped() + stopCallbackCalled = true + } } internal fun addClusterConnection(address: String) { diff --git a/kmqtt-broker/src/commonMain/kotlin/mqtt/broker/interfaces/MiscCallbacks.kt b/kmqtt-broker/src/commonMain/kotlin/mqtt/broker/interfaces/MiscCallbacks.kt new file mode 100644 index 0000000..f549e59 --- /dev/null +++ b/kmqtt-broker/src/commonMain/kotlin/mqtt/broker/interfaces/MiscCallbacks.kt @@ -0,0 +1,14 @@ +package mqtt.broker.interfaces + +public interface MiscCallbacks { + + /** + * Called when the broker has started listening + */ + public fun brokerStarted() + + /** + * Called when the broker has stopped listening + */ + public fun brokerStopped() +} \ No newline at end of file diff --git a/kmqtt-broker/src/commonMain/kotlin/socket/ServerSocketLoop.kt b/kmqtt-broker/src/commonMain/kotlin/socket/ServerSocketLoop.kt index b8a7d16..e351ff6 100644 --- a/kmqtt-broker/src/commonMain/kotlin/socket/ServerSocketLoop.kt +++ b/kmqtt-broker/src/commonMain/kotlin/socket/ServerSocketLoop.kt @@ -34,6 +34,10 @@ internal class ServerSocketLoop(private val broker: Broker) { } } + fun isRunning(): Boolean { + return serverSocket.isRunning() + } + private fun selectCallback(attachment: Any?, state: SocketState): Boolean { return broker.lock.withLock { when (attachment) { -- GitLab