/** * @file universe.cpp * @copyright Copyright (c) 2020 University of Turku, MIT License * @author Nicolas Pope */ #include <chrono> #include <utility> #include <algorithm> #include <memory> #include <unordered_map> #include "universe.hpp" #include "socketImpl.hpp" #define LOGURU_REPLACE_GLOG 1 #include <ftl/lib/loguru.hpp> #include <nlohmann/json.hpp> #include "protocol/connection.hpp" #include "protocol/tcp.hpp" #ifdef WIN32 #include <winsock2.h> #include <Ws2tcpip.h> #endif #ifndef WIN32 #include <signal.h> #include <poll.h> #endif using std::string; using std::vector; using std::thread; using ftl::net::Peer; using ftl::net::PeerPtr; using ftl::net::Universe; using nlohmann::json; using ftl::UUID; using std::optional; using ftl::net::Callback; using ftl::net::internal::socket_t; using ftl::protocol::NodeStatus; using ftl::protocol::NodeType; using ftl::net::internal::SocketServer; using ftl::net::internal::Server_TCP; using std::chrono::milliseconds; constexpr int kDefaultMaxConnections = 10; namespace ftl { namespace net { std::unique_ptr<SocketServer> create_listener(const ftl::URI &uri) { if (uri.getProtocol() == ftl::URI::scheme_t::SCHEME_TCP) { return std::make_unique<Server_TCP>(uri.getHost(), uri.getPort()); } if (uri.getProtocol() == ftl::URI::scheme_t::SCHEME_WS) { throw FTL_Error("WebSocket listener not implemented"); } return nullptr; } struct NetImplDetail { std::vector<pollfd> pollfds; std::unordered_map<int, size_t> idMap; }; } // namespace net } // namespace ftl // TODO(Seb): move to ServerSocket and ClientSocket // Defaults, should be changed in config #define TCP_SEND_BUFFER_SIZE (1024*1024) #define TCP_RECEIVE_BUFFER_SIZE (1024*1024) // Perhaps try 24K? #define WS_SEND_BUFFER_SIZE (1024*1024) #define WS_RECEIVE_BUFFER_SIZE (62*1024) std::shared_ptr<Universe> Universe::instance_ = nullptr; Universe::Universe() : active_(true), this_peer(ftl::protocol::id), impl_(new ftl::net::NetImplDetail()), peers_(kDefaultMaxConnections), phase_(0), periodic_time_(1.0), reconnect_attempts_(5), thread_(Universe::__start, this) { _installBindings(); } Universe::~Universe() { shutdown(); CHECK_EQ(peer_instances_, 0); } void Universe::setMaxConnections(size_t m) { UNIQUE_LOCK(net_mutex_, lk); peers_.resize(m); } size_t Universe::getSendBufferSize(ftl::URI::scheme_t s) { // TODO(Nick): Allow these to be configured again. switch (s) { case ftl::URI::scheme_t::SCHEME_WS: case ftl::URI::scheme_t::SCHEME_WSS: return WS_SEND_BUFFER_SIZE; default: return TCP_SEND_BUFFER_SIZE; } } size_t Universe::getRecvBufferSize(ftl::URI::scheme_t s) { switch (s) { case ftl::URI::scheme_t::SCHEME_WS: case ftl::URI::scheme_t::SCHEME_WSS: return WS_RECEIVE_BUFFER_SIZE; default: return TCP_RECEIVE_BUFFER_SIZE; } } void Universe::start() { /*auto l = get<json_t>("listen"); if (l && (*l).is_array()) { for (auto &ll : *l) { listen(ftl::URI(ll)); } } else if (l && (*l).is_string()) { listen(ftl::URI((*l).get<string>())); } auto p = get<json_t>("peers"); if (p && (*p).is_array()) { for (auto &pp : *p) { try { connect(pp); } catch (const ftl::exception &ex) { LOG(ERROR) << "Could not connect to: " << std::string(pp); } } }*/ } void Universe::shutdown() { if (!active_) return; DLOG(1) << "Cleanup Network ..."; { SHARED_LOCK(net_mutex_, lk); for (auto &l : listeners_) { l->close(); } for (auto &s : peers_) { if (s) s->rawClose(); } } active_ = false; thread_.join(); // FIXME: This shouldn't be needed if (peer_instances_ > 0 && ftl::pool.size() > 0) { DLOG(WARNING) << "Waiting on peer destruction... " << peer_instances_; std::this_thread::sleep_for(std::chrono::milliseconds(2)); if (peer_instances_ > 0) LOG(FATAL) << "Peers not destroyed"; } } bool Universe::listen(const ftl::URI &addr) { try { auto l = create_listener(addr); l->bind(); { UNIQUE_LOCK(net_mutex_, lk); DLOG(1) << "listening on " << l->uri().to_string(); listeners_.push_back(std::move(l)); } socket_cv_.notify_one(); return true; } catch (const std::exception &ex) { DLOG(INFO) << "Can't listen " << addr.to_string() << ", " << ex.what(); _notifyError(nullptr, ftl::protocol::Error::kListen, ex.what()); return false; } } std::vector<ftl::URI> Universe::getListeningURIs() { SHARED_LOCK(net_mutex_, lk); std::vector<ftl::URI> uris(listeners_.size()); std::transform(listeners_.begin(), listeners_.end(), uris.begin(), [](const auto &l){ return l->uri(); }); return uris; } bool Universe::isConnected(const ftl::URI &uri) { SHARED_LOCK(net_mutex_, lk); return (peer_by_uri_.find(uri.getBaseURI()) != peer_by_uri_.end()); } bool Universe::isConnected(const std::string &s) { ftl::URI uri(s); return isConnected(uri); } void Universe::_insertPeer(const PeerPtr &ptr) { UNIQUE_LOCK(net_mutex_, lk); for (size_t i = 0; i < peers_.size(); ++i) { if (!peers_[i]) { ++connection_count_; peers_[i] = ptr; peer_by_uri_[ptr->getURIObject().getBaseURI()] = i; peer_ids_[ptr->id()] = i; ptr->local_id_ = i; lk.unlock(); socket_cv_.notify_one(); return; } } throw FTL_Error("Too many connections"); } PeerPtr Universe::connect(const ftl::URI &u) { // Check if already connected or if self (when could this happen?) { SHARED_LOCK(net_mutex_, lk); if (peer_by_uri_.find(u.getBaseURI()) != peer_by_uri_.end()) { return peers_[peer_by_uri_.at(u.getBaseURI())]; } if (u.getHost() == "localhost" || u.getHost() == "127.0.0.1") { if (std::any_of( listeners_.begin(), listeners_.end(), [u](const auto &l) { return l->port() == u.getPort(); })) { throw FTL_Error("Cannot connect to self"); } } } auto p = std::make_shared<Peer>(u, this, &disp_); _insertPeer(p); _installBindings(p); p->start(); return p; } PeerPtr Universe::connect(const std::string& addr) { return connect(ftl::URI(addr)); } void Universe::unbind(const std::string &name) { UNIQUE_LOCK(net_mutex_, lk); disp_.unbind(name); } int Universe::waitConnections(int seconds) { SHARED_LOCK(net_mutex_, lk); auto peers = peers_; lk.unlock(); return std::count_if(peers.begin(), peers.end(), [seconds](const auto &p) { return p && p->waitConnection(seconds); }); } void Universe::_setDescriptors() { SHARED_LOCK(net_mutex_, lk); impl_->pollfds.clear(); impl_->idMap.clear(); // Set file descriptor for the listening sockets. for (auto &l : listeners_) { if (l) { auto sock = l->fd(); if (sock != INVALID_SOCKET) { pollfd fdentry; #ifdef WIN32 fdentry.events = POLLIN; #else fdentry.events = POLLIN; // | POLLERR; #endif fdentry.fd = sock; fdentry.revents = 0; impl_->pollfds.push_back(fdentry); impl_->idMap[sock] = impl_->pollfds.size() - 1; } } } // Set the file descriptors for each client for (const auto &s : peers_) { if (s && s->isValid()) { auto sock = s->_socket(); if (sock != INVALID_SOCKET) { pollfd fdentry; #ifdef WIN32 fdentry.events = POLLIN; #else fdentry.events = POLLIN; // | POLLERR; #endif fdentry.fd = sock; fdentry.revents = 0; impl_->pollfds.push_back(fdentry); impl_->idMap[sock] = impl_->pollfds.size() - 1; } } } } void Universe::_installBindings(const PeerPtr &p) {} void Universe::_installBindings() {} void Universe::_removePeer(PeerPtr &p) { UNIQUE_LOCK(net_mutex_, ulk); if (p && (!p->isValid() || p->status() == NodeStatus::kReconnecting || p->status() == NodeStatus::kDisconnected)) { auto ix = peer_ids_.find(p->id()); if (ix != peer_ids_.end()) peer_ids_.erase(ix); for (auto j=peer_by_uri_.begin(); j != peer_by_uri_.end(); ++j) { if (peers_[j->second] == p) { peer_by_uri_.erase(j); break; } } if (p->status() == NodeStatus::kReconnecting) { reconnects_.push_back({reconnect_attempts_, 1.0f, p}); } else { garbage_.push_back(p); } --connection_count_; DLOG(1) << "Removing disconnected peer: " << p->id().to_string(); on_disconnect_.triggerAsync(p); p.reset(); } } void Universe::_cleanupPeers() { SHARED_LOCK(net_mutex_, lk); auto i = peers_.begin(); while (i != peers_.end()) { auto &p = *i; if (p && (!p->isValid() || p->status() == NodeStatus::kReconnecting || p->status() == NodeStatus::kDisconnected)) { lk.unlock(); _removePeer(p); lk.lock(); } ++i; } } PeerPtr Universe::getPeer(const UUID &id) const { SHARED_LOCK(net_mutex_, lk); auto ix = peer_ids_.find(id); if (ix == peer_ids_.end()) return nullptr; else return peers_[ix->second]; } PeerPtr Universe::getWebService() const { SHARED_LOCK(net_mutex_, lk); auto it = std::find_if(peers_.begin(), peers_.end(), [](const auto &p) { return p && p->getType() == NodeType::kWebService; }); return (it != peers_.end()) ? *it : nullptr; } std::list<PeerPtr> Universe::getPeers() const { SHARED_LOCK(net_mutex_, lk); std::list<PeerPtr> result; std::copy_if(peers_.begin(), peers_.end(), std::back_inserter(result), [](const PeerPtr &ptr){ return !!ptr; }); return result; } void Universe::_periodic() { auto i = reconnects_.begin(); while (i != reconnects_.end()) { std::string addr = i->peer->getURI(); { SHARED_LOCK(net_mutex_, lk); ftl::URI u(addr); bool removed = false; if (u.getHost() == "localhost" || u.getHost() == "127.0.0.1") { for (const auto &l : listeners_) { if (l->port() == u.getPort()) { _notifyError(nullptr, ftl::protocol::Error::kSelfConnect, "Cannot connect to self"); garbage_.push_back((*i).peer); i = reconnects_.erase(i); removed = true; break; } } } if (removed) continue; } auto peer = i->peer; _insertPeer(peer); peer->status_ = NodeStatus::kConnecting; i = reconnects_.erase(i); // ftl::pool.push([peer](int id) { peer->reconnect(); // }); /*if ((*i).peer->reconnect()) { _insertPeer((*i).peer); i = reconnects_.erase(i); } else if ((*i).tries > 0) { (*i).tries--; i++; } else { garbage_.push_back((*i).peer); i = reconnects_.erase(i); }*/ } // Garbage peers may not be needed any more if (garbage_.size() > 0) { UNIQUE_LOCK(net_mutex_, lk); // Only do garbage if processing is idle. if (ftl::pool.n_idle() == ftl::pool.size()) { if (garbage_.size() > 0) DLOG(1) << "Garbage collection"; while (garbage_.size() > 0) { garbage_.front().reset(); garbage_.pop_front(); } } } } void Universe::__start(Universe *u) { #ifndef WIN32 // TODO(Seb): move somewhere else (common initialization file?) signal(SIGPIPE, SIG_IGN); #endif // WIN32 u->_run(); } void Universe::_run() { auto start = std::chrono::high_resolution_clock::now(); while (active_) { _setDescriptors(); int selres = 1; _cleanupPeers(); // Do periodics auto now = std::chrono::high_resolution_clock::now(); std::chrono::duration<double> elapsed = now - start; if (elapsed.count() >= periodic_time_) { start = now; _periodic(); } // It is an error to use "select" with no sockets ... so just sleep if (impl_->pollfds.size() == 0) { std::shared_lock lk(net_mutex_); socket_cv_.wait_for( lk, milliseconds(100), [this](){ return listeners_.size() > 0 || connection_count_ > 0; }); continue; } #ifdef WIN32 selres = WSAPoll(impl_->pollfds.data(), impl_->pollfds.size(), 100); #else selres = poll(impl_->pollfds.data(), impl_->pollfds.size(), 100); #endif // Some kind of error occured, it is usually possible to recover from this. if (selres < 0) { #ifdef WIN32 int errNum = WSAGetLastError(); switch (errNum) { case WSAENOTSOCK : continue; // Socket was closed default : DLOG(WARNING) << "Unhandled poll error: " << errNum; } #else switch (errno) { case 9 : continue; // Bad file descriptor = socket closed case 4 : continue; // Interrupted system call ... no problem default : DLOG(WARNING) << "Unhandled poll error: " << strerror(errno) << "(" << errno << ")"; } #endif continue; } else if (selres == 0) { // Timeout, nothing to do... continue; } SHARED_LOCK(net_mutex_, lk); // If connection request is waiting for (auto &l : listeners_) { if (l && l->is_listening() && (impl_->pollfds[impl_->idMap[l->fd()]].revents & POLLIN)) { std::unique_ptr<ftl::net::internal::SocketConnection> csock; try { csock = l->accept(); } catch (const std::exception &ex) { _notifyError(nullptr, ftl::protocol::Error::kConnectionFailed, ex.what()); } lk.unlock(); if (csock) { auto p = std::make_shared<Peer>(std::move(csock), this, &disp_); _insertPeer(p); p->start(); } lk.lock(); } } // Also check each clients socket to see if any messages or errors are waiting for (size_t p = 0; p < peers_.size(); ++p) { auto s = peers_[(p+phase_)%peers_.size()]; if (s && s->isValid()) { // Note: It is possible that the socket becomes invalid after check but before // looking at the FD sets, therefore cache the original socket SOCKET sock = s->_socket(); if (sock == INVALID_SOCKET) continue; if (impl_->idMap.count(sock) == 0) continue; const auto &fdstruct = impl_->pollfds[impl_->idMap[sock]]; // This is needed on Windows to detect socket close. if (fdstruct.revents & POLLERR) { if (s->socketError()) { continue; // No point in reading data... } } // If message received from this client then deal with it if (fdstruct.revents & POLLIN) { lk.unlock(); s->data(); lk.lock(); } } } ++phase_; } // Garbage is a threadsafe container, moving there first allows the destructor to be called // without the lock. { UNIQUE_LOCK(net_mutex_, lk); garbage_.insert(garbage_.end(), peers_.begin(), peers_.end()); reconnects_.clear(); peers_.clear(); peer_by_uri_.clear(); peer_ids_.clear(); listeners_.clear(); } garbage_.clear(); } ftl::Handle Universe::onConnect(const std::function<bool(const PeerPtr&)> &cb) { return on_connect_.on(cb); } ftl::Handle Universe::onDisconnect(const std::function<bool(const PeerPtr&)> &cb) { return on_disconnect_.on(cb); } ftl::Handle Universe::onError( const std::function<bool(const PeerPtr&, ftl::protocol::Error, const std::string &)> &cb) { return on_error_.on(cb); } PeerPtr Universe::injectFakePeer(std::unique_ptr<ftl::net::internal::SocketConnection> s) { auto p = std::make_shared<Peer>(std::move(s), this, &disp_); _insertPeer(p); _installBindings(p); return p; } PeerPtr Universe::_findPeer(const Peer *p) { SHARED_LOCK(net_mutex_, lk); for (const auto &pp : peers_) { if (pp.get() == p) return pp; } return nullptr; } void Universe::_notifyConnect(Peer *p) { const auto ptr = _findPeer(p); // The peer could have been removed from valid peers already. if (!ptr) return; { UNIQUE_LOCK(net_mutex_, lk); peer_ids_[ptr->id()] = ptr->local_id_; } on_connect_.triggerAsync(ptr); } void Universe::_notifyDisconnect(Peer *p) { const auto ptr = _findPeer(p); if (!ptr) return; on_disconnect_.triggerAsync(ptr); } void Universe::_notifyError(Peer *p, ftl::protocol::Error e, const std::string &errstr) { LOG(ERROR) << "Net Error (" << int(e) << "): " << errstr; const auto ptr = (p) ? _findPeer(p) : nullptr; on_error_.triggerAsync(ptr, e, errstr); }