Skip to content
Snippets Groups Projects
Commit e43102c8 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Change and improve locking in net

parent 1c72a716
No related branches found
No related tags found
No related merge requests found
......@@ -21,6 +21,7 @@ namespace ftl {
*/
class URI {
public:
URI(): m_valid(false) {}
explicit URI(uri_t puri);
explicit URI(const std::string &puri);
explicit URI(const URI &c);
......@@ -92,8 +93,8 @@ namespace ftl {
std::string m_base;
std::string m_userinfo;
std::vector<std::string> m_pathseg;
int m_port;
scheme_t m_proto;
int m_port = 0;
scheme_t m_proto = scheme_t::SCHEME_NONE;
std::string m_protostr;
// std::string m_query;
std::map<std::string, std::string> m_qmap;
......
......@@ -329,7 +329,11 @@ void Peer::data() {
++job_count_;
ftl::pool.push([this](int id) {
try {
_data();
} catch (const std::exception &e) {
LOG(ERROR) << "Error processing packet: " << e.what();
}
--job_count_;
});
}
......@@ -377,7 +381,14 @@ bool Peer::_data() {
// more data: repeat (loop)
++job_count_;
ftl::pool.push([this](int id) { _data(); --job_count_; });
ftl::pool.push([this](int id) {
try {
_data();
} catch (const std::exception &e) {
LOG(ERROR) << "Error processing packet: " << e.what();
}
--job_count_;
});
if (status_ == NodeStatus::kConnecting) {
// If not connected, must lock to make sure no other thread performs this step
......
......@@ -158,16 +158,16 @@ void Universe::shutdown() {
if (!active_) return;
LOG(INFO) << "Cleanup Network ...";
active_ = false;
thread_.join();
{
UNIQUE_LOCK(net_mutex_, lk);
for (auto &s : peers_) {
if (s) s->rawClose();
}
}
peers_.clear();
active_ = false;
thread_.join();
for (auto &l : listeners_) {
l->close();
......@@ -193,16 +193,14 @@ bool Universe::listen(const ftl::URI &addr) {
}
std::vector<ftl::URI> Universe::getListeningURIs() {
UNIQUE_LOCK(net_mutex_, lk);
std::vector<ftl::URI> uris;
for (auto& l : listeners_) {
uris.push_back(l->uri());
}
SHARED_LOCK(net_mutex_, lk);
std::vector<ftl::URI> uris(listeners_.size());
std::transform(listeners_.begin(), listeners_.end(), uris.begin(), [](const auto &l){ return l->uri(); });
return uris;
}
bool Universe::isConnected(const ftl::URI &uri) {
UNIQUE_LOCK(net_mutex_,lk);
SHARED_LOCK(net_mutex_,lk);
return (peer_by_uri_.find(uri.getBaseURI()) != peer_by_uri_.end());
}
......@@ -215,18 +213,16 @@ std::shared_ptr<Peer> Universe::connect(const ftl::URI &u) {
// Check if already connected or if self (when could this happen?)
{
UNIQUE_LOCK(net_mutex_,lk);
SHARED_LOCK(net_mutex_,lk);
if (peer_by_uri_.find(u.getBaseURI()) != peer_by_uri_.end()) {
return peers_[peer_by_uri_.at(u.getBaseURI())];
}
//if (u.getHost() == "localhost" || u.getHost() == "127.0.0.1") {
//for (const auto &l : listeners_) {
//if (l->port() == u.getPort()) {
// throw FTL_Error("Cannot connect to self");
//} // TODO extend api
//}
//}
if (u.getHost() == "localhost" || u.getHost() == "127.0.0.1") {
if (std::any_of(listeners_.begin(), listeners_.end(), [u](const auto &l) { return l->port() == u.getPort(); })) {
throw FTL_Error("Cannot connect to self");
}
}
}
auto p = std::make_shared<Peer>(u, this, &disp_);
......@@ -455,16 +451,15 @@ void Universe::_run() {
continue;
}
{
// TODO:(Nick) Shared lock unless connection is made
UNIQUE_LOCK(net_mutex_,lk);
SHARED_LOCK(net_mutex_,lk);
//If connection request is waiting
for (auto &l : listeners_) {
if (l && l->is_listening()) {
if (FD_ISSET(l->fd(), &(impl_->sfdread_))) {
lk.unlock();
try {
UNIQUE_LOCK(net_mutex_,ulk);
auto csock = l->accept();
auto p = std::make_shared<Peer>(std::move(csock), this, &disp_);
peers_.push_back(p);
......@@ -472,13 +467,11 @@ void Universe::_run() {
} catch (const std::exception &ex) {
LOG(ERROR) << "Connection failed: " << ex.what();
}
}
lk.lock();
}
}
}
{
SHARED_LOCK(net_mutex_, lk);
// Also check each clients socket to see if any messages or errors are waiting
for (size_t p=0; p<peers_.size(); ++p) {
......@@ -504,7 +497,19 @@ void Universe::_run() {
}
++phase_;
}
// Garbage is a threadsafe container, moving there first allows the destructor to be called
// without the lock.
{
UNIQUE_LOCK(net_mutex_,lk);
garbage_.insert(garbage_.end(), peers_.begin(), peers_.end());
reconnects_.clear();
peers_.clear();
peer_by_uri_.clear();
peer_ids_.clear();
}
garbage_.clear();
}
ftl::Handle Universe::onConnect(const std::function<bool(const std::shared_ptr<Peer>&)> &cb) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment