From 3df2cd7d07bcba074fd4a73fe5e8687e44222bfb Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nicolas.pope@utu.fi>
Date: Sun, 8 May 2022 21:26:10 +0100
Subject: [PATCH] Try without peer locks

---
 src/peer.cpp | 66 ++++++++++++++++++++++++++--------------------------
 1 file changed, 33 insertions(+), 33 deletions(-)

diff --git a/src/peer.cpp b/src/peer.cpp
index 066b2a8..f0708cc 100644
--- a/src/peer.cpp
+++ b/src/peer.cpp
@@ -258,7 +258,7 @@ NodeType Peer::getType() const {
 }
 
 void Peer::data() {
-	UNIQUE_LOCK(recv_mtx_,lk);
+	//UNIQUE_LOCK(recv_mtx_,lk);
 
 	if (!sock_->is_valid()) { return; }
 
@@ -278,8 +278,8 @@ void Peer::data() {
 	//       outside the lock and the second thread to call recv() re-acquires 
 	//       the lock first, buffer_consumed() will be called first with second
 	//       thread's number of bytes (rc).
-	auto ctr = dbg_recv_begin_ctr_++;
-	lk.unlock();
+	//auto ctr = dbg_recv_begin_ctr_++;
+	//lk.unlock();
 
 	try {
 		rc = sock_->recv(recv_buf_.buffer(), recv_buf_.buffer_capacity());
@@ -312,17 +312,17 @@ void Peer::data() {
 	}
 
 	// Re-acquire lock before processing buffer further
-	lk.lock();
+	//lk.lock();
 
 	// buffer_consumed() will not be updated with correct value, race condition
 	// described above has occurred
-	CHECK(ctr == dbg_recv_end_ctr_++) << "race in Peer::data()";
+	//CHECK(ctr == dbg_recv_end_ctr_++) << "race in Peer::data()";
 
 	recv_buf_.buffer_consumed(rc);
 	
 	if (is_waiting_) {
 		is_waiting_ = false;
-		lk.unlock();
+		//lk.unlock();
 
 		++job_count_;
 
@@ -356,7 +356,7 @@ bool Peer::_has_next() {
 
 bool Peer::_data() {
 	// lock before trying to acquire handle to buffer
-	UNIQUE_LOCK(recv_mtx_, lk);
+	//UNIQUE_LOCK(recv_mtx_, lk);
 
 	// msgpack::object is valid as long as handle is
 	msgpack::object_handle msg_handle;
@@ -373,27 +373,16 @@ bool Peer::_data() {
 		return false;
 	}
 
-	lk.unlock();
+	//lk.unlock();
 
 	msgpack::object obj = msg_handle.get();
-	
-	// more data: repeat (loop)
-	++job_count_;
-	ftl::pool.push([this](int id) {
-		try {
-			_data();
-		} catch (const std::exception &e) {
-			LOG(ERROR) << "Error processing packet: " << e.what();	
-		}
-		--job_count_;
-	});
 
 	if (status_ == NodeStatus::kConnecting) {
 		// If not connected, must lock to make sure no other thread performs this step
-		lk.lock();
+		//lk.lock();
 
 		// Verify still not connected after lock
-		if (status_ == NodeStatus::kConnecting) {
+		//if (status_ == NodeStatus::kConnecting) {
 			// First message must be a handshake
 			try {
 				tuple<uint32_t, std::string, msgpack::object> hs;
@@ -403,26 +392,26 @@ bool Peer::_data() {
 					LOG(WARNING) << "Missing handshake - got '" << get<1>(hs) << "'";
 
 					// Allow a small delay in case another thread is doing the handshake
-					lk.unlock();
-					std::this_thread::sleep_for(std::chrono::milliseconds(10));
-					if (status_ == NodeStatus::kConnecting) {
+					//lk.unlock();
+					//std::this_thread::sleep_for(std::chrono::milliseconds(10));
+					//if (status_ == NodeStatus::kConnecting) {
 						LOG(ERROR) << "failed to get handshake";
 						close(reconnect_on_protocol_error_);
-						lk.lock();
+						//lk.lock();
 						return false;
-					}
+					//}
 				} else {
 					// Must handle immediately with no other thread able
 					// to read next message before completion.
 					// The handshake handler must not block.
-					disp_->dispatch(*this, obj);
-					return true;
+					//disp_->dispatch(*this, obj);
+					//return true;
 				}
 			} catch(...) {
 				LOG(WARNING) << "Bad first message format... waiting";
 				// Allow a small delay in case another thread is doing the handshake
 
-				lk.unlock();
+				//lk.unlock();
 				std::this_thread::sleep_for(std::chrono::milliseconds(10));
 				if (status_ == NodeStatus::kConnecting) {
 					LOG(ERROR) << "failed to get handshake";
@@ -430,16 +419,27 @@ bool Peer::_data() {
 					return false;
 				}
 			}
-		} else {
-			lk.unlock();
-		}
+		//} else {
+			//lk.unlock();
+		//}
 	}
 	
+	// more data: repeat (loop)
+	++job_count_;
+	ftl::pool.push([this](int id) {
+		try {
+			_data();
+		} catch (const std::exception &e) {
+			LOG(ERROR) << "Error processing packet: " << e.what();	
+		}
+		--job_count_;
+	});
+	
 	disp_->dispatch(*this, obj);
 
 	// Lock again before freeing msg_handle (destruction order).
 	// msgpack::object_handle destructor modifies recv_buffer_
-	lk.lock();
+	//lk.lock();
 	return true;
 }
 
-- 
GitLab