diff --git a/include/ftl/handle.hpp b/include/ftl/handle.hpp index 079f310705b45807073096b91803141d00e3fa55..bbcaea46b87925345faaa519966d5b76cd6a5257 100644 --- a/include/ftl/handle.hpp +++ b/include/ftl/handle.hpp @@ -88,7 +88,7 @@ struct Handler : BaseHandler { Handler() {} ~Handler() { // Ensure all thread pool jobs are done - while (jobs_ > 0) std::this_thread::sleep_for(std::chrono::milliseconds(10)); + while (jobs_ > 0 && ftl::pool.size() > 0) std::this_thread::sleep_for(std::chrono::milliseconds(2)); } /** @@ -109,12 +109,19 @@ struct Handler : BaseHandler { * return true. */ void trigger(ARGS ...args) { + bool hadFault = false; std::unique_lock<std::mutex> lk(mutex_); for (auto i=callbacks_.begin(); i!=callbacks_.end(); ) { - bool keep = i->second(std::forward<ARGS>(args)...); + bool keep = true; + try { + keep = i->second(std::forward<ARGS>(args)...); + } catch(...) { + hadFault = true; + } if (!keep) i = callbacks_.erase(i); else ++i; } + if (hadFault) throw FTL_Error("Callback exception"); } /** @@ -122,13 +129,22 @@ struct Handler : BaseHandler { * single thread, not in parallel. */ void triggerAsync(ARGS ...args) { + ++jobs_; ftl::pool.push([this, args...](int id) { + bool hadFault = false; std::unique_lock<std::mutex> lk(mutex_); for (auto i=callbacks_.begin(); i!=callbacks_.end(); ) { - bool keep = i->second(std::forward<ARGS>(args)...); + bool keep = true; + try { + keep = i->second(std::forward<ARGS>(args)...); + } catch (...) { + hadFault = true; + } if (!keep) i = callbacks_.erase(i); else ++i; } + --jobs_; + if (hadFault) throw FTL_Error("Callback exception"); }); } @@ -163,13 +179,17 @@ struct Handler : BaseHandler { callbacks_.erase(h.id()); } // Make sure any possible call to removed callback has finished. - while (jobs_ > 0) std::this_thread::sleep_for(std::chrono::milliseconds(10)); + while (jobs_ > 0 && ftl::pool.size() > 0) std::this_thread::sleep_for(std::chrono::milliseconds(2)); } void removeUnsafe(const Handle &h) override { callbacks_.erase(h.id()); // Make sure any possible call to removed callback has finished. - while (jobs_ > 0) std::this_thread::sleep_for(std::chrono::milliseconds(10)); + while (jobs_ > 0 && ftl::pool.size() > 0) std::this_thread::sleep_for(std::chrono::milliseconds(2)); + } + + void clear() { + callbacks_.clear(); } private: diff --git a/include/ftl/protocol/node.hpp b/include/ftl/protocol/node.hpp index 061de515125c6ffb93ba3fb7c263ee77bd7c8636..ef67ea05a8d63828ee6a614ca28614a11bbc2ce7 100644 --- a/include/ftl/protocol/node.hpp +++ b/include/ftl/protocol/node.hpp @@ -57,7 +57,7 @@ class Node { * * @return True if all connections were successful, false if timeout or error. */ - bool waitConnection(); + bool waitConnection(int seconds = 1); /** * Make a reconnect attempt. Called internally by Universe object. diff --git a/src/node.cpp b/src/node.cpp index ceb4bb520e1acb796e70a1334e564e50cd90156e..d647de0d3970709e69485b4d206836f6f937dec2 100644 --- a/src/node.cpp +++ b/src/node.cpp @@ -16,8 +16,8 @@ bool Node::isConnected() const { return peer_->isConnected(); } -bool Node::waitConnection() { - return peer_->waitConnection(); +bool Node::waitConnection(int s) { + return peer_->waitConnection(s); } bool Node::reconnect() { diff --git a/src/peer.cpp b/src/peer.cpp index 549b2d97ac907e45ffc3baaff92726a1ab323094..465cc7d49207bbeb9a2a8b8dae3d6b62532cf223 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -40,7 +40,6 @@ using ftl::protocol::NodeStatus; using ftl::protocol::NodeType; std::atomic_int Peer::rpcid__ = 0; -std::atomic_int Peer::local_peer_ids__ = 0; int Peer::_socket() const { if (sock_->is_valid()) { @@ -105,10 +104,7 @@ void Peer::_process_handshake(uint64_t magic, uint32_t version, UUID pid) { _send_handshake(); } - // Ensure handlers called later or in new thread - ftl::pool.push([this](int id) { - net_->_notifyConnect(this); - }); + net_->_notifyConnect(this); } } @@ -130,7 +126,7 @@ void Peer::_bind_rpc() { } Peer::Peer(std::unique_ptr<internal::SocketConnection> s, Universe* u, Dispatcher* d) : - is_waiting_(true), outgoing_(false), local_id_(local_peer_ids__++), + is_waiting_(true), outgoing_(false), local_id_(0), uri_("0"), status_(NodeStatus::kConnecting), can_reconnect_(false), net_(u), sock_(std::move(s)) { @@ -144,10 +140,11 @@ Peer::Peer(std::unique_ptr<internal::SocketConnection> s, Universe* u, Dispatche _bind_rpc(); _send_handshake(); + ++net_->peer_instances_; } Peer::Peer(const ftl::URI& uri, Universe *u, Dispatcher *d) : - outgoing_(true), local_id_(local_peer_ids__++), uri_(uri), + outgoing_(true), local_id_(0), uri_(uri), status_(NodeStatus::kInvalid), can_reconnect_(true), net_(u) { /* Outgoing connection constructor */ @@ -159,6 +156,7 @@ Peer::Peer(const ftl::URI& uri, Universe *u, Dispatcher *d) : _bind_rpc(); _connect(); + ++net_->peer_instances_; } void Peer::_connect() { @@ -216,7 +214,7 @@ void Peer::close(bool retry) { } void Peer::_close(bool retry) { - if (status_ == NodeStatus::kDisconnected) return; + if (status_ != NodeStatus::kConnected && status_ != NodeStatus::kConnecting) return; status_ = NodeStatus::kDisconnected; if (sock_->is_valid()) { @@ -410,6 +408,7 @@ bool Peer::_data() { if (status_ == NodeStatus::kConnecting) { LOG(ERROR) << "failed to get handshake"; close(reconnect_on_protocol_error_); + lk.lock(); return false; } } else { @@ -431,6 +430,8 @@ bool Peer::_data() { return false; } } + } else { + lk.unlock(); } } @@ -501,7 +502,7 @@ void Peer::_waitCall(int id, std::condition_variable &cv, bool &hasreturned, con } } -bool Peer::waitConnection() { +bool Peer::waitConnection(int s) { if (status_ == NodeStatus::kConnected) return true; else if (status_ != NodeStatus::kConnecting) return false; @@ -516,7 +517,7 @@ bool Peer::waitConnection() { return true; }); - cv.wait_for(lk, seconds(1), [this]() { return status_ == NodeStatus::kConnected;}); + cv.wait_for(lk, seconds(s), [this]() { return status_ == NodeStatus::kConnected;}); return status_ == NodeStatus::kConnected; } @@ -553,6 +554,7 @@ int Peer::_send() { } Peer::~Peer() { + --net_->peer_instances_; { UNIQUE_LOCK(send_mtx_,lk1); UNIQUE_LOCK(recv_mtx_,lk2); @@ -560,7 +562,8 @@ Peer::~Peer() { } // Prevent deletion if there are any jobs remaining - while (job_count_ > 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + while (job_count_ > 0 && ftl::pool.size() > 0) { + LOG(INFO) << "waiting for peer jobs..."; + std::this_thread::sleep_for(std::chrono::milliseconds(2)); } } diff --git a/src/peer.hpp b/src/peer.hpp index ab8e2c66d0624858537adf23421e6ef6c0c2fdf1..4cdd1747eee2819f3fc37e86b61c154a007da5c0 100644 --- a/src/peer.hpp +++ b/src/peer.hpp @@ -48,7 +48,7 @@ struct virtual_caller { template <typename T> struct caller : virtual_caller { - explicit caller(std::function<void(const T&)> &f) : f_(f) {}; + explicit caller(const std::function<void(const T&)> &f) : f_(f) {}; void operator()(msgpack::object &o) override { T r = o.as<T>(); f_(r); }; std::function<void(const T&)> f_; }; @@ -87,7 +87,7 @@ class Peer { * * @return True if all connections were successful, false if timeout or error. */ - bool waitConnection(); + bool waitConnection(int seconds = 1); /** * Make a reconnect attempt. Called internally by Universe object. @@ -121,6 +121,8 @@ class Peer { * the same as the initial connection string on the client. */ std::string getURI() const { return uri_.to_string(); }; + + const ftl::URI &getURIObject() const { return uri_; } /** * Get the UUID for this peer. @@ -256,7 +258,7 @@ private: // Functions RECURSIVE_MUTEX cb_mtx_; const bool outgoing_; - const unsigned int local_id_; + unsigned int local_id_; ftl::URI uri_; // Original connection URI, or assumed URI ftl::UUID peerid_; // Received in handshake or allocated ftl::protocol::NodeStatus status_; // Connected, errored, reconnecting.. @@ -281,7 +283,6 @@ private: // Functions bool reconnect_on_protocol_error_ = false; static std::atomic_int rpcid__; // Return ID for RPC calls - static std::atomic_int local_peer_ids__; }; // --- Inline Template Implementations ----------------------------------------- diff --git a/src/protocol.cpp b/src/protocol.cpp index d30058e3ec2a4a230a8bd09d6f1134106d7e0888..60091c0ef7d77eb26d8d60afbca273890d642bfe 100644 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -4,7 +4,8 @@ static std::shared_ptr<ftl::net::Universe> universe; -ctpl::thread_pool ftl::pool(std::thread::hardware_concurrency()*2); +//ctpl::thread_pool ftl::pool(std::thread::hardware_concurrency()*2); +ctpl::thread_pool ftl::pool(4); void ftl::protocol::reset() { universe.reset(); diff --git a/src/protocol/connection.cpp b/src/protocol/connection.cpp index 172d996b92570f529a17eed1853f520cd67cf279..121048aa02d167fff040dc7c452bcd4f0ee31456 100644 --- a/src/protocol/connection.cpp +++ b/src/protocol/connection.cpp @@ -217,6 +217,7 @@ bool SocketServer::bind(int backlog) { } bool SocketServer::close() { + is_listening_ = false; return sock_.close(); } diff --git a/src/universe.cpp b/src/universe.cpp index 71505713e943540dbedf8bfb7c59751680918a16..539d3650680bd3d24a3640205a82ecbb56e215e0 100644 --- a/src/universe.cpp +++ b/src/universe.cpp @@ -75,9 +75,10 @@ Universe::Universe() : active_(true), this_peer(ftl::protocol::id), impl_(new ftl::net::NetImplDetail()), + peers_(10), phase_(0), periodic_time_(1.0), - reconnect_attempts_(50), + reconnect_attempts_(5), thread_(Universe::__start, this) { _installBindings(); @@ -99,16 +100,15 @@ Universe::Universe() : } return true; });*/ - - if (instance_ != nullptr) LOG(FATAL) << "Multiple net instances"; - //instance_ = this; } Universe::~Universe() { shutdown(); + CHECK(peer_instances_ == 0); } size_t Universe::getSendBufferSize(ftl::URI::scheme_t s) { + // TODO: Allow these to be configured again. switch(s) { case ftl::URI::scheme_t::SCHEME_WS: case ftl::URI::scheme_t::SCHEME_WSS: @@ -159,7 +159,11 @@ void Universe::shutdown() { LOG(INFO) << "Cleanup Network ..."; { - UNIQUE_LOCK(net_mutex_, lk); + SHARED_LOCK(net_mutex_, lk); + + for (auto &l : listeners_) { + l->close(); + } for (auto &s : peers_) { if (s) s->rawClose(); @@ -168,12 +172,6 @@ void Universe::shutdown() { active_ = false; thread_.join(); - - for (auto &l : listeners_) { - l->close(); - } - - listeners_.clear(); } bool Universe::listen(const ftl::URI &addr) { @@ -181,9 +179,12 @@ bool Universe::listen(const ftl::URI &addr) { auto l = create_listener(addr); l->bind(); - UNIQUE_LOCK(net_mutex_,lk); - LOG(INFO) << "listening on " << l->uri().to_string(); - listeners_.push_back(std::move(l)); + { + UNIQUE_LOCK(net_mutex_,lk); + LOG(INFO) << "listening on " << l->uri().to_string(); + listeners_.push_back(std::move(l)); + } + socket_cv_.notify_one(); return true; } catch (const std::exception &ex) { @@ -209,6 +210,24 @@ bool Universe::isConnected(const std::string &s) { return isConnected(uri); } +void Universe::_insertPeer(const std::shared_ptr<Peer> &ptr) { + UNIQUE_LOCK(net_mutex_,lk); + for (size_t i=0; i<peers_.size(); ++i) { + if (!peers_[i]) { + ++connection_count_; + peers_[i] = ptr; + peer_by_uri_[ptr->getURIObject().getBaseURI()] = i; + peer_ids_[ptr->id()] = i; + ptr->local_id_ = i; + + lk.unlock(); + socket_cv_.notify_one(); + return; + } + } + throw FTL_Error("Too many connections"); +} + std::shared_ptr<Peer> Universe::connect(const ftl::URI &u) { // Check if already connected or if self (when could this happen?) @@ -228,9 +247,7 @@ std::shared_ptr<Peer> Universe::connect(const ftl::URI &u) { auto p = std::make_shared<Peer>(u, this, &disp_); if (p->status() != NodeStatus::kInvalid) { - UNIQUE_LOCK(net_mutex_,lk); - peers_.push_back(p); - peer_by_uri_[u.getBaseURI()] = peers_.size() - 1; + _insertPeer(p); } else { LOG(ERROR) << "Peer in invalid state"; @@ -251,11 +268,9 @@ void Universe::unbind(const std::string &name) { int Universe::waitConnections() { SHARED_LOCK(net_mutex_, lk); - int count = 0; - for (auto p : peers_) { - if (p->waitConnection()) count++; - } - return count; + return std::count_if(peers_.begin(), peers_.end(), [](const auto &p) { + return p && p->waitConnection(); + }); } socket_t Universe::_setDescriptors() { @@ -265,8 +280,7 @@ socket_t Universe::_setDescriptors() { socket_t n = 0; - // TODO: Shared lock for some of the time... - UNIQUE_LOCK(net_mutex_, lk); + SHARED_LOCK(net_mutex_, lk); //Set file descriptor for the listening sockets. for (auto &l : listeners_) { @@ -277,8 +291,10 @@ socket_t Universe::_setDescriptors() { } } + // FIXME: Bug, it crashes here sometimes, segfault on reading the shared_ptr + //Set the file descriptors for each client - for (auto s : peers_) { + for (const auto &s : peers_) { // NOTE: s->isValid() should return true only and only if a valid OS // socket exists. @@ -288,7 +304,6 @@ socket_t Universe::_setDescriptors() { FD_SET(s->_socket(), &impl_->sfderror_); } } - _cleanupPeers(); return n; } @@ -301,37 +316,48 @@ void Universe::_installBindings() { } -// Note: should be called inside a net lock +void Universe::_removePeer(std::shared_ptr<Peer> &p) { + UNIQUE_LOCK(net_mutex_, ulk); + + if (p && (!p->isValid() || + p->status() == NodeStatus::kReconnecting || + p->status() == NodeStatus::kDisconnected)) { + + LOG(INFO) << "Removing disconnected peer: " << p->id().to_string(); + on_disconnect_.triggerAsync(p); + + auto ix = peer_ids_.find(p->id()); + if (ix != peer_ids_.end()) peer_ids_.erase(ix); + + for (auto j=peer_by_uri_.begin(); j != peer_by_uri_.end(); ++j) { + if (peers_[j->second] == p) { + peer_by_uri_.erase(j); + break; + } + } + + if (p->status() == NodeStatus::kReconnecting) { + reconnects_.push_back({reconnect_attempts_, 1.0f, p}); + } else { + garbage_.push_back(p); + } + + --connection_count_; + p.reset(); + } +} + void Universe::_cleanupPeers() { + SHARED_LOCK(net_mutex_, lk); auto i = peers_.begin(); while (i != peers_.end()) { auto &p = *i; if (p && (!p->isValid() || p->status() == NodeStatus::kReconnecting || p->status() == NodeStatus::kDisconnected)) { - - LOG(INFO) << "Removing disconnected peer: " << p->id().to_string(); - _notifyDisconnect(p.get()); - - auto ix = peer_ids_.find(p->id()); - if (ix != peer_ids_.end()) peer_ids_.erase(ix); - - for (auto j=peer_by_uri_.begin(); j != peer_by_uri_.end(); ++j) { - if (peers_[j->second] == p) { - peer_by_uri_.erase(j); - break; - } - } - - //i = peers_.erase(i); - - if (p->status() == NodeStatus::kReconnecting) { - reconnects_.push_back({reconnect_attempts_, 1.0f, p}); - } else { - garbage_.push_back(p); - } - - p.reset(); + lk.unlock(); + _removePeer(p); + lk.lock(); } else { i++; } @@ -347,12 +373,10 @@ std::shared_ptr<Peer> Universe::getPeer(const UUID &id) const { std::shared_ptr<Peer> Universe::getWebService() const { SHARED_LOCK(net_mutex_,lk); - for (const auto &p : peers_) { - if (p->getType() == NodeType::kWebService) { - return p; - } - } - return nullptr; + auto it = std::find_if(peers_.begin(), peers_.end(), [](const auto &p) { + return p && p->getType() == NodeType::kWebService; + }); + return (it != peers_.end()) ? *it : nullptr; } void Universe::_periodic() { @@ -383,8 +407,7 @@ void Universe::_periodic() { } if ((*i).peer->reconnect()) { - UNIQUE_LOCK(net_mutex_,lk); - peers_.push_back((*i).peer); + _insertPeer((*i).peer); i = reconnects_.erase(i); } else if ((*i).tries > 0) { @@ -416,6 +439,8 @@ void Universe::_run() { SOCKET n = _setDescriptors(); int selres = 1; + _cleanupPeers(); + // Do periodics auto now = std::chrono::high_resolution_clock::now(); std::chrono::duration<double> elapsed = now - start; @@ -426,7 +451,9 @@ void Universe::_run() { // It is an error to use "select" with no sockets ... so just sleep if (n == 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(300)); + std::shared_lock lk(net_mutex_); + socket_cv_.wait_for(lk, std::chrono::milliseconds(300), [this](){ return listeners_.size() > 0 || connection_count_ > 0; }); + //std::this_thread::sleep_for(std::chrono::milliseconds(100)); continue; } @@ -455,20 +482,22 @@ void Universe::_run() { //If connection request is waiting for (auto &l : listeners_) { - if (l && l->is_listening()) { - if (FD_ISSET(l->fd(), &(impl_->sfdread_))) { - lk.unlock(); - try { - UNIQUE_LOCK(net_mutex_,ulk); - auto csock = l->accept(); - auto p = std::make_shared<Peer>(std::move(csock), this, &disp_); - peers_.push_back(p); - - } catch (const std::exception &ex) { - LOG(ERROR) << "Connection failed: " << ex.what(); - } - lk.lock(); + if (l && l->is_listening() && FD_ISSET(l->fd(), &(impl_->sfdread_))) { + std::unique_ptr<ftl::net::internal::SocketConnection> csock; + try { + csock = l->accept(); + } catch (const std::exception &ex) { + LOG(ERROR) << "Connection failed: " << ex.what(); + } + + lk.unlock(); + + if (csock) { + auto p = std::make_shared<Peer>(std::move(csock), this, &disp_); + _insertPeer(p); } + + lk.lock(); } } @@ -507,6 +536,7 @@ void Universe::_run() { peers_.clear(); peer_by_uri_.clear(); peer_ids_.clear(); + listeners_.clear(); } garbage_.clear(); @@ -524,35 +554,28 @@ ftl::Handle Universe::onError(const std::function<bool(const std::shared_ptr<Pee return on_error_.on(cb); } -static std::shared_ptr<Peer> findPeer(const std::vector<std::shared_ptr<Peer>> &peers, const Peer *p) { - for (const auto &pp : peers) { +std::shared_ptr<Peer> Universe::_findPeer(const Peer *p) { + SHARED_LOCK(net_mutex_,lk); + for (const auto &pp : peers_) { if (pp.get() == p) return pp; } return nullptr; } void Universe::_notifyConnect(Peer *p) { - UNIQUE_LOCK(handler_mutex_,lk); - const auto ptr = findPeer(peers_, p); + const auto ptr = _findPeer(p); - peer_ids_[ptr->id()] = ptr->localID(); + // The peer could have been removed from valid peers already. + if (!ptr) return; - try { - on_connect_.trigger(ptr); - } catch(const std::exception &e) { - LOG(ERROR) << "Exception inside OnConnect hander: " << e.what(); - } + on_connect_.triggerAsync(ptr); } void Universe::_notifyDisconnect(Peer *p) { - UNIQUE_LOCK(handler_mutex_,lk); - const auto ptr = findPeer(peers_, p); + const auto ptr = _findPeer(p); + if (!ptr) return; - try { - on_disconnect_.trigger(ptr); - } catch(const std::exception &e) { - LOG(ERROR) << "Exception inside OnDisconnect hander: " << e.what(); - } + on_disconnect_.triggerAsync(ptr); } void Universe::_notifyError(Peer *p, const ftl::net::Error &e) { diff --git a/src/universe.hpp b/src/universe.hpp index dd15b16e03c24b63acaf392cdaacad5781c0bea4..9b8fb0dc4c8c3e93a7d5428f7cd6191f15a93c27 100644 --- a/src/universe.hpp +++ b/src/universe.hpp @@ -92,7 +92,7 @@ public: bool isConnected(const ftl::URI &uri); bool isConnected(const std::string &s); - size_t numberOfPeers() const { return peers_.size(); } + size_t numberOfPeers() const { return connection_count_; } /** * Will block until all currently registered connnections have completed. @@ -181,13 +181,16 @@ private: void _notifyDisconnect(ftl::net::Peer *); void _notifyError(ftl::net::Peer *, const ftl::net::Error &); void _periodic(); + std::shared_ptr<ftl::net::Peer> _findPeer(const ftl::net::Peer *p); + void _removePeer(std::shared_ptr<Peer> &p); + void _insertPeer(const std::shared_ptr<ftl::net::Peer> &ptr); static void __start(Universe *u); bool active_; ftl::UUID this_peer; mutable SHARED_MUTEX net_mutex_; - RECURSIVE_MUTEX handler_mutex_; + std::condition_variable_any socket_cv_; std::unique_ptr<NetImplDetail> impl_; @@ -206,6 +209,8 @@ private: size_t recv_size_; double periodic_time_; int reconnect_attempts_; + std::atomic_int connection_count_ = 0; // Active connections + std::atomic_int peer_instances_ = 0; // Actual peers dependent on Universe ftl::Handler<const std::shared_ptr<ftl::net::Peer>&> on_connect_; ftl::Handler<const std::shared_ptr<ftl::net::Peer>&> on_disconnect_; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index f4c8cf98791f6da6392aad46c91ef26a3f91ddf6..f6dd15f3c01fe2f3c61236f6b0c6408b3db2d8fc 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -24,7 +24,8 @@ add_executable(handle_unit ) target_include_directories(handle_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include") target_link_libraries(handle_unit beyond-protocol - Threads::Threads ${OS_LIBS}) + Threads::Threads ${OS_LIBS} + ${URIPARSER_LIBRARIES}) add_test(HandleUnitTest handle_unit) diff --git a/test/net_integration.cpp b/test/net_integration.cpp index 7901256ca4dd5b39d7e5d2007396742c93119d32..e30c0aa80dd7ffcd69110b5e86ad9fe26d7aebdb 100644 --- a/test/net_integration.cpp +++ b/test/net_integration.cpp @@ -37,7 +37,7 @@ TEST_CASE("Listen and Connect", "[net]") { auto p = ftl::createNode(uri); REQUIRE( p ); - p->waitConnection(); + REQUIRE( p->waitConnection(5) ); REQUIRE( self->numberOfNodes() == 1 ); REQUIRE( ftl::getSelf()->numberOfNodes() == 1); @@ -48,7 +48,7 @@ TEST_CASE("Listen and Connect", "[net]") { auto p = ftl::createNode(uri); REQUIRE( p ); - p->waitConnection(); + REQUIRE( p->waitConnection(5) ); REQUIRE( self->numberOfNodes() == 1 ); REQUIRE( ftl::getSelf()->numberOfNodes() == 1); @@ -143,7 +143,7 @@ TEST_CASE("Self::onConnect()", "[net]") { return true; }); - auto n = ftl::createNode(uri)->waitConnection(); + REQUIRE( ftl::createNode(uri)->waitConnection(5) ); bool result = try_for(20, [&done]{ return done; }); REQUIRE( result ); @@ -157,7 +157,7 @@ TEST_CASE("Self::onConnect()", "[net]") { return true; }); - auto n = ftl::createNode(uri)->waitConnection(); + REQUIRE( ftl::createNode(uri)->waitConnection(5) ); REQUIRE( done ); }