diff --git a/include/ftl/protocol/node.hpp b/include/ftl/protocol/node.hpp index 1df000e566e9179b940631010e7b2025c8c13c90..e34278ebc9cb1bde0f5a5fc0966321c88e2e5be4 100644 --- a/include/ftl/protocol/node.hpp +++ b/include/ftl/protocol/node.hpp @@ -107,6 +107,8 @@ class Node { unsigned int localID(); + int connectionCount() const; + protected: ftl::net::PeerPtr peer_; }; diff --git a/include/ftl/protocol/self.hpp b/include/ftl/protocol/self.hpp index 1eab3c9e44dcaf7c3509b02d4b0a9804938033d6..c8918cdeef5b3db31b306fa8a14c5669fe70c733 100644 --- a/include/ftl/protocol/self.hpp +++ b/include/ftl/protocol/self.hpp @@ -69,6 +69,7 @@ class Self { std::shared_ptr<ftl::protocol::Node> getNode(const ftl::UUID &pid) const; /** get webservice peer pointer, returns nullptr if not connected to webservice */ std::shared_ptr<ftl::protocol::Node> getWebService() const; + std::list<std::shared_ptr<ftl::protocol::Node>> getNodes() const; ftl::Handle onConnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)>&); ftl::Handle onDisconnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)>&); diff --git a/src/node.cpp b/src/node.cpp index 60f5c64dd29d9d47489156a8c7838066d62b18a4..445a5041809f7e82b575dfc4c6f0be158f6d6428 100644 --- a/src/node.cpp +++ b/src/node.cpp @@ -59,3 +59,7 @@ void Node::noReconnect() { unsigned int Node::localID() { return peer_->localID(); } + +int Node::connectionCount() const { + return peer_->connectionCount(); +} diff --git a/src/peer.cpp b/src/peer.cpp index 83d722a4eb2084edb86f9ff617686193ca8efe8c..b5f004dfa943f4470b738c134f561cbda0006e48 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -104,6 +104,7 @@ void Peer::_process_handshake(uint64_t magic, uint32_t version, UUID pid) { _send_handshake(); } + ++connection_count_; net_->_notifyConnect(this); } } @@ -160,9 +161,6 @@ Peer::Peer(const ftl::URI& uri, Universe *u, Dispatcher *d) : } void Peer::_connect() { - dbg_recv_begin_ctr_ = 0; - dbg_recv_end_ctr_ = 0; - sock_ = ftl::net::internal::createConnection(uri_); // throws on bad uri _set_socket_options(); sock_->connect(uri_); // throws on error @@ -205,7 +203,7 @@ void Peer::rawClose() { void Peer::close(bool retry) { // Attempt to inform about disconnect - if (sock_->is_valid() && status_ == NodeStatus::kConnected) { send("__disconnect__"); } + if (!retry && sock_->is_valid() && status_ == NodeStatus::kConnected) { send("__disconnect__"); } UNIQUE_LOCK(send_mtx_, lk_send); //UNIQUE_LOCK(recv_mtx_, lk_recv); diff --git a/src/peer.hpp b/src/peer.hpp index 45fba380913e9ff7c37276165f4a1c88eb2f7bbb..38082ad9e861dcd2d39162a7d5c42da346f85ae6 100644 --- a/src/peer.hpp +++ b/src/peer.hpp @@ -191,6 +191,8 @@ class Peer { inline void noReconnect() { can_reconnect_ = false; } inline unsigned int localID() const { return local_id_; } + + int connectionCount() const { return connection_count_; } public: static const int kMaxMessage = 10*1024*1024; // 10Mb currently @@ -258,21 +260,20 @@ private: // Functions const bool outgoing_; unsigned int local_id_; - ftl::URI uri_; // Original connection URI, or assumed URI - ftl::UUID peerid_; // Received in handshake or allocated - ftl::protocol::NodeStatus status_; // Connected, errored, reconnecting.. - uint32_t version_; // Received protocol version in handshake - bool can_reconnect_; // Client connections can retry - ftl::net::Universe *net_; // Origin net universe + ftl::URI uri_; // Original connection URI, or assumed URI + ftl::UUID peerid_; // Received in handshake or allocated + ftl::protocol::NodeStatus status_; // Connected, errored, reconnecting.. + uint32_t version_; // Received protocol version in handshake + bool can_reconnect_; // Client connections can retry + ftl::net::Universe *net_; // Origin net universe std::unique_ptr<internal::SocketConnection> sock_; std::unique_ptr<ftl::net::Dispatcher> disp_; // For RPC call dispatch std::map<int, std::unique_ptr<virtual_caller>> callbacks_; - // debug variables, see comments for data() in peer.cpp for details - std::atomic_uint64_t dbg_recv_begin_ctr_ = 0; - std::atomic_uint64_t dbg_recv_end_ctr_ = 0; - std::atomic_int job_count_ = 0; + std::atomic_int job_count_ = 0; // Ensure threads are done before destructing + std::atomic_int connection_count_ = 0; // Number of successful connections total + std::atomic_int retry_count_ = 0; // Current number of reconnection attempts // reconnect when clean disconnect received from remote bool reconnect_on_remote_disconnect_ = true; diff --git a/src/self.cpp b/src/self.cpp index 672a5d801743315430f124807c0a8332aaafbd2e..85e49401143ad7f86f7bfbd55d7f5f060f9458ea 100644 --- a/src/self.cpp +++ b/src/self.cpp @@ -78,6 +78,15 @@ std::shared_ptr<ftl::protocol::Node> Self::getWebService() const { return std::make_shared<ftl::protocol::Node>(universe_->getWebService()); } +std::list<std::shared_ptr<ftl::protocol::Node>> Self::getNodes() const { + std::list<std::shared_ptr<ftl::protocol::Node>> result; + auto peers = universe_->getPeers(); + std::transform(peers.begin(), peers.end(), std::back_inserter(result), [](const ftl::net::PeerPtr &ptr) { + return std::make_shared<ftl::protocol::Node>(ptr); + }); + return result; +} + ftl::Handle Self::onConnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)> &cb) { return universe_->onConnect([cb](const ftl::net::PeerPtr &p) { return cb(std::make_shared<ftl::protocol::Node>(p)); diff --git a/src/universe.cpp b/src/universe.cpp index 99fc3f3ce9c0b5aa4f43ef6145709575b0940c05..d7207fbf95af7af03196b0acd8c0d290d1e57d60 100644 --- a/src/universe.cpp +++ b/src/universe.cpp @@ -419,6 +419,13 @@ PeerPtr Universe::getWebService() const { return (it != peers_.end()) ? *it : nullptr; } +std::list<PeerPtr> Universe::getPeers() const { + SHARED_LOCK(net_mutex_,lk); + std::list<PeerPtr> result; + std::copy_if(peers_.begin(), peers_.end(), std::back_inserter(result), [](const PeerPtr &ptr){ return !!ptr; }); + return result; +} + void Universe::_periodic() { LOG(INFO) << "PERIODIC " << reconnects_.size(); auto i = reconnects_.begin(); @@ -571,14 +578,14 @@ void Universe::_run() { const auto &fdstruct = impl_->pollfds[impl_->idMap[sock]]; - /*if (fdstruct.revents & POLLERR) { + if (fdstruct.revents & POLLERR) { if (s->socketError()) { //lk.unlock(); s->close(); //lk.lock(); continue; // No point in reading data... } - }*/ + } //If message received from this client then deal with it if (fdstruct.revents & POLLIN) { //lk.unlock(); diff --git a/src/universe.hpp b/src/universe.hpp index f486b89de646805deb280ea077987e2b318f2ad1..a245b8fab043d800abcc82be36e80b310b2e6e36 100644 --- a/src/universe.hpp +++ b/src/universe.hpp @@ -106,6 +106,8 @@ public: /** get webservice peer pointer, returns nullptr if not connected to webservice */ PeerPtr getWebService() const; + std::list<PeerPtr> getPeers() const; + /** * Bind a function to an RPC or service call name. This will implicitely * be called by any peer making the request. diff --git a/test/net_integration.cpp b/test/net_integration.cpp index b96b538c91ec0fe452a44e371a3b011f045a809b..097f2def844165e98c38a0a32e057efd5cdb7dbe 100644 --- a/test/net_integration.cpp +++ b/test/net_integration.cpp @@ -63,8 +63,8 @@ TEST_CASE("Listen and Connect", "[net]") { } REQUIRE(throws); } - - /*SECTION("automatic reconnect, after clean disconnect") { + + SECTION("automatic reconnect from originating connection") { std::mutex mtx; std::condition_variable cv; std::unique_lock<std::mutex> lk(mtx); @@ -73,24 +73,15 @@ TEST_CASE("Listen and Connect", "[net]") { auto p_connecting = ftl::connectNode(uri); REQUIRE(p_connecting); - - bool disconnected_once = false; - auto h = self->onConnect([&](const std::shared_ptr<ftl::protocol::Node> &p_listening) { - if (!disconnected_once) { - // remote closes on first connection - disconnected_once = true; - p_listening->close(true); - cv.notify_one(); - } - return true; - }); + REQUIRE(p_connecting->waitConnection(5)); + p_connecting->close(true); - REQUIRE(cv.wait_for(lk, std::chrono::seconds(5)) == std::cv_status::no_timeout); + REQUIRE(p_connecting->status() != ftl::protocol::NodeStatus::kConnected); REQUIRE(p_connecting->waitConnection(5)); - }*/ + } - SECTION("automatic reconnect from originating connection") { + SECTION("automatic reconnect from remote termination") { std::mutex mtx; std::condition_variable cv; std::unique_lock<std::mutex> lk(mtx); @@ -101,10 +92,16 @@ TEST_CASE("Listen and Connect", "[net]") { REQUIRE(p_connecting); REQUIRE(p_connecting->waitConnection(5)); - p_connecting->close(true); + REQUIRE(p_connecting->connectionCount() == 1); + + auto nodes = self->getNodes(); + REQUIRE( nodes.size() == 1 ); + for (auto &node : nodes) { + node->close(); + } - REQUIRE(p_connecting->status() != ftl::protocol::NodeStatus::kConnected); - REQUIRE(p_connecting->waitConnection(5)); + bool r = try_for(500, [p_connecting]{ return p_connecting->connectionCount() == 2; }); + REQUIRE( r ); } ftl::protocol::reset(); @@ -125,7 +122,7 @@ TEST_CASE("Self::onConnect()", "[net]") { return true; }); - REQUIRE( ftl::connectNode(uri)->waitConnection(5) ); + REQUIRE( ftl::connectNode(uri) ); bool result = try_for(20, [&done]{ return done; }); REQUIRE( result );