Skip to content
Snippets Groups Projects
Commit 0bbe1530 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Add the remote close reconnect test

parent 87fe1ada
No related branches found
No related tags found
No related merge requests found
......@@ -107,6 +107,8 @@ class Node {
unsigned int localID();
int connectionCount() const;
protected:
ftl::net::PeerPtr peer_;
};
......
......@@ -69,6 +69,7 @@ class Self {
std::shared_ptr<ftl::protocol::Node> getNode(const ftl::UUID &pid) const;
/** get webservice peer pointer, returns nullptr if not connected to webservice */
std::shared_ptr<ftl::protocol::Node> getWebService() const;
std::list<std::shared_ptr<ftl::protocol::Node>> getNodes() const;
ftl::Handle onConnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)>&);
ftl::Handle onDisconnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)>&);
......
......@@ -59,3 +59,7 @@ void Node::noReconnect() {
unsigned int Node::localID() {
return peer_->localID();
}
int Node::connectionCount() const {
return peer_->connectionCount();
}
......@@ -104,6 +104,7 @@ void Peer::_process_handshake(uint64_t magic, uint32_t version, UUID pid) {
_send_handshake();
}
++connection_count_;
net_->_notifyConnect(this);
}
}
......@@ -160,9 +161,6 @@ Peer::Peer(const ftl::URI& uri, Universe *u, Dispatcher *d) :
}
void Peer::_connect() {
dbg_recv_begin_ctr_ = 0;
dbg_recv_end_ctr_ = 0;
sock_ = ftl::net::internal::createConnection(uri_); // throws on bad uri
_set_socket_options();
sock_->connect(uri_); // throws on error
......@@ -205,7 +203,7 @@ void Peer::rawClose() {
void Peer::close(bool retry) {
// Attempt to inform about disconnect
if (sock_->is_valid() && status_ == NodeStatus::kConnected) { send("__disconnect__"); }
if (!retry && sock_->is_valid() && status_ == NodeStatus::kConnected) { send("__disconnect__"); }
UNIQUE_LOCK(send_mtx_, lk_send);
//UNIQUE_LOCK(recv_mtx_, lk_recv);
......
......@@ -191,6 +191,8 @@ class Peer {
inline void noReconnect() { can_reconnect_ = false; }
inline unsigned int localID() const { return local_id_; }
int connectionCount() const { return connection_count_; }
public:
static const int kMaxMessage = 10*1024*1024; // 10Mb currently
......@@ -258,21 +260,20 @@ private: // Functions
const bool outgoing_;
unsigned int local_id_;
ftl::URI uri_; // Original connection URI, or assumed URI
ftl::UUID peerid_; // Received in handshake or allocated
ftl::protocol::NodeStatus status_; // Connected, errored, reconnecting..
uint32_t version_; // Received protocol version in handshake
bool can_reconnect_; // Client connections can retry
ftl::net::Universe *net_; // Origin net universe
ftl::URI uri_; // Original connection URI, or assumed URI
ftl::UUID peerid_; // Received in handshake or allocated
ftl::protocol::NodeStatus status_; // Connected, errored, reconnecting..
uint32_t version_; // Received protocol version in handshake
bool can_reconnect_; // Client connections can retry
ftl::net::Universe *net_; // Origin net universe
std::unique_ptr<internal::SocketConnection> sock_;
std::unique_ptr<ftl::net::Dispatcher> disp_; // For RPC call dispatch
std::map<int, std::unique_ptr<virtual_caller>> callbacks_;
// debug variables, see comments for data() in peer.cpp for details
std::atomic_uint64_t dbg_recv_begin_ctr_ = 0;
std::atomic_uint64_t dbg_recv_end_ctr_ = 0;
std::atomic_int job_count_ = 0;
std::atomic_int job_count_ = 0; // Ensure threads are done before destructing
std::atomic_int connection_count_ = 0; // Number of successful connections total
std::atomic_int retry_count_ = 0; // Current number of reconnection attempts
// reconnect when clean disconnect received from remote
bool reconnect_on_remote_disconnect_ = true;
......
......@@ -78,6 +78,15 @@ std::shared_ptr<ftl::protocol::Node> Self::getWebService() const {
return std::make_shared<ftl::protocol::Node>(universe_->getWebService());
}
std::list<std::shared_ptr<ftl::protocol::Node>> Self::getNodes() const {
std::list<std::shared_ptr<ftl::protocol::Node>> result;
auto peers = universe_->getPeers();
std::transform(peers.begin(), peers.end(), std::back_inserter(result), [](const ftl::net::PeerPtr &ptr) {
return std::make_shared<ftl::protocol::Node>(ptr);
});
return result;
}
ftl::Handle Self::onConnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)> &cb) {
return universe_->onConnect([cb](const ftl::net::PeerPtr &p) {
return cb(std::make_shared<ftl::protocol::Node>(p));
......
......@@ -419,6 +419,13 @@ PeerPtr Universe::getWebService() const {
return (it != peers_.end()) ? *it : nullptr;
}
std::list<PeerPtr> Universe::getPeers() const {
SHARED_LOCK(net_mutex_,lk);
std::list<PeerPtr> result;
std::copy_if(peers_.begin(), peers_.end(), std::back_inserter(result), [](const PeerPtr &ptr){ return !!ptr; });
return result;
}
void Universe::_periodic() {
LOG(INFO) << "PERIODIC " << reconnects_.size();
auto i = reconnects_.begin();
......@@ -571,14 +578,14 @@ void Universe::_run() {
const auto &fdstruct = impl_->pollfds[impl_->idMap[sock]];
/*if (fdstruct.revents & POLLERR) {
if (fdstruct.revents & POLLERR) {
if (s->socketError()) {
//lk.unlock();
s->close();
//lk.lock();
continue; // No point in reading data...
}
}*/
}
//If message received from this client then deal with it
if (fdstruct.revents & POLLIN) {
//lk.unlock();
......
......@@ -106,6 +106,8 @@ public:
/** get webservice peer pointer, returns nullptr if not connected to webservice */
PeerPtr getWebService() const;
std::list<PeerPtr> getPeers() const;
/**
* Bind a function to an RPC or service call name. This will implicitely
* be called by any peer making the request.
......
......@@ -63,8 +63,8 @@ TEST_CASE("Listen and Connect", "[net]") {
}
REQUIRE(throws);
}
/*SECTION("automatic reconnect, after clean disconnect") {
SECTION("automatic reconnect from originating connection") {
std::mutex mtx;
std::condition_variable cv;
std::unique_lock<std::mutex> lk(mtx);
......@@ -73,24 +73,15 @@ TEST_CASE("Listen and Connect", "[net]") {
auto p_connecting = ftl::connectNode(uri);
REQUIRE(p_connecting);
bool disconnected_once = false;
auto h = self->onConnect([&](const std::shared_ptr<ftl::protocol::Node> &p_listening) {
if (!disconnected_once) {
// remote closes on first connection
disconnected_once = true;
p_listening->close(true);
cv.notify_one();
}
return true;
});
REQUIRE(p_connecting->waitConnection(5));
p_connecting->close(true);
REQUIRE(cv.wait_for(lk, std::chrono::seconds(5)) == std::cv_status::no_timeout);
REQUIRE(p_connecting->status() != ftl::protocol::NodeStatus::kConnected);
REQUIRE(p_connecting->waitConnection(5));
}*/
}
SECTION("automatic reconnect from originating connection") {
SECTION("automatic reconnect from remote termination") {
std::mutex mtx;
std::condition_variable cv;
std::unique_lock<std::mutex> lk(mtx);
......@@ -101,10 +92,16 @@ TEST_CASE("Listen and Connect", "[net]") {
REQUIRE(p_connecting);
REQUIRE(p_connecting->waitConnection(5));
p_connecting->close(true);
REQUIRE(p_connecting->connectionCount() == 1);
auto nodes = self->getNodes();
REQUIRE( nodes.size() == 1 );
for (auto &node : nodes) {
node->close();
}
REQUIRE(p_connecting->status() != ftl::protocol::NodeStatus::kConnected);
REQUIRE(p_connecting->waitConnection(5));
bool r = try_for(500, [p_connecting]{ return p_connecting->connectionCount() == 2; });
REQUIRE( r );
}
ftl::protocol::reset();
......@@ -125,7 +122,7 @@ TEST_CASE("Self::onConnect()", "[net]") {
return true;
});
REQUIRE( ftl::connectNode(uri)->waitConnection(5) );
REQUIRE( ftl::connectNode(uri) );
bool result = try_for(20, [&done]{ return done; });
REQUIRE( result );
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment