diff --git a/kmqtt-broker/src/commonMain/kotlin/mqtt/broker/Broker.kt b/kmqtt-broker/src/commonMain/kotlin/mqtt/broker/Broker.kt index 180e181106741c916cda147482b1f2bff19ac374..3e2fe0c77b5fa03835b0f8fb1afa7df286f23993 100644 --- a/kmqtt-broker/src/commonMain/kotlin/mqtt/broker/Broker.kt +++ b/kmqtt-broker/src/commonMain/kotlin/mqtt/broker/Broker.kt @@ -43,7 +43,8 @@ public class Broker( public val persistence: Persistence? = null, public val cluster: ClusterSettings? = null, public val enableUdp: Boolean = false, - public val webSocketPort: Int? = null + public val webSocketPort: Int? = null, + private val miscCallbacks: MiscCallbacks? = null ) { private val server = ServerSocketLoop(this) @@ -55,6 +56,9 @@ public class Broker( internal val lock = reentrantLock() + private var startCallbackCalled = false + private var stopCallbackCalled = false + init { if (enableUdp && maximumPacketSize > 65535u) { throw IllegalArgumentException("When UDP is enabled the maximum packet size can't be bigger than the datagram maximum size") @@ -65,14 +69,32 @@ public class Broker( * Starts the broker (blocking run) */ public fun listen() { + if (!startCallbackCalled) { + miscCallbacks?.brokerStarted() + startCallbackCalled = true + } server.run() + if (!stopCallbackCalled) { + miscCallbacks?.brokerStopped() + stopCallbackCalled = true + } } /** * Run a single iteration of the broker (non blocking run) */ public fun step() { + if (!startCallbackCalled) { + miscCallbacks?.brokerStarted() + startCallbackCalled = true + } server.step() + if (!server.isRunning()) { + if (!stopCallbackCalled) { + miscCallbacks?.brokerStopped() + stopCallbackCalled = true + } + } } internal fun sendWill(session: Session?) { @@ -414,6 +436,10 @@ public class Broker( } server.stop() } + if (!stopCallbackCalled) { + miscCallbacks?.brokerStopped() + stopCallbackCalled = true + } } internal fun addClusterConnection(address: String) { diff --git a/kmqtt-broker/src/commonMain/kotlin/mqtt/broker/interfaces/MiscCallbacks.kt b/kmqtt-broker/src/commonMain/kotlin/mqtt/broker/interfaces/MiscCallbacks.kt new file mode 100644 index 0000000000000000000000000000000000000000..f549e594ef6cffffb2282ab590f2014364ee2d51 --- /dev/null +++ b/kmqtt-broker/src/commonMain/kotlin/mqtt/broker/interfaces/MiscCallbacks.kt @@ -0,0 +1,14 @@ +package mqtt.broker.interfaces + +public interface MiscCallbacks { + + /** + * Called when the broker has started listening + */ + public fun brokerStarted() + + /** + * Called when the broker has stopped listening + */ + public fun brokerStopped() +} \ No newline at end of file diff --git a/kmqtt-broker/src/commonMain/kotlin/socket/ServerSocketLoop.kt b/kmqtt-broker/src/commonMain/kotlin/socket/ServerSocketLoop.kt index b8a7d16f6e63196af15b47601fce59fdad244a8e..e351ff65e9802670989a0464f0a77c355aa99b59 100644 --- a/kmqtt-broker/src/commonMain/kotlin/socket/ServerSocketLoop.kt +++ b/kmqtt-broker/src/commonMain/kotlin/socket/ServerSocketLoop.kt @@ -34,6 +34,10 @@ internal class ServerSocketLoop(private val broker: Broker) { } } + fun isRunning(): Boolean { + return serverSocket.isRunning() + } + private fun selectCallback(attachment: Any?, state: SocketState): Boolean { return broker.lock.withLock { when (attachment) {