From 0bbe1530f3b98c6f3ff2f5c4e76d6dccf7c7a02a Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nicolas.pope@utu.fi> Date: Wed, 11 May 2022 07:29:02 +0100 Subject: [PATCH] Add the remote close reconnect test --- include/ftl/protocol/node.hpp | 2 ++ include/ftl/protocol/self.hpp | 1 + src/node.cpp | 4 ++++ src/peer.cpp | 6 ++---- src/peer.hpp | 21 ++++++++++---------- src/self.cpp | 9 +++++++++ src/universe.cpp | 11 +++++++++-- src/universe.hpp | 2 ++ test/net_integration.cpp | 37 ++++++++++++++++------------------- 9 files changed, 57 insertions(+), 36 deletions(-) diff --git a/include/ftl/protocol/node.hpp b/include/ftl/protocol/node.hpp index 1df000e..e34278e 100644 --- a/include/ftl/protocol/node.hpp +++ b/include/ftl/protocol/node.hpp @@ -107,6 +107,8 @@ class Node { unsigned int localID(); + int connectionCount() const; + protected: ftl::net::PeerPtr peer_; }; diff --git a/include/ftl/protocol/self.hpp b/include/ftl/protocol/self.hpp index 1eab3c9..c8918cd 100644 --- a/include/ftl/protocol/self.hpp +++ b/include/ftl/protocol/self.hpp @@ -69,6 +69,7 @@ class Self { std::shared_ptr<ftl::protocol::Node> getNode(const ftl::UUID &pid) const; /** get webservice peer pointer, returns nullptr if not connected to webservice */ std::shared_ptr<ftl::protocol::Node> getWebService() const; + std::list<std::shared_ptr<ftl::protocol::Node>> getNodes() const; ftl::Handle onConnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)>&); ftl::Handle onDisconnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)>&); diff --git a/src/node.cpp b/src/node.cpp index 60f5c64..445a504 100644 --- a/src/node.cpp +++ b/src/node.cpp @@ -59,3 +59,7 @@ void Node::noReconnect() { unsigned int Node::localID() { return peer_->localID(); } + +int Node::connectionCount() const { + return peer_->connectionCount(); +} diff --git a/src/peer.cpp b/src/peer.cpp index 83d722a..b5f004d 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -104,6 +104,7 @@ void Peer::_process_handshake(uint64_t magic, uint32_t version, UUID pid) { _send_handshake(); } + ++connection_count_; net_->_notifyConnect(this); } } @@ -160,9 +161,6 @@ Peer::Peer(const ftl::URI& uri, Universe *u, Dispatcher *d) : } void Peer::_connect() { - dbg_recv_begin_ctr_ = 0; - dbg_recv_end_ctr_ = 0; - sock_ = ftl::net::internal::createConnection(uri_); // throws on bad uri _set_socket_options(); sock_->connect(uri_); // throws on error @@ -205,7 +203,7 @@ void Peer::rawClose() { void Peer::close(bool retry) { // Attempt to inform about disconnect - if (sock_->is_valid() && status_ == NodeStatus::kConnected) { send("__disconnect__"); } + if (!retry && sock_->is_valid() && status_ == NodeStatus::kConnected) { send("__disconnect__"); } UNIQUE_LOCK(send_mtx_, lk_send); //UNIQUE_LOCK(recv_mtx_, lk_recv); diff --git a/src/peer.hpp b/src/peer.hpp index 45fba38..38082ad 100644 --- a/src/peer.hpp +++ b/src/peer.hpp @@ -191,6 +191,8 @@ class Peer { inline void noReconnect() { can_reconnect_ = false; } inline unsigned int localID() const { return local_id_; } + + int connectionCount() const { return connection_count_; } public: static const int kMaxMessage = 10*1024*1024; // 10Mb currently @@ -258,21 +260,20 @@ private: // Functions const bool outgoing_; unsigned int local_id_; - ftl::URI uri_; // Original connection URI, or assumed URI - ftl::UUID peerid_; // Received in handshake or allocated - ftl::protocol::NodeStatus status_; // Connected, errored, reconnecting.. - uint32_t version_; // Received protocol version in handshake - bool can_reconnect_; // Client connections can retry - ftl::net::Universe *net_; // Origin net universe + ftl::URI uri_; // Original connection URI, or assumed URI + ftl::UUID peerid_; // Received in handshake or allocated + ftl::protocol::NodeStatus status_; // Connected, errored, reconnecting.. + uint32_t version_; // Received protocol version in handshake + bool can_reconnect_; // Client connections can retry + ftl::net::Universe *net_; // Origin net universe std::unique_ptr<internal::SocketConnection> sock_; std::unique_ptr<ftl::net::Dispatcher> disp_; // For RPC call dispatch std::map<int, std::unique_ptr<virtual_caller>> callbacks_; - // debug variables, see comments for data() in peer.cpp for details - std::atomic_uint64_t dbg_recv_begin_ctr_ = 0; - std::atomic_uint64_t dbg_recv_end_ctr_ = 0; - std::atomic_int job_count_ = 0; + std::atomic_int job_count_ = 0; // Ensure threads are done before destructing + std::atomic_int connection_count_ = 0; // Number of successful connections total + std::atomic_int retry_count_ = 0; // Current number of reconnection attempts // reconnect when clean disconnect received from remote bool reconnect_on_remote_disconnect_ = true; diff --git a/src/self.cpp b/src/self.cpp index 672a5d8..85e4940 100644 --- a/src/self.cpp +++ b/src/self.cpp @@ -78,6 +78,15 @@ std::shared_ptr<ftl::protocol::Node> Self::getWebService() const { return std::make_shared<ftl::protocol::Node>(universe_->getWebService()); } +std::list<std::shared_ptr<ftl::protocol::Node>> Self::getNodes() const { + std::list<std::shared_ptr<ftl::protocol::Node>> result; + auto peers = universe_->getPeers(); + std::transform(peers.begin(), peers.end(), std::back_inserter(result), [](const ftl::net::PeerPtr &ptr) { + return std::make_shared<ftl::protocol::Node>(ptr); + }); + return result; +} + ftl::Handle Self::onConnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)> &cb) { return universe_->onConnect([cb](const ftl::net::PeerPtr &p) { return cb(std::make_shared<ftl::protocol::Node>(p)); diff --git a/src/universe.cpp b/src/universe.cpp index 99fc3f3..d7207fb 100644 --- a/src/universe.cpp +++ b/src/universe.cpp @@ -419,6 +419,13 @@ PeerPtr Universe::getWebService() const { return (it != peers_.end()) ? *it : nullptr; } +std::list<PeerPtr> Universe::getPeers() const { + SHARED_LOCK(net_mutex_,lk); + std::list<PeerPtr> result; + std::copy_if(peers_.begin(), peers_.end(), std::back_inserter(result), [](const PeerPtr &ptr){ return !!ptr; }); + return result; +} + void Universe::_periodic() { LOG(INFO) << "PERIODIC " << reconnects_.size(); auto i = reconnects_.begin(); @@ -571,14 +578,14 @@ void Universe::_run() { const auto &fdstruct = impl_->pollfds[impl_->idMap[sock]]; - /*if (fdstruct.revents & POLLERR) { + if (fdstruct.revents & POLLERR) { if (s->socketError()) { //lk.unlock(); s->close(); //lk.lock(); continue; // No point in reading data... } - }*/ + } //If message received from this client then deal with it if (fdstruct.revents & POLLIN) { //lk.unlock(); diff --git a/src/universe.hpp b/src/universe.hpp index f486b89..a245b8f 100644 --- a/src/universe.hpp +++ b/src/universe.hpp @@ -106,6 +106,8 @@ public: /** get webservice peer pointer, returns nullptr if not connected to webservice */ PeerPtr getWebService() const; + std::list<PeerPtr> getPeers() const; + /** * Bind a function to an RPC or service call name. This will implicitely * be called by any peer making the request. diff --git a/test/net_integration.cpp b/test/net_integration.cpp index b96b538..097f2de 100644 --- a/test/net_integration.cpp +++ b/test/net_integration.cpp @@ -63,8 +63,8 @@ TEST_CASE("Listen and Connect", "[net]") { } REQUIRE(throws); } - - /*SECTION("automatic reconnect, after clean disconnect") { + + SECTION("automatic reconnect from originating connection") { std::mutex mtx; std::condition_variable cv; std::unique_lock<std::mutex> lk(mtx); @@ -73,24 +73,15 @@ TEST_CASE("Listen and Connect", "[net]") { auto p_connecting = ftl::connectNode(uri); REQUIRE(p_connecting); - - bool disconnected_once = false; - auto h = self->onConnect([&](const std::shared_ptr<ftl::protocol::Node> &p_listening) { - if (!disconnected_once) { - // remote closes on first connection - disconnected_once = true; - p_listening->close(true); - cv.notify_one(); - } - return true; - }); + REQUIRE(p_connecting->waitConnection(5)); + p_connecting->close(true); - REQUIRE(cv.wait_for(lk, std::chrono::seconds(5)) == std::cv_status::no_timeout); + REQUIRE(p_connecting->status() != ftl::protocol::NodeStatus::kConnected); REQUIRE(p_connecting->waitConnection(5)); - }*/ + } - SECTION("automatic reconnect from originating connection") { + SECTION("automatic reconnect from remote termination") { std::mutex mtx; std::condition_variable cv; std::unique_lock<std::mutex> lk(mtx); @@ -101,10 +92,16 @@ TEST_CASE("Listen and Connect", "[net]") { REQUIRE(p_connecting); REQUIRE(p_connecting->waitConnection(5)); - p_connecting->close(true); + REQUIRE(p_connecting->connectionCount() == 1); + + auto nodes = self->getNodes(); + REQUIRE( nodes.size() == 1 ); + for (auto &node : nodes) { + node->close(); + } - REQUIRE(p_connecting->status() != ftl::protocol::NodeStatus::kConnected); - REQUIRE(p_connecting->waitConnection(5)); + bool r = try_for(500, [p_connecting]{ return p_connecting->connectionCount() == 2; }); + REQUIRE( r ); } ftl::protocol::reset(); @@ -125,7 +122,7 @@ TEST_CASE("Self::onConnect()", "[net]") { return true; }); - REQUIRE( ftl::connectNode(uri)->waitConnection(5) ); + REQUIRE( ftl::connectNode(uri) ); bool result = try_for(20, [&done]{ return done; }); REQUIRE( result ); -- GitLab