diff --git a/src/peer.cpp b/src/peer.cpp index 0dfa85c26d52e15bbb29689358337c31e3a2c716..e2a3c647bed33df684a53c1c028d2ef782248e20 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -216,6 +216,7 @@ void Peer::close(bool retry) { } void Peer::_close(bool retry) { + if (status_ == NodeStatus::kDisconnected) return; status_ = NodeStatus::kDisconnected; if (sock_->is_valid()) { @@ -325,8 +326,11 @@ void Peer::data() { is_waiting_ = false; lk.unlock(); + ++job_count_; + ftl::pool.push([this](int id) { _data(); + --job_count_; }); } } @@ -372,7 +376,8 @@ bool Peer::_data() { msgpack::object obj = msg_handle.get(); // more data: repeat (loop) - ftl::pool.push([this](int id) { _data(); }); + ++job_count_; + ftl::pool.push([this](int id) { _data(); --job_count_; }); if (status_ == NodeStatus::kConnecting) { // If not connected, must lock to make sure no other thread performs this step @@ -537,7 +542,14 @@ int Peer::_send() { } Peer::~Peer() { - UNIQUE_LOCK(send_mtx_,lk1); - UNIQUE_LOCK(recv_mtx_,lk2); - _close(false); + { + UNIQUE_LOCK(send_mtx_,lk1); + UNIQUE_LOCK(recv_mtx_,lk2); + _close(false); + } + + // Prevent deletion if there are any jobs remaining + while (job_count_ > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } } diff --git a/src/peer.hpp b/src/peer.hpp index 06fcb354754afee061ffd06738b49e5aea9ff27c..e26c1def70c511f958671c15f7d1f2de5820ec81 100644 --- a/src/peer.hpp +++ b/src/peer.hpp @@ -271,6 +271,7 @@ private: // Functions // debug variables, see comments for data() in peer.cpp for details std::atomic_uint64_t dbg_recv_begin_ctr_ = 0; std::atomic_uint64_t dbg_recv_end_ctr_ = 0; + std::atomic_int job_count_ = 0; // reconnect when clean disconnect received from remote bool reconnect_on_remote_disconnect_ = true; diff --git a/test/net_integration.cpp b/test/net_integration.cpp index 6095d624a20d06c40e2f99b66926a38dacd663d3..94f6963fbfb7fda122594f5223e57258fd0ab182 100644 --- a/test/net_integration.cpp +++ b/test/net_integration.cpp @@ -3,6 +3,7 @@ #include <ftl/protocol/self.hpp> #include <ftl/protocol/node.hpp> #include <ftl/uri.hpp> +#include <ftl/exception.hpp> #include <thread> #include <chrono> @@ -53,10 +54,10 @@ TEST_CASE("Listen and Connect", "[net]") { REQUIRE( ftl::getSelf()->numberOfNodes() == 1); } - /*SECTION("invalid protocol") { + SECTION("invalid protocol") { bool throws = false; try { - auto p = b.connect("http://localhost:1234"); + auto p = ftl::createNode("http://localhost:1234"); } catch (const ftl::exception& ex) { ex.ignore(); @@ -65,7 +66,7 @@ TEST_CASE("Listen and Connect", "[net]") { REQUIRE(throws); } - SECTION("automatic reconnect, after clean disconnect") { + /*SECTION("automatic reconnect, after clean disconnect") { std::mutex mtx; std::condition_variable cv; std::unique_lock<std::mutex> lk(mtx);