diff --git a/net/cpp/include/ftl/net/peer.hpp b/net/cpp/include/ftl/net/peer.hpp index 339528ead285188c8448f9a9eeab45288ec039b0..bf0aae4c9c937d02aef8566a1cc365e373b9c416 100644 --- a/net/cpp/include/ftl/net/peer.hpp +++ b/net/cpp/include/ftl/net/peer.hpp @@ -86,6 +86,11 @@ class Peer { bool isConnected() const { return sock_ != INVALID_SOCKET && status_ == kConnected; }; + + /** + * Block until the connection and handshake has completed. + */ + bool waitConnection(); bool isValid() const { return status_ != kInvalid && sock_ != INVALID_SOCKET; diff --git a/net/cpp/include/ftl/net/universe.hpp b/net/cpp/include/ftl/net/universe.hpp index a92a67bbc84eabbbfd82beb76c747bbbc5b915da..c8b10b7651a9cc9cf64ec8237156b0fcf0d0596b 100644 --- a/net/cpp/include/ftl/net/universe.hpp +++ b/net/cpp/include/ftl/net/universe.hpp @@ -59,6 +59,8 @@ class Universe { Peer *connect(const std::string &addr); int numberOfPeers() const { return peers_.size(); } + + int waitConnections(); Peer *getPeer(const ftl::UUID &pid) const; diff --git a/net/cpp/src/peer.cpp b/net/cpp/src/peer.cpp index 2032b5acfc07a0dd52da55e238d3953ce59ef29d..fb69ce699850bed6550098fdfae0e3a652ba3ae6 100644 --- a/net/cpp/src/peer.cpp +++ b/net/cpp/src/peer.cpp @@ -39,6 +39,7 @@ #include <memory> #include <algorithm> #include <tuple> +#include <chrono> using std::tuple; using std::get; @@ -46,6 +47,7 @@ using ftl::net::Peer; using ftl::URI; using ftl::net::ws_connect; using ftl::net::Dispatcher; +using std::chrono::seconds; /*static std::string hexStr(const std::string &s) { @@ -481,6 +483,20 @@ void Peer::_sendResponse(uint32_t id, const msgpack::object &res) { _send(); } +bool Peer::waitConnection() { + if (status_ == kConnected) return true; + + std::unique_lock<std::mutex> lk(send_mtx_); + std::condition_variable cv; + + onConnect([&](Peer &p) { + cv.notify_all(); + }); + + cv.wait_for(lk, seconds(5)); + return status_ == kConnected; +} + void Peer::onConnect(const std::function<void(Peer&)> &f) { if (status_ == kConnected) { f(*this); diff --git a/net/cpp/src/universe.cpp b/net/cpp/src/universe.cpp index 7a09a9b4da86e2ab3b9b70f2b44581e5b8b6c3e4..da9f3253127439c423a1a907952d648713d72aeb 100644 --- a/net/cpp/src/universe.cpp +++ b/net/cpp/src/universe.cpp @@ -84,6 +84,14 @@ Peer *Universe::connect(const string &addr) { return p; } +int Universe::waitConnections() { + int count = 0; + for (auto p : peers_) { + if (p->waitConnection()) count++; + } + return count; +} + int Universe::_setDescriptors() { //Reset all file descriptors FD_ZERO(&sfdread_); diff --git a/net/cpp/test/net_integration.cpp b/net/cpp/test/net_integration.cpp index 87a23ee27a3d6cba311efcb39c4c87cb7880fd43..3532c5b7f85fd1f49959a733119506ddf82b4b4e 100644 --- a/net/cpp/test/net_integration.cpp +++ b/net/cpp/test/net_integration.cpp @@ -23,7 +23,7 @@ TEST_CASE("Universe::connect()", "[net]") { auto p = b.connect("tcp://127.0.0.1:7077"); REQUIRE( p ); - while (!p->isConnected()) sleep_for(milliseconds(20)); + p->waitConnection(); REQUIRE( a.numberOfPeers() == 1 ); REQUIRE( b.numberOfPeers() == 1 ); @@ -33,7 +33,7 @@ TEST_CASE("Universe::connect()", "[net]") { auto p = b.connect("tcp://localhost:7077"); REQUIRE( p ); - while (!p->isConnected()) sleep_for(milliseconds(20)); + p->waitConnection(); REQUIRE( a.numberOfPeers() == 1 ); REQUIRE( b.numberOfPeers() == 1 ); @@ -95,8 +95,7 @@ TEST_CASE("Universe::broadcast()", "[net]") { } SECTION("no arguments to one peer") { - b.connect("tcp://localhost:7077"); - while (a.numberOfPeers() == 0) sleep_for(milliseconds(20)); + b.connect("tcp://localhost:7077")->waitConnection(); bool done = false; a.bind("hello", [&done]() { @@ -111,8 +110,7 @@ TEST_CASE("Universe::broadcast()", "[net]") { } SECTION("one argument to one peer") { - b.connect("tcp://localhost:7077"); - while (a.numberOfPeers() == 0) sleep_for(milliseconds(20)); + b.connect("tcp://localhost:7077")->waitConnection(); int done = 0; a.bind("hello", [&done](int v) { @@ -129,9 +127,8 @@ TEST_CASE("Universe::broadcast()", "[net]") { SECTION("one argument to two peers") { Universe c; - b.connect("tcp://localhost:7077"); - c.connect("tcp://localhost:7077"); - while (a.numberOfPeers() < 2) sleep_for(milliseconds(20)); + b.connect("tcp://localhost:7077")->waitConnection(); + c.connect("tcp://localhost:7077")->waitConnection(); int done1 = 0; b.bind("hello", [&done1](int v) { @@ -156,8 +153,7 @@ TEST_CASE("Universe::findOwner()", "") { Universe a; Universe b; a.listen("tcp://localhost:7077"); - b.connect("tcp://localhost:7077"); - while (a.numberOfPeers() == 0) sleep_for(milliseconds(20)); + b.connect("tcp://localhost:7077")->waitConnection(); SECTION("no owners exist") { REQUIRE( !b.findOwner("ftl://test") ); @@ -170,9 +166,8 @@ TEST_CASE("Universe::findOwner()", "") { SECTION("three peers and one owner") { Universe c; - c.connect("tcp://localhost:7077"); + c.connect("tcp://localhost:7077")->waitConnection(); b.setLocalID(ftl::UUID(7)); - while (a.numberOfPeers() < 2) sleep_for(milliseconds(20)); b.createResource("ftl://test"); REQUIRE( *(a.findOwner("ftl://test")) == ftl::UUID(7) ); @@ -180,9 +175,8 @@ TEST_CASE("Universe::findOwner()", "") { SECTION("three peers and one owner (2)") { Universe c; - c.connect("tcp://localhost:7077"); + c.connect("tcp://localhost:7077")->waitConnection(); c.setLocalID(ftl::UUID(7)); - while (a.numberOfPeers() < 2) sleep_for(milliseconds(20)); c.createResource("ftl://test"); auto r = a.findOwner("ftl://test"); @@ -195,8 +189,7 @@ TEST_CASE("Universe::subscribe()", "") { Universe a; Universe b; a.listen("tcp://localhost:7077"); - b.connect("tcp://localhost:7077"); - while (a.numberOfPeers() == 0) sleep_for(milliseconds(20)); + b.connect("tcp://localhost:7077")->waitConnection(); SECTION("no resource exists") { REQUIRE( !b.subscribe("ftl://test", []() {}) ); @@ -214,8 +207,7 @@ TEST_CASE("Universe::publish()", "") { Universe a; Universe b; a.listen("tcp://localhost:7077"); - b.connect("tcp://localhost:7077"); - while (a.numberOfPeers() == 0) sleep_for(milliseconds(20)); + b.connect("tcp://localhost:7077")->waitConnection(); SECTION("no subscribers") { a.createResource("ftl://test");