Skip to content
Snippets Groups Projects
Commit 6ce66362 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'bug/163/garbage' into 'master'

Resolves #163 garbage segfault

Closes #163

See merge request nicolas.pope/ftl!100
parents 0b76aafe 4828bbd9
No related branches found
No related tags found
1 merge request!100Resolves #163 garbage segfault
Pipeline #13098 passed
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
#define POOL_SIZE 10 #define POOL_SIZE 10
#define DEBUG_MUTEX //#define DEBUG_MUTEX
#define MUTEX_TIMEOUT 20 #define MUTEX_TIMEOUT 20
#if defined DEBUG_MUTEX #if defined DEBUG_MUTEX
......
...@@ -65,15 +65,15 @@ void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) { ...@@ -65,15 +65,15 @@ void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) {
// assert(type == 0); // assert(type == 0);
if (type == 1) { if (type == 1) {
LOG(INFO) << "RPC return for " << id; //DLOG(INFO) << "RPC return for " << id;
s._dispatchResponse(id, args); s._dispatchResponse(id, args);
} else if (type == 0) { } else if (type == 0) {
LOG(INFO) << "RPC " << name << "() <- " << s.getURI(); //DLOG(INFO) << "RPC " << name << "() <- " << s.getURI();
auto func = _locateHandler(name); auto func = _locateHandler(name);
if (func) { if (func) {
LOG(INFO) << "Found binding for " << name; //DLOG(INFO) << "Found binding for " << name;
try { try {
auto result = (*func)(args); //->get(); auto result = (*func)(args); //->get();
s._sendResponse(id, result->get()); s._sendResponse(id, result->get());
......
...@@ -514,11 +514,11 @@ bool Peer::_data() { ...@@ -514,11 +514,11 @@ bool Peer::_data() {
_data(); _data();
}); });
if (status_ != kConnected) { if (status_ == kConnecting) {
// If not connected, must lock to make sure no other thread performs this step // If not connected, must lock to make sure no other thread performs this step
UNIQUE_LOCK(recv_mtx_,lk); UNIQUE_LOCK(recv_mtx_,lk);
// Verify still not connected after lock // Verify still not connected after lock
if (status_ != kConnected) { if (status_ == kConnecting) {
// First message must be a handshake // First message must be a handshake
try { try {
tuple<uint32_t, std::string, msgpack::object> hs; tuple<uint32_t, std::string, msgpack::object> hs;
...@@ -554,7 +554,7 @@ void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) { ...@@ -554,7 +554,7 @@ void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) {
// TODO: Handle error reporting... // TODO: Handle error reporting...
UNIQUE_LOCK(cb_mtx_,lk); UNIQUE_LOCK(cb_mtx_,lk);
if (callbacks_.count(id) > 0) { if (callbacks_.count(id) > 0) {
DLOG(1) << "Received return RPC value"; //DLOG(1) << "Received return RPC value";
// Allow for unlock before callback // Allow for unlock before callback
auto cb = std::move(callbacks_[id]); auto cb = std::move(callbacks_[id]);
...@@ -576,7 +576,6 @@ void Peer::cancelCall(int id) { ...@@ -576,7 +576,6 @@ void Peer::cancelCall(int id) {
} }
void Peer::_sendResponse(uint32_t id, const msgpack::object &res) { void Peer::_sendResponse(uint32_t id, const msgpack::object &res) {
LOG(INFO) << "Sending response: " << id;
Dispatcher::response_t res_obj = std::make_tuple(1,id,std::string(""),res); Dispatcher::response_t res_obj = std::make_tuple(1,id,std::string(""),res);
UNIQUE_LOCK(send_mtx_,lk); UNIQUE_LOCK(send_mtx_,lk);
if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0); if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0);
......
#include <ftl/net/universe.hpp> #include <ftl/net/universe.hpp>
#include <ftl/timer.hpp>
#include <chrono> #include <chrono>
#ifdef WIN32 #ifdef WIN32
...@@ -38,6 +39,8 @@ Universe::Universe() : ...@@ -38,6 +39,8 @@ Universe::Universe() :
reconnect_attempts_(50), reconnect_attempts_(50),
thread_(Universe::__start, this) { thread_(Universe::__start, this) {
_installBindings(); _installBindings();
LOG(WARNING) << "Deprecated Universe constructor";
} }
Universe::Universe(nlohmann::json &config) : Universe::Universe(nlohmann::json &config) :
...@@ -53,7 +56,22 @@ Universe::Universe(nlohmann::json &config) : ...@@ -53,7 +56,22 @@ Universe::Universe(nlohmann::json &config) :
_installBindings(); _installBindings();
LOG(INFO) << "SEND BUFFER SIZE = " << send_size_; // Add an idle timer job to garbage collect peer objects
// Note: Important to be a timer job to ensure no other timer jobs are
// using the object.
ftl::timer::add(ftl::timer::kTimerIdle10, [this](int64_t ts) {
if (garbage_.size() > 0) {
UNIQUE_LOCK(net_mutex_,lk);
if (ftl::pool.n_idle() == ftl::pool.size()) {
if (garbage_.size() > 0) LOG(INFO) << "Garbage collection";
while (garbage_.size() > 0) {
delete garbage_.front();
garbage_.pop_front();
}
}
}
return true;
});
} }
Universe::~Universe() { Universe::~Universe() {
...@@ -184,17 +202,6 @@ void Universe::_installBindings() { ...@@ -184,17 +202,6 @@ void Universe::_installBindings() {
// Note: should be called inside a net lock // Note: should be called inside a net lock
void Universe::_cleanupPeers() { void Universe::_cleanupPeers() {
if (ftl::pool.n_idle() == ftl::pool.size()) {
if (garbage_.size() > 0) LOG(INFO) << "Garbage collection";
while (garbage_.size() > 0) {
// FIXME: There is possibly still something with a peer pointer
// that is causing this throw an exception sometimes?
delete garbage_.front();
garbage_.pop_front();
}
}
auto i = peers_.begin(); auto i = peers_.begin();
while (i != peers_.end()) { while (i != peers_.end()) {
if (!(*i)->isValid()) { if (!(*i)->isValid()) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment