Skip to content
Snippets Groups Projects
Commit 33724b20 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Fix peer threading bugs

parent cf128c61
No related branches found
No related tags found
No related merge requests found
......@@ -216,6 +216,7 @@ void Peer::close(bool retry) {
}
void Peer::_close(bool retry) {
if (status_ == NodeStatus::kDisconnected) return;
status_ = NodeStatus::kDisconnected;
if (sock_->is_valid()) {
......@@ -325,8 +326,11 @@ void Peer::data() {
is_waiting_ = false;
lk.unlock();
++job_count_;
ftl::pool.push([this](int id) {
_data();
--job_count_;
});
}
}
......@@ -372,7 +376,8 @@ bool Peer::_data() {
msgpack::object obj = msg_handle.get();
// more data: repeat (loop)
ftl::pool.push([this](int id) { _data(); });
++job_count_;
ftl::pool.push([this](int id) { _data(); --job_count_; });
if (status_ == NodeStatus::kConnecting) {
// If not connected, must lock to make sure no other thread performs this step
......@@ -537,7 +542,14 @@ int Peer::_send() {
}
Peer::~Peer() {
{
UNIQUE_LOCK(send_mtx_,lk1);
UNIQUE_LOCK(recv_mtx_,lk2);
_close(false);
}
// Prevent deletion if there are any jobs remaining
while (job_count_ > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
......@@ -271,6 +271,7 @@ private: // Functions
// debug variables, see comments for data() in peer.cpp for details
std::atomic_uint64_t dbg_recv_begin_ctr_ = 0;
std::atomic_uint64_t dbg_recv_end_ctr_ = 0;
std::atomic_int job_count_ = 0;
// reconnect when clean disconnect received from remote
bool reconnect_on_remote_disconnect_ = true;
......
......@@ -3,6 +3,7 @@
#include <ftl/protocol/self.hpp>
#include <ftl/protocol/node.hpp>
#include <ftl/uri.hpp>
#include <ftl/exception.hpp>
#include <thread>
#include <chrono>
......@@ -53,10 +54,10 @@ TEST_CASE("Listen and Connect", "[net]") {
REQUIRE( ftl::getSelf()->numberOfNodes() == 1);
}
/*SECTION("invalid protocol") {
SECTION("invalid protocol") {
bool throws = false;
try {
auto p = b.connect("http://localhost:1234");
auto p = ftl::createNode("http://localhost:1234");
}
catch (const ftl::exception& ex) {
ex.ignore();
......@@ -65,7 +66,7 @@ TEST_CASE("Listen and Connect", "[net]") {
REQUIRE(throws);
}
SECTION("automatic reconnect, after clean disconnect") {
/*SECTION("automatic reconnect, after clean disconnect") {
std::mutex mtx;
std::condition_variable cv;
std::unique_lock<std::mutex> lk(mtx);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment