From 221880f5fc6dc7d8b2977b39f4c4e00b563f3b3f Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nwpope@utu.fi> Date: Fri, 10 May 2019 13:03:02 +0100 Subject: [PATCH] Resolves #4 with waitConnection --- net/cpp/include/ftl/net/peer.hpp | 5 +++++ net/cpp/include/ftl/net/universe.hpp | 2 ++ net/cpp/src/peer.cpp | 16 +++++++++++++++ net/cpp/src/universe.cpp | 8 ++++++++ net/cpp/test/net_integration.cpp | 30 ++++++++++------------------ 5 files changed, 42 insertions(+), 19 deletions(-) diff --git a/net/cpp/include/ftl/net/peer.hpp b/net/cpp/include/ftl/net/peer.hpp index 339528ead..bf0aae4c9 100644 --- a/net/cpp/include/ftl/net/peer.hpp +++ b/net/cpp/include/ftl/net/peer.hpp @@ -86,6 +86,11 @@ class Peer { bool isConnected() const { return sock_ != INVALID_SOCKET && status_ == kConnected; }; + + /** + * Block until the connection and handshake has completed. + */ + bool waitConnection(); bool isValid() const { return status_ != kInvalid && sock_ != INVALID_SOCKET; diff --git a/net/cpp/include/ftl/net/universe.hpp b/net/cpp/include/ftl/net/universe.hpp index a92a67bbc..c8b10b765 100644 --- a/net/cpp/include/ftl/net/universe.hpp +++ b/net/cpp/include/ftl/net/universe.hpp @@ -59,6 +59,8 @@ class Universe { Peer *connect(const std::string &addr); int numberOfPeers() const { return peers_.size(); } + + int waitConnections(); Peer *getPeer(const ftl::UUID &pid) const; diff --git a/net/cpp/src/peer.cpp b/net/cpp/src/peer.cpp index 2032b5acf..fb69ce699 100644 --- a/net/cpp/src/peer.cpp +++ b/net/cpp/src/peer.cpp @@ -39,6 +39,7 @@ #include <memory> #include <algorithm> #include <tuple> +#include <chrono> using std::tuple; using std::get; @@ -46,6 +47,7 @@ using ftl::net::Peer; using ftl::URI; using ftl::net::ws_connect; using ftl::net::Dispatcher; +using std::chrono::seconds; /*static std::string hexStr(const std::string &s) { @@ -481,6 +483,20 @@ void Peer::_sendResponse(uint32_t id, const msgpack::object &res) { _send(); } +bool Peer::waitConnection() { + if (status_ == kConnected) return true; + + std::unique_lock<std::mutex> lk(send_mtx_); + std::condition_variable cv; + + onConnect([&](Peer &p) { + cv.notify_all(); + }); + + cv.wait_for(lk, seconds(5)); + return status_ == kConnected; +} + void Peer::onConnect(const std::function<void(Peer&)> &f) { if (status_ == kConnected) { f(*this); diff --git a/net/cpp/src/universe.cpp b/net/cpp/src/universe.cpp index 7a09a9b4d..da9f32531 100644 --- a/net/cpp/src/universe.cpp +++ b/net/cpp/src/universe.cpp @@ -84,6 +84,14 @@ Peer *Universe::connect(const string &addr) { return p; } +int Universe::waitConnections() { + int count = 0; + for (auto p : peers_) { + if (p->waitConnection()) count++; + } + return count; +} + int Universe::_setDescriptors() { //Reset all file descriptors FD_ZERO(&sfdread_); diff --git a/net/cpp/test/net_integration.cpp b/net/cpp/test/net_integration.cpp index 87a23ee27..3532c5b7f 100644 --- a/net/cpp/test/net_integration.cpp +++ b/net/cpp/test/net_integration.cpp @@ -23,7 +23,7 @@ TEST_CASE("Universe::connect()", "[net]") { auto p = b.connect("tcp://127.0.0.1:7077"); REQUIRE( p ); - while (!p->isConnected()) sleep_for(milliseconds(20)); + p->waitConnection(); REQUIRE( a.numberOfPeers() == 1 ); REQUIRE( b.numberOfPeers() == 1 ); @@ -33,7 +33,7 @@ TEST_CASE("Universe::connect()", "[net]") { auto p = b.connect("tcp://localhost:7077"); REQUIRE( p ); - while (!p->isConnected()) sleep_for(milliseconds(20)); + p->waitConnection(); REQUIRE( a.numberOfPeers() == 1 ); REQUIRE( b.numberOfPeers() == 1 ); @@ -95,8 +95,7 @@ TEST_CASE("Universe::broadcast()", "[net]") { } SECTION("no arguments to one peer") { - b.connect("tcp://localhost:7077"); - while (a.numberOfPeers() == 0) sleep_for(milliseconds(20)); + b.connect("tcp://localhost:7077")->waitConnection(); bool done = false; a.bind("hello", [&done]() { @@ -111,8 +110,7 @@ TEST_CASE("Universe::broadcast()", "[net]") { } SECTION("one argument to one peer") { - b.connect("tcp://localhost:7077"); - while (a.numberOfPeers() == 0) sleep_for(milliseconds(20)); + b.connect("tcp://localhost:7077")->waitConnection(); int done = 0; a.bind("hello", [&done](int v) { @@ -129,9 +127,8 @@ TEST_CASE("Universe::broadcast()", "[net]") { SECTION("one argument to two peers") { Universe c; - b.connect("tcp://localhost:7077"); - c.connect("tcp://localhost:7077"); - while (a.numberOfPeers() < 2) sleep_for(milliseconds(20)); + b.connect("tcp://localhost:7077")->waitConnection(); + c.connect("tcp://localhost:7077")->waitConnection(); int done1 = 0; b.bind("hello", [&done1](int v) { @@ -156,8 +153,7 @@ TEST_CASE("Universe::findOwner()", "") { Universe a; Universe b; a.listen("tcp://localhost:7077"); - b.connect("tcp://localhost:7077"); - while (a.numberOfPeers() == 0) sleep_for(milliseconds(20)); + b.connect("tcp://localhost:7077")->waitConnection(); SECTION("no owners exist") { REQUIRE( !b.findOwner("ftl://test") ); @@ -170,9 +166,8 @@ TEST_CASE("Universe::findOwner()", "") { SECTION("three peers and one owner") { Universe c; - c.connect("tcp://localhost:7077"); + c.connect("tcp://localhost:7077")->waitConnection(); b.setLocalID(ftl::UUID(7)); - while (a.numberOfPeers() < 2) sleep_for(milliseconds(20)); b.createResource("ftl://test"); REQUIRE( *(a.findOwner("ftl://test")) == ftl::UUID(7) ); @@ -180,9 +175,8 @@ TEST_CASE("Universe::findOwner()", "") { SECTION("three peers and one owner (2)") { Universe c; - c.connect("tcp://localhost:7077"); + c.connect("tcp://localhost:7077")->waitConnection(); c.setLocalID(ftl::UUID(7)); - while (a.numberOfPeers() < 2) sleep_for(milliseconds(20)); c.createResource("ftl://test"); auto r = a.findOwner("ftl://test"); @@ -195,8 +189,7 @@ TEST_CASE("Universe::subscribe()", "") { Universe a; Universe b; a.listen("tcp://localhost:7077"); - b.connect("tcp://localhost:7077"); - while (a.numberOfPeers() == 0) sleep_for(milliseconds(20)); + b.connect("tcp://localhost:7077")->waitConnection(); SECTION("no resource exists") { REQUIRE( !b.subscribe("ftl://test", []() {}) ); @@ -214,8 +207,7 @@ TEST_CASE("Universe::publish()", "") { Universe a; Universe b; a.listen("tcp://localhost:7077"); - b.connect("tcp://localhost:7077"); - while (a.numberOfPeers() == 0) sleep_for(milliseconds(20)); + b.connect("tcp://localhost:7077")->waitConnection(); SECTION("no subscribers") { a.createResource("ftl://test"); -- GitLab