Skip to content
Snippets Groups Projects
Commit 3df2cd7d authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Try without peer locks

parent 44e7b3e2
No related branches found
No related tags found
No related merge requests found
...@@ -258,7 +258,7 @@ NodeType Peer::getType() const { ...@@ -258,7 +258,7 @@ NodeType Peer::getType() const {
} }
void Peer::data() { void Peer::data() {
UNIQUE_LOCK(recv_mtx_,lk); //UNIQUE_LOCK(recv_mtx_,lk);
if (!sock_->is_valid()) { return; } if (!sock_->is_valid()) { return; }
...@@ -278,8 +278,8 @@ void Peer::data() { ...@@ -278,8 +278,8 @@ void Peer::data() {
// outside the lock and the second thread to call recv() re-acquires // outside the lock and the second thread to call recv() re-acquires
// the lock first, buffer_consumed() will be called first with second // the lock first, buffer_consumed() will be called first with second
// thread's number of bytes (rc). // thread's number of bytes (rc).
auto ctr = dbg_recv_begin_ctr_++; //auto ctr = dbg_recv_begin_ctr_++;
lk.unlock(); //lk.unlock();
try { try {
rc = sock_->recv(recv_buf_.buffer(), recv_buf_.buffer_capacity()); rc = sock_->recv(recv_buf_.buffer(), recv_buf_.buffer_capacity());
...@@ -312,17 +312,17 @@ void Peer::data() { ...@@ -312,17 +312,17 @@ void Peer::data() {
} }
// Re-acquire lock before processing buffer further // Re-acquire lock before processing buffer further
lk.lock(); //lk.lock();
// buffer_consumed() will not be updated with correct value, race condition // buffer_consumed() will not be updated with correct value, race condition
// described above has occurred // described above has occurred
CHECK(ctr == dbg_recv_end_ctr_++) << "race in Peer::data()"; //CHECK(ctr == dbg_recv_end_ctr_++) << "race in Peer::data()";
recv_buf_.buffer_consumed(rc); recv_buf_.buffer_consumed(rc);
if (is_waiting_) { if (is_waiting_) {
is_waiting_ = false; is_waiting_ = false;
lk.unlock(); //lk.unlock();
++job_count_; ++job_count_;
...@@ -356,7 +356,7 @@ bool Peer::_has_next() { ...@@ -356,7 +356,7 @@ bool Peer::_has_next() {
bool Peer::_data() { bool Peer::_data() {
// lock before trying to acquire handle to buffer // lock before trying to acquire handle to buffer
UNIQUE_LOCK(recv_mtx_, lk); //UNIQUE_LOCK(recv_mtx_, lk);
// msgpack::object is valid as long as handle is // msgpack::object is valid as long as handle is
msgpack::object_handle msg_handle; msgpack::object_handle msg_handle;
...@@ -373,27 +373,16 @@ bool Peer::_data() { ...@@ -373,27 +373,16 @@ bool Peer::_data() {
return false; return false;
} }
lk.unlock(); //lk.unlock();
msgpack::object obj = msg_handle.get(); msgpack::object obj = msg_handle.get();
// more data: repeat (loop)
++job_count_;
ftl::pool.push([this](int id) {
try {
_data();
} catch (const std::exception &e) {
LOG(ERROR) << "Error processing packet: " << e.what();
}
--job_count_;
});
if (status_ == NodeStatus::kConnecting) { if (status_ == NodeStatus::kConnecting) {
// If not connected, must lock to make sure no other thread performs this step // If not connected, must lock to make sure no other thread performs this step
lk.lock(); //lk.lock();
// Verify still not connected after lock // Verify still not connected after lock
if (status_ == NodeStatus::kConnecting) { //if (status_ == NodeStatus::kConnecting) {
// First message must be a handshake // First message must be a handshake
try { try {
tuple<uint32_t, std::string, msgpack::object> hs; tuple<uint32_t, std::string, msgpack::object> hs;
...@@ -403,26 +392,26 @@ bool Peer::_data() { ...@@ -403,26 +392,26 @@ bool Peer::_data() {
LOG(WARNING) << "Missing handshake - got '" << get<1>(hs) << "'"; LOG(WARNING) << "Missing handshake - got '" << get<1>(hs) << "'";
// Allow a small delay in case another thread is doing the handshake // Allow a small delay in case another thread is doing the handshake
lk.unlock(); //lk.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(10)); //std::this_thread::sleep_for(std::chrono::milliseconds(10));
if (status_ == NodeStatus::kConnecting) { //if (status_ == NodeStatus::kConnecting) {
LOG(ERROR) << "failed to get handshake"; LOG(ERROR) << "failed to get handshake";
close(reconnect_on_protocol_error_); close(reconnect_on_protocol_error_);
lk.lock(); //lk.lock();
return false; return false;
} //}
} else { } else {
// Must handle immediately with no other thread able // Must handle immediately with no other thread able
// to read next message before completion. // to read next message before completion.
// The handshake handler must not block. // The handshake handler must not block.
disp_->dispatch(*this, obj); //disp_->dispatch(*this, obj);
return true; //return true;
} }
} catch(...) { } catch(...) {
LOG(WARNING) << "Bad first message format... waiting"; LOG(WARNING) << "Bad first message format... waiting";
// Allow a small delay in case another thread is doing the handshake // Allow a small delay in case another thread is doing the handshake
lk.unlock(); //lk.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(10)); std::this_thread::sleep_for(std::chrono::milliseconds(10));
if (status_ == NodeStatus::kConnecting) { if (status_ == NodeStatus::kConnecting) {
LOG(ERROR) << "failed to get handshake"; LOG(ERROR) << "failed to get handshake";
...@@ -430,16 +419,27 @@ bool Peer::_data() { ...@@ -430,16 +419,27 @@ bool Peer::_data() {
return false; return false;
} }
} }
} else { //} else {
lk.unlock(); //lk.unlock();
//}
} }
// more data: repeat (loop)
++job_count_;
ftl::pool.push([this](int id) {
try {
_data();
} catch (const std::exception &e) {
LOG(ERROR) << "Error processing packet: " << e.what();
} }
--job_count_;
});
disp_->dispatch(*this, obj); disp_->dispatch(*this, obj);
// Lock again before freeing msg_handle (destruction order). // Lock again before freeing msg_handle (destruction order).
// msgpack::object_handle destructor modifies recv_buffer_ // msgpack::object_handle destructor modifies recv_buffer_
lk.lock(); //lk.lock();
return true; return true;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment