diff --git a/src/peer.cpp b/src/peer.cpp index 066b2a89c82cc05b1608ab868411a4d31da3357a..f0708cc8bcb6cdb650b616bdf6dd7a5c3567e0c6 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -258,7 +258,7 @@ NodeType Peer::getType() const { } void Peer::data() { - UNIQUE_LOCK(recv_mtx_,lk); + //UNIQUE_LOCK(recv_mtx_,lk); if (!sock_->is_valid()) { return; } @@ -278,8 +278,8 @@ void Peer::data() { // outside the lock and the second thread to call recv() re-acquires // the lock first, buffer_consumed() will be called first with second // thread's number of bytes (rc). - auto ctr = dbg_recv_begin_ctr_++; - lk.unlock(); + //auto ctr = dbg_recv_begin_ctr_++; + //lk.unlock(); try { rc = sock_->recv(recv_buf_.buffer(), recv_buf_.buffer_capacity()); @@ -312,17 +312,17 @@ void Peer::data() { } // Re-acquire lock before processing buffer further - lk.lock(); + //lk.lock(); // buffer_consumed() will not be updated with correct value, race condition // described above has occurred - CHECK(ctr == dbg_recv_end_ctr_++) << "race in Peer::data()"; + //CHECK(ctr == dbg_recv_end_ctr_++) << "race in Peer::data()"; recv_buf_.buffer_consumed(rc); if (is_waiting_) { is_waiting_ = false; - lk.unlock(); + //lk.unlock(); ++job_count_; @@ -356,7 +356,7 @@ bool Peer::_has_next() { bool Peer::_data() { // lock before trying to acquire handle to buffer - UNIQUE_LOCK(recv_mtx_, lk); + //UNIQUE_LOCK(recv_mtx_, lk); // msgpack::object is valid as long as handle is msgpack::object_handle msg_handle; @@ -373,27 +373,16 @@ bool Peer::_data() { return false; } - lk.unlock(); + //lk.unlock(); msgpack::object obj = msg_handle.get(); - - // more data: repeat (loop) - ++job_count_; - ftl::pool.push([this](int id) { - try { - _data(); - } catch (const std::exception &e) { - LOG(ERROR) << "Error processing packet: " << e.what(); - } - --job_count_; - }); if (status_ == NodeStatus::kConnecting) { // If not connected, must lock to make sure no other thread performs this step - lk.lock(); + //lk.lock(); // Verify still not connected after lock - if (status_ == NodeStatus::kConnecting) { + //if (status_ == NodeStatus::kConnecting) { // First message must be a handshake try { tuple<uint32_t, std::string, msgpack::object> hs; @@ -403,26 +392,26 @@ bool Peer::_data() { LOG(WARNING) << "Missing handshake - got '" << get<1>(hs) << "'"; // Allow a small delay in case another thread is doing the handshake - lk.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - if (status_ == NodeStatus::kConnecting) { + //lk.unlock(); + //std::this_thread::sleep_for(std::chrono::milliseconds(10)); + //if (status_ == NodeStatus::kConnecting) { LOG(ERROR) << "failed to get handshake"; close(reconnect_on_protocol_error_); - lk.lock(); + //lk.lock(); return false; - } + //} } else { // Must handle immediately with no other thread able // to read next message before completion. // The handshake handler must not block. - disp_->dispatch(*this, obj); - return true; + //disp_->dispatch(*this, obj); + //return true; } } catch(...) { LOG(WARNING) << "Bad first message format... waiting"; // Allow a small delay in case another thread is doing the handshake - lk.unlock(); + //lk.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(10)); if (status_ == NodeStatus::kConnecting) { LOG(ERROR) << "failed to get handshake"; @@ -430,16 +419,27 @@ bool Peer::_data() { return false; } } - } else { - lk.unlock(); - } + //} else { + //lk.unlock(); + //} } + // more data: repeat (loop) + ++job_count_; + ftl::pool.push([this](int id) { + try { + _data(); + } catch (const std::exception &e) { + LOG(ERROR) << "Error processing packet: " << e.what(); + } + --job_count_; + }); + disp_->dispatch(*this, obj); // Lock again before freeing msg_handle (destruction order). // msgpack::object_handle destructor modifies recv_buffer_ - lk.lock(); + //lk.lock(); return true; }