Skip to content
Snippets Groups Projects
Commit 6ce02750 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'feature/#61' into 'main'

#61 Peer and universe shutdown fixes

See merge request beyondaka/beyond-protocol!43
parents 38f230f4 ae8ea389
No related branches found
No related tags found
No related merge requests found
......@@ -568,9 +568,11 @@ Peer::~Peer() {
}
// Prevent deletion if there are any jobs remaining
if (job_count_ > 0 && ftl::pool.size() > 0) {
int count = 10;
while (job_count_ > 0 && ftl::pool.size() > 0 && count-- > 0) {
DLOG(1) << "Waiting on peer jobs... " << job_count_;
std::this_thread::sleep_for(std::chrono::milliseconds(2));
if (job_count_ > 0) LOG(FATAL) << "Peer jobs not terminated";
}
if (job_count_ > 0) LOG(FATAL) << "Peer jobs not terminated";
}
......@@ -192,11 +192,17 @@ void Universe::shutdown() {
active_ = false;
thread_.join();
// FIXME: This shouldn't be needed
if (peer_instances_ > 0 && ftl::pool.size() > 0) {
DLOG(WARNING) << "Waiting on peer destruction... " << peer_instances_;
_cleanupPeers();
while (garbage_.size() > 0) {
_garbage();
std::this_thread::sleep_for(std::chrono::milliseconds(2));
}
// Try 10 times to delete the peers
// Note: other threads may still be using the peer object
int count = 10;
while (peer_instances_ > 0 && count-- > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(2));
if (peer_instances_ > 0) LOG(FATAL) << "Peers not destroyed";
}
}
......@@ -467,14 +473,18 @@ void Universe::_periodic() {
// Garbage peers may not be needed any more
if (garbage_.size() > 0) {
UNIQUE_LOCK(net_mutex_, lk);
// Only do garbage if processing is idle.
if (ftl::pool.n_idle() == ftl::pool.size()) {
if (garbage_.size() > 0) DLOG(1) << "Garbage collection";
while (garbage_.size() > 0) {
garbage_.front().reset();
garbage_.pop_front();
}
_garbage();
}
}
void Universe::_garbage() {
UNIQUE_LOCK(net_mutex_, lk);
// Only do garbage if processing is idle.
if (ftl::pool.n_idle() == ftl::pool.size()) {
if (garbage_.size() > 0) DLOG(1) << "Garbage collection";
while (garbage_.size() > 0) {
garbage_.front().reset();
garbage_.pop_front();
}
}
}
......
......@@ -191,6 +191,7 @@ class Universe {
void _notifyDisconnect(ftl::net::Peer *);
void _notifyError(ftl::net::Peer *, ftl::protocol::Error, const std::string &);
void _periodic();
void _garbage();
ftl::net::PeerPtr _findPeer(const ftl::net::Peer *p);
void _removePeer(PeerPtr &p);
void _insertPeer(const ftl::net::PeerPtr &ptr);
......
......@@ -24,6 +24,25 @@ static bool try_for(int count, const std::function<bool()> &f) {
// --- Tests -------------------------------------------------------------------
TEST_CASE("Garbage bug", "[net]") {
auto self = ftl::createDummySelf();
self->listen(ftl::URI("tcp://localhost:0"));
auto uri = "tcp://127.0.0.1:" + std::to_string(self->getListeningURIs().front().getPort());
LOG(INFO) << uri;
auto p = ftl::connectNode(uri);
REQUIRE( p );
REQUIRE( p->waitConnection(5) );
REQUIRE( self->waitConnections(5) == 1 );
REQUIRE( ftl::getSelf()->numberOfNodes() == 1);
p.reset();
ftl::protocol::reset();
}
TEST_CASE("Listen and Connect", "[net]") {
auto self = ftl::createDummySelf();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment