diff --git a/components/common/cpp/include/ftl/threads.hpp b/components/common/cpp/include/ftl/threads.hpp index b498ca9d00318835589092e3989ce227c3dc4866..69af43b34aefff488f03c2425909a7dcc3a68da8 100644 --- a/components/common/cpp/include/ftl/threads.hpp +++ b/components/common/cpp/include/ftl/threads.hpp @@ -7,7 +7,7 @@ #define POOL_SIZE 10 -#define DEBUG_MUTEX +//#define DEBUG_MUTEX #define MUTEX_TIMEOUT 20 #if defined DEBUG_MUTEX diff --git a/components/net/cpp/src/dispatcher.cpp b/components/net/cpp/src/dispatcher.cpp index a7d504627bb863b9cee8ba5976fe0b01fed211e3..3231b8ddc4604c7929a04c5c33a36385e1bd94ea 100644 --- a/components/net/cpp/src/dispatcher.cpp +++ b/components/net/cpp/src/dispatcher.cpp @@ -65,15 +65,15 @@ void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) { // assert(type == 0); if (type == 1) { - LOG(INFO) << "RPC return for " << id; + //DLOG(INFO) << "RPC return for " << id; s._dispatchResponse(id, args); } else if (type == 0) { - LOG(INFO) << "RPC " << name << "() <- " << s.getURI(); + //DLOG(INFO) << "RPC " << name << "() <- " << s.getURI(); auto func = _locateHandler(name); if (func) { - LOG(INFO) << "Found binding for " << name; + //DLOG(INFO) << "Found binding for " << name; try { auto result = (*func)(args); //->get(); s._sendResponse(id, result->get()); diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index 5c7b9acdc0608917aa61dd6d786be4cc4b2bd2c7..25059b5473c154d31ea6b31950a3e5e5eff02de8 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -514,11 +514,11 @@ bool Peer::_data() { _data(); }); - if (status_ != kConnected) { + if (status_ == kConnecting) { // If not connected, must lock to make sure no other thread performs this step UNIQUE_LOCK(recv_mtx_,lk); // Verify still not connected after lock - if (status_ != kConnected) { + if (status_ == kConnecting) { // First message must be a handshake try { tuple<uint32_t, std::string, msgpack::object> hs; @@ -554,7 +554,7 @@ void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) { // TODO: Handle error reporting... UNIQUE_LOCK(cb_mtx_,lk); if (callbacks_.count(id) > 0) { - DLOG(1) << "Received return RPC value"; + //DLOG(1) << "Received return RPC value"; // Allow for unlock before callback auto cb = std::move(callbacks_[id]); @@ -576,7 +576,6 @@ void Peer::cancelCall(int id) { } void Peer::_sendResponse(uint32_t id, const msgpack::object &res) { - LOG(INFO) << "Sending response: " << id; Dispatcher::response_t res_obj = std::make_tuple(1,id,std::string(""),res); UNIQUE_LOCK(send_mtx_,lk); if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0); diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index 4d1d00499e6d42038a8a7190bba529a53f445cb3..29cf1254f9a4926190d1753eb4ecc0836dfcbb99 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -1,4 +1,5 @@ #include <ftl/net/universe.hpp> +#include <ftl/timer.hpp> #include <chrono> #ifdef WIN32 @@ -38,6 +39,8 @@ Universe::Universe() : reconnect_attempts_(50), thread_(Universe::__start, this) { _installBindings(); + + LOG(WARNING) << "Deprecated Universe constructor"; } Universe::Universe(nlohmann::json &config) : @@ -53,7 +56,22 @@ Universe::Universe(nlohmann::json &config) : _installBindings(); - LOG(INFO) << "SEND BUFFER SIZE = " << send_size_; + // Add an idle timer job to garbage collect peer objects + // Note: Important to be a timer job to ensure no other timer jobs are + // using the object. + ftl::timer::add(ftl::timer::kTimerIdle10, [this](int64_t ts) { + if (garbage_.size() > 0) { + UNIQUE_LOCK(net_mutex_,lk); + if (ftl::pool.n_idle() == ftl::pool.size()) { + if (garbage_.size() > 0) LOG(INFO) << "Garbage collection"; + while (garbage_.size() > 0) { + delete garbage_.front(); + garbage_.pop_front(); + } + } + } + return true; + }); } Universe::~Universe() { @@ -184,17 +202,6 @@ void Universe::_installBindings() { // Note: should be called inside a net lock void Universe::_cleanupPeers() { - - if (ftl::pool.n_idle() == ftl::pool.size()) { - if (garbage_.size() > 0) LOG(INFO) << "Garbage collection"; - while (garbage_.size() > 0) { - // FIXME: There is possibly still something with a peer pointer - // that is causing this throw an exception sometimes? - delete garbage_.front(); - garbage_.pop_front(); - } - } - auto i = peers_.begin(); while (i != peers_.end()) { if (!(*i)->isValid()) {