Skip to content
Snippets Groups Projects
Commit 4ef56159 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Add recv check

parent 750a36c3
Branches
Tags
No related merge requests found
...@@ -319,9 +319,8 @@ void Peer::data() { ...@@ -319,9 +319,8 @@ void Peer::data() {
recv_buf_.buffer_consumed(rc); recv_buf_.buffer_consumed(rc);
// FIXME: This might get skipped incorrectly.
// However, it should be extremely unlikely given that recv is done already
//UNIQUE_LOCK(recv_mtx_, lk); //UNIQUE_LOCK(recv_mtx_, lk);
recv_checked_.clear();
if (!already_processing_.test_and_set()) { if (!already_processing_.test_and_set()) {
//lk.unlock(); //lk.unlock();
...@@ -363,9 +362,13 @@ bool Peer::_data() { ...@@ -363,9 +362,13 @@ bool Peer::_data() {
msgpack::object_handle msg_handle; msgpack::object_handle msg_handle;
try { try {
recv_checked_.test_and_set();
bool has_next = _has_next() && recv_buf_.next(msg_handle); bool has_next = _has_next() && recv_buf_.next(msg_handle);
if (!has_next) { if (!has_next) {
already_processing_.clear(); already_processing_.clear();
if (!recv_checked_.test_and_set() && !already_processing_.test_and_set()) {
return _data();
}
return false; return false;
} }
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
......
...@@ -245,6 +245,7 @@ private: // Functions ...@@ -245,6 +245,7 @@ private: // Functions
} }
std::atomic_flag already_processing_ = ATOMIC_FLAG_INIT; std::atomic_flag already_processing_ = ATOMIC_FLAG_INIT;
std::atomic_flag recv_checked_ = ATOMIC_FLAG_INIT;
msgpack::unpacker recv_buf_; msgpack::unpacker recv_buf_;
//MUTEX recv_mtx_; //MUTEX recv_mtx_;
......
...@@ -292,6 +292,7 @@ socket_t Universe::_setDescriptors() { ...@@ -292,6 +292,7 @@ socket_t Universe::_setDescriptors() {
SHARED_LOCK(net_mutex_, lk); SHARED_LOCK(net_mutex_, lk);
impl_->pollfds.clear(); impl_->pollfds.clear();
impl_->idMap.clear();
//Set file descriptor for the listening sockets. //Set file descriptor for the listening sockets.
for (auto &l : listeners_) { for (auto &l : listeners_) {
...@@ -502,6 +503,7 @@ void Universe::_run() { ...@@ -502,6 +503,7 @@ void Universe::_run() {
#ifdef WIN32 #ifdef WIN32
int errNum = WSAGetLastError(); int errNum = WSAGetLastError();
switch (errNum) { switch (errNum) {
case WSAENOTSOCK : continue; // Socket was closed
default : LOG(WARNING) << "Unhandled poll error: " << errNum; default : LOG(WARNING) << "Unhandled poll error: " << errNum;
} }
#else #else
...@@ -551,7 +553,11 @@ void Universe::_run() { ...@@ -551,7 +553,11 @@ void Universe::_run() {
SOCKET sock = s->_socket(); SOCKET sock = s->_socket();
if (sock == INVALID_SOCKET) continue; if (sock == INVALID_SOCKET) continue;
if (impl_->pollfds[impl_->idMap[sock]].revents & POLLERR) { if (impl_->idMap.count(sock) == 0) continue;
const auto &fdstruct = impl_->pollfds[impl_->idMap[sock]];
if (fdstruct.revents & POLLERR) {
if (s->socketError()) { if (s->socketError()) {
//lk.unlock(); //lk.unlock();
s->close(); s->close();
...@@ -560,7 +566,7 @@ void Universe::_run() { ...@@ -560,7 +566,7 @@ void Universe::_run() {
} }
} }
//If message received from this client then deal with it //If message received from this client then deal with it
if (impl_->pollfds[impl_->idMap[sock]].revents & POLLIN) { if (fdstruct.revents & POLLIN) {
//lk.unlock(); //lk.unlock();
s->data(); s->data();
//lk.lock(); //lk.lock();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment