From 2f2120f8b679c60c8b59b77f64532494e60283a7 Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nwpope@utu.fi> Date: Thu, 6 Jun 2019 10:14:41 +0300 Subject: [PATCH] Net disconnect notifications --- components/net/cpp/include/ftl/net/peer.hpp | 20 ++- components/net/cpp/src/peer.cpp | 139 ++++---------------- components/net/cpp/src/universe.cpp | 2 +- 3 files changed, 41 insertions(+), 120 deletions(-) diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp index 6d28d3b84..cffb6362d 100644 --- a/components/net/cpp/include/ftl/net/peer.hpp +++ b/components/net/cpp/include/ftl/net/peer.hpp @@ -74,7 +74,10 @@ class Peer { /** * Close the peer if open. Setting retry parameter to true will initiate - * backoff retry attempts. + * backoff retry attempts. This is used to deliberately close a connection + * and not for error conditions where different close semantics apply. + * + * @param retry Should reconnection be attempted? */ void close(bool retry=false); @@ -83,10 +86,19 @@ class Peer { }; /** - * Block until the connection and handshake has completed. + * Block until the connection and handshake has completed. You should use + * onConnect callbacks instead of blocking, mostly this is intended for + * the unit tests to keep them synchronous. + * + * @return True if all connections were successful, false if timeout or error. */ bool waitConnection(); + /** + * Test if the connection is valid. This returns true in all conditions + * except where the socket has been disconnected permenantly or was never + * able to connect, perhaps due to an invalid address. + */ bool isValid() const { return status_ != kInvalid && sock_ != INVALID_SOCKET; }; @@ -116,6 +128,8 @@ class Peer { /** * Non-blocking Remote Procedure Call using a callback function. + * + * @return A call id for use with cancelCall() if needed. */ template <typename T, typename... ARGS> int asyncCall(const std::string &name, @@ -159,6 +173,8 @@ class Peer { void error(int e); bool _data(); + + void _badClose(bool retry=true); void _dispatchResponse(uint32_t id, msgpack::object &obj); void _sendResponse(uint32_t id, const msgpack::object &obj); diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index bb29691e5..ed732f1b9 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -151,7 +151,7 @@ Peer::Peer(SOCKET s, Dispatcher *d) : sock_(s) { bind("__handshake__", [this](uint64_t magic, uint32_t version, UUID pid) { LOG(INFO) << "Handshake 2 received"; if (magic != ftl::net::kMagic) { - close(); + _badClose(false); LOG(ERROR) << "Invalid magic during handshake"; } else { status_ = kConnected; @@ -163,6 +163,10 @@ Peer::Peer(SOCKET s, Dispatcher *d) : sock_(s) { } }); + bind("__disconnect__", [this]() { + _badClose(false); + }); + send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer); } } @@ -186,7 +190,7 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { if (sock_ != INVALID_SOCKET) { if (!ws_connect(sock_, uri)) { LOG(ERROR) << "Websocket connection failed"; - close(); + _badClose(false); } } else { LOG(ERROR) << "Connection refused to " << uri.getHost() << ":" << uri.getPort(); @@ -204,7 +208,7 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { bind("__handshake__", [this](uint64_t magic, uint32_t version, UUID pid) { LOG(INFO) << "Handshake 1 received"; if (magic != ftl::net::kMagic) { - close(); + _badClose(false); LOG(ERROR) << "Invalid magic during handshake"; } else { status_ = kConnected; @@ -247,6 +251,16 @@ void Peer::_updateURI() { } void Peer::close(bool retry) { + if (sock_ != INVALID_SOCKET) { + + // Attempt to inform about disconnect + send("__disconnect__"); + + _badClose(retry); + } +} + +void Peer::_badClose(bool retry) { if (sock_ != INVALID_SOCKET) { #ifndef WIN32 ::close(sock_); @@ -257,6 +271,7 @@ void Peer::close(bool retry) { status_ = kDisconnected; // Attempt auto reconnect? + if (retry) LOG(INFO) << "Should attempt reconnect..."; //auto i = find(sockets.begin(),sockets.end(),this); //sockets.erase(i); @@ -330,12 +345,12 @@ bool Peer::_data() { obj.convert(hs); if (get<1>(hs) != "__handshake__") { - close(); + _badClose(false); LOG(ERROR) << "Missing handshake"; return false; } } catch(...) { - close(); + _badClose(false); LOG(ERROR) << "Bad first message format"; return false; } @@ -345,116 +360,6 @@ bool Peer::_data() { return false; } -/*bool Socket::data() { - //Read data from socket - size_t n = 0; - int c = 0; - uint32_t len = 0; - - if (pos_ < 4) { - n = 4 - pos_; - } else { - len = *(int*)buffer_; - n = len+4-pos_; - } - - while (pos_ < len+4) { - if (len > MAX_MESSAGE) { - close(); - LOG(ERROR) << "Socket: " << uri_ << " - message attack"; - return false; - } - - const int rc = ftl::net::internal::recv(sock_, buffer_+pos_, n, 0); - - if (rc > 0) { - pos_ += static_cast<size_t>(rc); - - if (pos_ < 4) { - n = 4 - pos_; - } else { - len = *(int*)buffer_; - n = len+4-pos_; - } - } else if (rc == EWOULDBLOCK || rc == 0) { - // Data not yet available - if (c == 0) { - LOG(INFO) << "Socket disconnected " << uri_; - close(); - } - return false; - } else { - LOG(ERROR) << "Socket: " << uri_ << " - error " << rc; - close(); - return false; - } - c++; - } - - // Route the message... - uint32_t service = ((uint32_t*)buffer_)[1]; - auto d = std::string(buffer_+8, len-4); - - pos_ = 0; // DODGY, processing messages inside handlers is dangerous. - gpos_ = 0; - - if (service == FTL_PROTOCOL_HS1 && !connected_) { - handshake1(); - } else if (service == FTL_PROTOCOL_HS2 && !connected_) { - handshake2(); - } else if (service == FTL_PROTOCOL_RPC) { - if (proto_) proto_->dispatchRPC(*this, d); - else LOG(WARNING) << "No protocol set for socket " << uri_; - } else if (service == FTL_PROTOCOL_RPCRETURN) { - _dispatchReturn(d); - } else { - if (proto_) proto_->dispatchRaw(service, *this); - else LOG(WARNING) << "No protocol set for socket " << uri_; - } - - return true; -}*/ - -/*int Socket::read(char *b, size_t count) { - if (count > size()) LOG(WARNING) << "Reading too much data for service " << header_->service; - count = (count > size() || count==0) ? size() : count; - // TODO, utilise recv directly here... - memcpy(b,data_+gpos_,count); - gpos_+=count; - return count; -} - -int Socket::read(std::string &s, size_t count) { - count = (count > size() || count==0) ? size() : count; - s = std::string(data_+gpos_,count); - return count; -} - -void Socket::handshake1() { - Handshake header; - read(header); - - std::string peer; - if (header.name_size > 0) read(peer,header.name_size); - - std::string protouri; - if (header.proto_size > 0) read(protouri,header.proto_size); - - if (protouri.size() > 0) { - remote_proto_ = protouri; - // TODO Validate protocols with local protocol? - } - - send(FTL_PROTOCOL_HS2); // TODO Counterpart protocol. - LOG(INFO) << "Handshake (" << protouri << ") confirmed from " << uri_; - _connected(); -} - -void Socket::handshake2() { - LOG(INFO) << "Handshake finalised for " << uri_; - _connected(); -}*/ - void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) { // TODO Handle error reporting... @@ -548,7 +453,7 @@ int Peer::_send() { // We are blocking, so -1 should mean actual error if (c == -1) { socketError(); - close(); + _badClose(); } return c; @@ -557,7 +462,7 @@ int Peer::_send() { Peer::~Peer() { std::unique_lock<std::mutex> lk1(send_mtx_); std::unique_lock<std::mutex> lk2(recv_mtx_); - close(); + _badClose(false); delete disp_; } diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index 578570083..2b28a36b0 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -142,7 +142,7 @@ int Universe::_setDescriptors() { } void Universe::_installBindings(Peer *p) { - + } void Universe::_installBindings() { -- GitLab