Skip to content
Snippets Groups Projects
Commit 225f2a0d authored by Davide Pianca's avatar Davide Pianca
Browse files

Add brokerStarted and brokerStopped callbacks in MQTT Broker

parent 8e7fd8f6
No related branches found
No related tags found
No related merge requests found
......@@ -43,7 +43,8 @@ public class Broker(
public val persistence: Persistence? = null,
public val cluster: ClusterSettings? = null,
public val enableUdp: Boolean = false,
public val webSocketPort: Int? = null
public val webSocketPort: Int? = null,
private val miscCallbacks: MiscCallbacks? = null
) {
private val server = ServerSocketLoop(this)
......@@ -55,6 +56,9 @@ public class Broker(
internal val lock = reentrantLock()
private var startCallbackCalled = false
private var stopCallbackCalled = false
init {
if (enableUdp && maximumPacketSize > 65535u) {
throw IllegalArgumentException("When UDP is enabled the maximum packet size can't be bigger than the datagram maximum size")
......@@ -65,14 +69,32 @@ public class Broker(
* Starts the broker (blocking run)
*/
public fun listen() {
if (!startCallbackCalled) {
miscCallbacks?.brokerStarted()
startCallbackCalled = true
}
server.run()
if (!stopCallbackCalled) {
miscCallbacks?.brokerStopped()
stopCallbackCalled = true
}
}
/**
* Run a single iteration of the broker (non blocking run)
*/
public fun step() {
if (!startCallbackCalled) {
miscCallbacks?.brokerStarted()
startCallbackCalled = true
}
server.step()
if (!server.isRunning()) {
if (!stopCallbackCalled) {
miscCallbacks?.brokerStopped()
stopCallbackCalled = true
}
}
}
internal fun sendWill(session: Session?) {
......@@ -414,6 +436,10 @@ public class Broker(
}
server.stop()
}
if (!stopCallbackCalled) {
miscCallbacks?.brokerStopped()
stopCallbackCalled = true
}
}
internal fun addClusterConnection(address: String) {
......
package mqtt.broker.interfaces
public interface MiscCallbacks {
/**
* Called when the broker has started listening
*/
public fun brokerStarted()
/**
* Called when the broker has stopped listening
*/
public fun brokerStopped()
}
\ No newline at end of file
......@@ -34,6 +34,10 @@ internal class ServerSocketLoop(private val broker: Broker) {
}
}
fun isRunning(): Boolean {
return serverSocket.isRunning()
}
private fun selectCallback(attachment: Any?, state: SocketState): Boolean {
return broker.lock.withLock {
when (attachment) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment