Skip to content
Snippets Groups Projects
Commit 221880f5 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Resolves #4 with waitConnection

parent 70129a79
No related branches found
No related tags found
No related merge requests found
Pipeline #10567 passed
......@@ -86,6 +86,11 @@ class Peer {
bool isConnected() const {
return sock_ != INVALID_SOCKET && status_ == kConnected;
};
/**
* Block until the connection and handshake has completed.
*/
bool waitConnection();
bool isValid() const {
return status_ != kInvalid && sock_ != INVALID_SOCKET;
......
......@@ -59,6 +59,8 @@ class Universe {
Peer *connect(const std::string &addr);
int numberOfPeers() const { return peers_.size(); }
int waitConnections();
Peer *getPeer(const ftl::UUID &pid) const;
......
......@@ -39,6 +39,7 @@
#include <memory>
#include <algorithm>
#include <tuple>
#include <chrono>
using std::tuple;
using std::get;
......@@ -46,6 +47,7 @@ using ftl::net::Peer;
using ftl::URI;
using ftl::net::ws_connect;
using ftl::net::Dispatcher;
using std::chrono::seconds;
/*static std::string hexStr(const std::string &s)
{
......@@ -481,6 +483,20 @@ void Peer::_sendResponse(uint32_t id, const msgpack::object &res) {
_send();
}
bool Peer::waitConnection() {
if (status_ == kConnected) return true;
std::unique_lock<std::mutex> lk(send_mtx_);
std::condition_variable cv;
onConnect([&](Peer &p) {
cv.notify_all();
});
cv.wait_for(lk, seconds(5));
return status_ == kConnected;
}
void Peer::onConnect(const std::function<void(Peer&)> &f) {
if (status_ == kConnected) {
f(*this);
......
......@@ -84,6 +84,14 @@ Peer *Universe::connect(const string &addr) {
return p;
}
int Universe::waitConnections() {
int count = 0;
for (auto p : peers_) {
if (p->waitConnection()) count++;
}
return count;
}
int Universe::_setDescriptors() {
//Reset all file descriptors
FD_ZERO(&sfdread_);
......
......@@ -23,7 +23,7 @@ TEST_CASE("Universe::connect()", "[net]") {
auto p = b.connect("tcp://127.0.0.1:7077");
REQUIRE( p );
while (!p->isConnected()) sleep_for(milliseconds(20));
p->waitConnection();
REQUIRE( a.numberOfPeers() == 1 );
REQUIRE( b.numberOfPeers() == 1 );
......@@ -33,7 +33,7 @@ TEST_CASE("Universe::connect()", "[net]") {
auto p = b.connect("tcp://localhost:7077");
REQUIRE( p );
while (!p->isConnected()) sleep_for(milliseconds(20));
p->waitConnection();
REQUIRE( a.numberOfPeers() == 1 );
REQUIRE( b.numberOfPeers() == 1 );
......@@ -95,8 +95,7 @@ TEST_CASE("Universe::broadcast()", "[net]") {
}
SECTION("no arguments to one peer") {
b.connect("tcp://localhost:7077");
while (a.numberOfPeers() == 0) sleep_for(milliseconds(20));
b.connect("tcp://localhost:7077")->waitConnection();
bool done = false;
a.bind("hello", [&done]() {
......@@ -111,8 +110,7 @@ TEST_CASE("Universe::broadcast()", "[net]") {
}
SECTION("one argument to one peer") {
b.connect("tcp://localhost:7077");
while (a.numberOfPeers() == 0) sleep_for(milliseconds(20));
b.connect("tcp://localhost:7077")->waitConnection();
int done = 0;
a.bind("hello", [&done](int v) {
......@@ -129,9 +127,8 @@ TEST_CASE("Universe::broadcast()", "[net]") {
SECTION("one argument to two peers") {
Universe c;
b.connect("tcp://localhost:7077");
c.connect("tcp://localhost:7077");
while (a.numberOfPeers() < 2) sleep_for(milliseconds(20));
b.connect("tcp://localhost:7077")->waitConnection();
c.connect("tcp://localhost:7077")->waitConnection();
int done1 = 0;
b.bind("hello", [&done1](int v) {
......@@ -156,8 +153,7 @@ TEST_CASE("Universe::findOwner()", "") {
Universe a;
Universe b;
a.listen("tcp://localhost:7077");
b.connect("tcp://localhost:7077");
while (a.numberOfPeers() == 0) sleep_for(milliseconds(20));
b.connect("tcp://localhost:7077")->waitConnection();
SECTION("no owners exist") {
REQUIRE( !b.findOwner("ftl://test") );
......@@ -170,9 +166,8 @@ TEST_CASE("Universe::findOwner()", "") {
SECTION("three peers and one owner") {
Universe c;
c.connect("tcp://localhost:7077");
c.connect("tcp://localhost:7077")->waitConnection();
b.setLocalID(ftl::UUID(7));
while (a.numberOfPeers() < 2) sleep_for(milliseconds(20));
b.createResource("ftl://test");
REQUIRE( *(a.findOwner("ftl://test")) == ftl::UUID(7) );
......@@ -180,9 +175,8 @@ TEST_CASE("Universe::findOwner()", "") {
SECTION("three peers and one owner (2)") {
Universe c;
c.connect("tcp://localhost:7077");
c.connect("tcp://localhost:7077")->waitConnection();
c.setLocalID(ftl::UUID(7));
while (a.numberOfPeers() < 2) sleep_for(milliseconds(20));
c.createResource("ftl://test");
auto r = a.findOwner("ftl://test");
......@@ -195,8 +189,7 @@ TEST_CASE("Universe::subscribe()", "") {
Universe a;
Universe b;
a.listen("tcp://localhost:7077");
b.connect("tcp://localhost:7077");
while (a.numberOfPeers() == 0) sleep_for(milliseconds(20));
b.connect("tcp://localhost:7077")->waitConnection();
SECTION("no resource exists") {
REQUIRE( !b.subscribe("ftl://test", []() {}) );
......@@ -214,8 +207,7 @@ TEST_CASE("Universe::publish()", "") {
Universe a;
Universe b;
a.listen("tcp://localhost:7077");
b.connect("tcp://localhost:7077");
while (a.numberOfPeers() == 0) sleep_for(milliseconds(20));
b.connect("tcp://localhost:7077")->waitConnection();
SECTION("no subscribers") {
a.createResource("ftl://test");
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment