Skip to content
Snippets Groups Projects
Commit 4109d570 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'bug/#6' into 'main'

#6 Add peer garbage collection

See merge request nicolaspope/beyond-protocol!17
parents b504373f 04bea32d
No related branches found
No related tags found
No related merge requests found
...@@ -114,6 +114,23 @@ class Self { ...@@ -114,6 +114,23 @@ class Self {
*/ */
size_t numberOfNodes() const; size_t numberOfNodes() const;
/**
* @brief Get the maximum allowed number of connections. Any attempt to connect more
* peers will result in them being rejected.
*
* @return size_t
*/
size_t getMaxConnections() const;
/**
* @brief Set the maximum allowed connections. This should only be changed before
* there are any active connections, resizing with active connections could cause
* errors. The default number is 10.
*
* @param m Number of allowed node connections
*/
void setMaxConnections(size_t m);
/** /**
* @brief Will block until all currently registered connnections have completed. * @brief Will block until all currently registered connnections have completed.
* You should not use this, but rather use onConnect. * You should not use this, but rather use onConnect.
......
...@@ -76,6 +76,14 @@ size_t Self::numberOfNodes() const { ...@@ -76,6 +76,14 @@ size_t Self::numberOfNodes() const {
return universe_->numberOfPeers(); return universe_->numberOfPeers();
} }
size_t Self::getMaxConnections() const {
return universe_->getMaxConnections();
}
void Self::setMaxConnections(size_t m) {
universe_->setMaxConnections(m);
}
int Self::waitConnections(int seconds) { int Self::waitConnections(int seconds) {
return universe_->waitConnections(seconds); return universe_->waitConnections(seconds);
} }
......
...@@ -47,6 +47,8 @@ using ftl::net::internal::SocketServer; ...@@ -47,6 +47,8 @@ using ftl::net::internal::SocketServer;
using ftl::net::internal::Server_TCP; using ftl::net::internal::Server_TCP;
using std::chrono::milliseconds; using std::chrono::milliseconds;
constexpr int kDefaultMaxConnections = 10;
namespace ftl { namespace ftl {
namespace net { namespace net {
...@@ -81,30 +83,12 @@ Universe::Universe() : ...@@ -81,30 +83,12 @@ Universe::Universe() :
active_(true), active_(true),
this_peer(ftl::protocol::id), this_peer(ftl::protocol::id),
impl_(new ftl::net::NetImplDetail()), impl_(new ftl::net::NetImplDetail()),
peers_(10), peers_(kDefaultMaxConnections),
phase_(0), phase_(0),
periodic_time_(1.0), periodic_time_(1.0),
reconnect_attempts_(5), reconnect_attempts_(5),
thread_(Universe::__start, this) { thread_(Universe::__start, this) {
_installBindings(); _installBindings();
// Add an idle timer job to garbage collect peer objects
// Note: Important to be a timer job to ensure no other timer jobs are
// using the object.
// FIXME: Replace use of timer.
/*garbage_timer_ = ftl::timer::add(ftl::timer::kTimerIdle10, [this](int64_t ts) {
if (garbage_.size() > 0) {
UNIQUE_LOCK(net_mutex_,lk);
if (ftl::pool.n_idle() == ftl::pool.size()) {
if (garbage_.size() > 0) LOG(1) << "Garbage collection";
while (garbage_.size() > 0) {
delete garbage_.front();
garbage_.pop_front();
}
}
}
return true;
});*/
} }
Universe::~Universe() { Universe::~Universe() {
...@@ -112,6 +96,11 @@ Universe::~Universe() { ...@@ -112,6 +96,11 @@ Universe::~Universe() {
CHECK_EQ(peer_instances_, 0); CHECK_EQ(peer_instances_, 0);
} }
void Universe::setMaxConnections(size_t m) {
UNIQUE_LOCK(net_mutex_, lk);
peers_.resize(m);
}
size_t Universe::getSendBufferSize(ftl::URI::scheme_t s) { size_t Universe::getSendBufferSize(ftl::URI::scheme_t s) {
// TODO(Nick): Allow these to be configured again. // TODO(Nick): Allow these to be configured again.
switch (s) { switch (s) {
...@@ -448,6 +437,19 @@ void Universe::_periodic() { ...@@ -448,6 +437,19 @@ void Universe::_periodic() {
i = reconnects_.erase(i); i = reconnects_.erase(i);
}*/ }*/
} }
// Garbage peers may not be needed any more
if (garbage_.size() > 0) {
UNIQUE_LOCK(net_mutex_, lk);
// Only do garbage if processing is idle.
if (ftl::pool.n_idle() == ftl::pool.size()) {
if (garbage_.size() > 0) DLOG(1) << "Garbage collection";
while (garbage_.size() > 0) {
garbage_.front().reset();
garbage_.pop_front();
}
}
}
} }
void Universe::__start(Universe *u) { void Universe::__start(Universe *u) {
......
...@@ -172,6 +172,9 @@ class Universe { ...@@ -172,6 +172,9 @@ class Universe {
static inline std::shared_ptr<Universe> getInstance() { return instance_; } static inline std::shared_ptr<Universe> getInstance() { return instance_; }
void setMaxConnections(size_t m);
size_t getMaxConnections() const { return peers_.size(); }
// --- Test support ------------------------------------------------------- // --- Test support -------------------------------------------------------
PeerPtr injectFakePeer(std::unique_ptr<ftl::net::internal::SocketConnection> s); PeerPtr injectFakePeer(std::unique_ptr<ftl::net::internal::SocketConnection> s);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment