diff --git a/src/peer.cpp b/src/peer.cpp index e0a1fbcd96185d3c4da0d5f294d475c2ef264f31..3b97d72ff65701894db2c86dcd5cf434ef1e6ab1 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -262,7 +262,10 @@ void Peer::data() { int rc = 0; - recv_buf_.reserve_buffer(kMaxMessage); + { + UNIQUE_LOCK(recv_mtx_,lk); + recv_buf_.reserve_buffer(kMaxMessage); + } if (recv_buf_.buffer_capacity() < (kMaxMessage / 10)) { net_->_notifyError(this, ftl::protocol::Error::kBufferSize, "Buffer is at capacity"); @@ -360,7 +363,11 @@ bool Peer::_data() { try { recv_checked_.test_and_set(); + + UNIQUE_LOCK(recv_mtx_,lk); bool has_next = _has_next() && recv_buf_.next(msg_handle); + lk.unlock(); + if (!has_next) { already_processing_.clear(); if (!recv_checked_.test_and_set() && !already_processing_.test_and_set()) { diff --git a/src/peer.hpp b/src/peer.hpp index db1566846b2392ddc27119700f5ab95905efebf1..45fba380913e9ff7c37276165f4a1c88eb2f7bbb 100644 --- a/src/peer.hpp +++ b/src/peer.hpp @@ -248,7 +248,7 @@ private: // Functions std::atomic_flag recv_checked_ = ATOMIC_FLAG_INIT; msgpack::unpacker recv_buf_; - //MUTEX recv_mtx_; + MUTEX recv_mtx_; // Send buffers msgpack::vrefbuffer send_buf_; diff --git a/src/universe.cpp b/src/universe.cpp index d9c43e37ddb5342a4f00910e15bb4d74e1b05ecf..bbfdd41c12e7a638e5a8378ae3940e59e2c03be6 100644 --- a/src/universe.cpp +++ b/src/universe.cpp @@ -418,6 +418,7 @@ PeerPtr Universe::getWebService() const { } void Universe::_periodic() { + LOG(INFO) << "PERIODIC " << reconnects_.size(); auto i = reconnects_.begin(); while (i != reconnects_.end()) {