From e43102c8deedad10a3f6dd2237e7cf3272c68205 Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nicolas.pope@utu.fi> Date: Sat, 7 May 2022 20:08:54 +0100 Subject: [PATCH] Change and improve locking in net --- include/ftl/uri.hpp | 5 +- src/peer.cpp | 15 +++++- src/universe.cpp | 129 +++++++++++++++++++++++--------------------- 3 files changed, 83 insertions(+), 66 deletions(-) diff --git a/include/ftl/uri.hpp b/include/ftl/uri.hpp index 159c600..4d8b7e7 100644 --- a/include/ftl/uri.hpp +++ b/include/ftl/uri.hpp @@ -21,6 +21,7 @@ namespace ftl { */ class URI { public: + URI(): m_valid(false) {} explicit URI(uri_t puri); explicit URI(const std::string &puri); explicit URI(const URI &c); @@ -92,8 +93,8 @@ namespace ftl { std::string m_base; std::string m_userinfo; std::vector<std::string> m_pathseg; - int m_port; - scheme_t m_proto; + int m_port = 0; + scheme_t m_proto = scheme_t::SCHEME_NONE; std::string m_protostr; // std::string m_query; std::map<std::string, std::string> m_qmap; diff --git a/src/peer.cpp b/src/peer.cpp index 91c2375..549b2d9 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -329,7 +329,11 @@ void Peer::data() { ++job_count_; ftl::pool.push([this](int id) { - _data(); + try { + _data(); + } catch (const std::exception &e) { + LOG(ERROR) << "Error processing packet: " << e.what(); + } --job_count_; }); } @@ -377,7 +381,14 @@ bool Peer::_data() { // more data: repeat (loop) ++job_count_; - ftl::pool.push([this](int id) { _data(); --job_count_; }); + ftl::pool.push([this](int id) { + try { + _data(); + } catch (const std::exception &e) { + LOG(ERROR) << "Error processing packet: " << e.what(); + } + --job_count_; + }); if (status_ == NodeStatus::kConnecting) { // If not connected, must lock to make sure no other thread performs this step diff --git a/src/universe.cpp b/src/universe.cpp index f21d47d..7150571 100644 --- a/src/universe.cpp +++ b/src/universe.cpp @@ -158,16 +158,16 @@ void Universe::shutdown() { if (!active_) return; LOG(INFO) << "Cleanup Network ..."; - active_ = false; - thread_.join(); - - UNIQUE_LOCK(net_mutex_, lk); + { + UNIQUE_LOCK(net_mutex_, lk); - for (auto &s : peers_) { - if (s) s->rawClose(); + for (auto &s : peers_) { + if (s) s->rawClose(); + } } - - peers_.clear(); + + active_ = false; + thread_.join(); for (auto &l : listeners_) { l->close(); @@ -193,16 +193,14 @@ bool Universe::listen(const ftl::URI &addr) { } std::vector<ftl::URI> Universe::getListeningURIs() { - UNIQUE_LOCK(net_mutex_, lk); - std::vector<ftl::URI> uris; - for (auto& l : listeners_) { - uris.push_back(l->uri()); - } + SHARED_LOCK(net_mutex_, lk); + std::vector<ftl::URI> uris(listeners_.size()); + std::transform(listeners_.begin(), listeners_.end(), uris.begin(), [](const auto &l){ return l->uri(); }); return uris; } bool Universe::isConnected(const ftl::URI &uri) { - UNIQUE_LOCK(net_mutex_,lk); + SHARED_LOCK(net_mutex_,lk); return (peer_by_uri_.find(uri.getBaseURI()) != peer_by_uri_.end()); } @@ -215,18 +213,16 @@ std::shared_ptr<Peer> Universe::connect(const ftl::URI &u) { // Check if already connected or if self (when could this happen?) { - UNIQUE_LOCK(net_mutex_,lk); + SHARED_LOCK(net_mutex_,lk); if (peer_by_uri_.find(u.getBaseURI()) != peer_by_uri_.end()) { return peers_[peer_by_uri_.at(u.getBaseURI())]; } - //if (u.getHost() == "localhost" || u.getHost() == "127.0.0.1") { - //for (const auto &l : listeners_) { - //if (l->port() == u.getPort()) { - // throw FTL_Error("Cannot connect to self"); - //} // TODO extend api - //} - //} + if (u.getHost() == "localhost" || u.getHost() == "127.0.0.1") { + if (std::any_of(listeners_.begin(), listeners_.end(), [u](const auto &l) { return l->port() == u.getPort(); })) { + throw FTL_Error("Cannot connect to self"); + } + } } auto p = std::make_shared<Peer>(u, this, &disp_); @@ -455,56 +451,65 @@ void Universe::_run() { continue; } - { - // TODO:(Nick) Shared lock unless connection is made - UNIQUE_LOCK(net_mutex_,lk); - - //If connection request is waiting - for (auto &l : listeners_) { - if (l && l->is_listening()) { - if (FD_ISSET(l->fd(), &(impl_->sfdread_))) { - - try { - auto csock = l->accept(); - auto p = std::make_shared<Peer>(std::move(csock), this, &disp_); - peers_.push_back(p); - - } catch (const std::exception &ex) { - LOG(ERROR) << "Connection failed: " << ex.what(); - } + SHARED_LOCK(net_mutex_,lk); + + //If connection request is waiting + for (auto &l : listeners_) { + if (l && l->is_listening()) { + if (FD_ISSET(l->fd(), &(impl_->sfdread_))) { + lk.unlock(); + try { + UNIQUE_LOCK(net_mutex_,ulk); + auto csock = l->accept(); + auto p = std::make_shared<Peer>(std::move(csock), this, &disp_); + peers_.push_back(p); + + } catch (const std::exception &ex) { + LOG(ERROR) << "Connection failed: " << ex.what(); } + lk.lock(); } } } - { - SHARED_LOCK(net_mutex_, lk); - - // Also check each clients socket to see if any messages or errors are waiting - for (size_t p=0; p<peers_.size(); ++p) { - auto s = peers_[(p+phase_)%peers_.size()]; - - if (s != NULL && s->isValid()) { - // Note: It is possible that the socket becomes invalid after check but before - // looking at the FD sets, therefore cache the original socket - SOCKET sock = s->_socket(); - if (sock == INVALID_SOCKET) continue; - - if (FD_ISSET(sock, &impl_->sfderror_)) { - if (s->socketError()) { - s->close(); - continue; // No point in reading data... - } - } - //If message received from this client then deal with it - if (FD_ISSET(sock, &impl_->sfdread_)) { - s->data(); + + // Also check each clients socket to see if any messages or errors are waiting + for (size_t p=0; p<peers_.size(); ++p) { + auto s = peers_[(p+phase_)%peers_.size()]; + + if (s != NULL && s->isValid()) { + // Note: It is possible that the socket becomes invalid after check but before + // looking at the FD sets, therefore cache the original socket + SOCKET sock = s->_socket(); + if (sock == INVALID_SOCKET) continue; + + if (FD_ISSET(sock, &impl_->sfderror_)) { + if (s->socketError()) { + s->close(); + continue; // No point in reading data... } } + //If message received from this client then deal with it + if (FD_ISSET(sock, &impl_->sfdread_)) { + s->data(); + } } - ++phase_; } + ++phase_; + } + + // Garbage is a threadsafe container, moving there first allows the destructor to be called + // without the lock. + { + UNIQUE_LOCK(net_mutex_,lk); + garbage_.insert(garbage_.end(), peers_.begin(), peers_.end()); + reconnects_.clear(); + peers_.clear(); + peer_by_uri_.clear(); + peer_ids_.clear(); } + + garbage_.clear(); } ftl::Handle Universe::onConnect(const std::function<bool(const std::shared_ptr<Peer>&)> &cb) { -- GitLab