From 33724b2030d58e4795dd42e7ff96d7d94e4ff83e Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nicolas.pope@utu.fi> Date: Fri, 6 May 2022 23:04:40 +0100 Subject: [PATCH] Fix peer threading bugs --- src/peer.cpp | 20 ++++++++++++++++---- src/peer.hpp | 1 + test/net_integration.cpp | 7 ++++--- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/src/peer.cpp b/src/peer.cpp index 0dfa85c..e2a3c64 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -216,6 +216,7 @@ void Peer::close(bool retry) { } void Peer::_close(bool retry) { + if (status_ == NodeStatus::kDisconnected) return; status_ = NodeStatus::kDisconnected; if (sock_->is_valid()) { @@ -325,8 +326,11 @@ void Peer::data() { is_waiting_ = false; lk.unlock(); + ++job_count_; + ftl::pool.push([this](int id) { _data(); + --job_count_; }); } } @@ -372,7 +376,8 @@ bool Peer::_data() { msgpack::object obj = msg_handle.get(); // more data: repeat (loop) - ftl::pool.push([this](int id) { _data(); }); + ++job_count_; + ftl::pool.push([this](int id) { _data(); --job_count_; }); if (status_ == NodeStatus::kConnecting) { // If not connected, must lock to make sure no other thread performs this step @@ -537,7 +542,14 @@ int Peer::_send() { } Peer::~Peer() { - UNIQUE_LOCK(send_mtx_,lk1); - UNIQUE_LOCK(recv_mtx_,lk2); - _close(false); + { + UNIQUE_LOCK(send_mtx_,lk1); + UNIQUE_LOCK(recv_mtx_,lk2); + _close(false); + } + + // Prevent deletion if there are any jobs remaining + while (job_count_ > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } } diff --git a/src/peer.hpp b/src/peer.hpp index 06fcb35..e26c1de 100644 --- a/src/peer.hpp +++ b/src/peer.hpp @@ -271,6 +271,7 @@ private: // Functions // debug variables, see comments for data() in peer.cpp for details std::atomic_uint64_t dbg_recv_begin_ctr_ = 0; std::atomic_uint64_t dbg_recv_end_ctr_ = 0; + std::atomic_int job_count_ = 0; // reconnect when clean disconnect received from remote bool reconnect_on_remote_disconnect_ = true; diff --git a/test/net_integration.cpp b/test/net_integration.cpp index 6095d62..94f6963 100644 --- a/test/net_integration.cpp +++ b/test/net_integration.cpp @@ -3,6 +3,7 @@ #include <ftl/protocol/self.hpp> #include <ftl/protocol/node.hpp> #include <ftl/uri.hpp> +#include <ftl/exception.hpp> #include <thread> #include <chrono> @@ -53,10 +54,10 @@ TEST_CASE("Listen and Connect", "[net]") { REQUIRE( ftl::getSelf()->numberOfNodes() == 1); } - /*SECTION("invalid protocol") { + SECTION("invalid protocol") { bool throws = false; try { - auto p = b.connect("http://localhost:1234"); + auto p = ftl::createNode("http://localhost:1234"); } catch (const ftl::exception& ex) { ex.ignore(); @@ -65,7 +66,7 @@ TEST_CASE("Listen and Connect", "[net]") { REQUIRE(throws); } - SECTION("automatic reconnect, after clean disconnect") { + /*SECTION("automatic reconnect, after clean disconnect") { std::mutex mtx; std::condition_variable cv; std::unique_lock<std::mutex> lk(mtx); -- GitLab