From 4ef56159f7cdb1248e9b61378b5b64c772ead540 Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nicolas.pope@utu.fi> Date: Mon, 9 May 2022 10:07:18 +0100 Subject: [PATCH] Add recv check --- src/peer.cpp | 7 +++++-- src/peer.hpp | 1 + src/universe.cpp | 12 +++++++++--- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/peer.cpp b/src/peer.cpp index fc5e6c6..2a44aab 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -319,9 +319,8 @@ void Peer::data() { recv_buf_.buffer_consumed(rc); - // FIXME: This might get skipped incorrectly. - // However, it should be extremely unlikely given that recv is done already //UNIQUE_LOCK(recv_mtx_, lk); + recv_checked_.clear(); if (!already_processing_.test_and_set()) { //lk.unlock(); @@ -363,9 +362,13 @@ bool Peer::_data() { msgpack::object_handle msg_handle; try { + recv_checked_.test_and_set(); bool has_next = _has_next() && recv_buf_.next(msg_handle); if (!has_next) { already_processing_.clear(); + if (!recv_checked_.test_and_set() && !already_processing_.test_and_set()) { + return _data(); + } return false; } } catch (const std::exception& ex) { diff --git a/src/peer.hpp b/src/peer.hpp index 72fad36..01c7cd6 100644 --- a/src/peer.hpp +++ b/src/peer.hpp @@ -245,6 +245,7 @@ private: // Functions } std::atomic_flag already_processing_ = ATOMIC_FLAG_INIT; + std::atomic_flag recv_checked_ = ATOMIC_FLAG_INIT; msgpack::unpacker recv_buf_; //MUTEX recv_mtx_; diff --git a/src/universe.cpp b/src/universe.cpp index 3599226..0ce6b5c 100644 --- a/src/universe.cpp +++ b/src/universe.cpp @@ -292,6 +292,7 @@ socket_t Universe::_setDescriptors() { SHARED_LOCK(net_mutex_, lk); impl_->pollfds.clear(); + impl_->idMap.clear(); //Set file descriptor for the listening sockets. for (auto &l : listeners_) { @@ -502,7 +503,8 @@ void Universe::_run() { #ifdef WIN32 int errNum = WSAGetLastError(); switch (errNum) { - default : LOG(WARNING) << "Unhandled poll error: " << errNum; + case WSAENOTSOCK : continue; // Socket was closed + default : LOG(WARNING) << "Unhandled poll error: " << errNum; } #else switch (errno) { @@ -551,7 +553,11 @@ void Universe::_run() { SOCKET sock = s->_socket(); if (sock == INVALID_SOCKET) continue; - if (impl_->pollfds[impl_->idMap[sock]].revents & POLLERR) { + if (impl_->idMap.count(sock) == 0) continue; + + const auto &fdstruct = impl_->pollfds[impl_->idMap[sock]]; + + if (fdstruct.revents & POLLERR) { if (s->socketError()) { //lk.unlock(); s->close(); @@ -560,7 +566,7 @@ void Universe::_run() { } } //If message received from this client then deal with it - if (impl_->pollfds[impl_->idMap[sock]].revents & POLLIN) { + if (fdstruct.revents & POLLIN) { //lk.unlock(); s->data(); //lk.lock(); -- GitLab