Skip to content
Snippets Groups Projects
Commit 283e4bcc authored by Nicolas Pope's avatar Nicolas Pope
Browse files

WIP Add net callbacks and improve stability

parent 91eac3b0
No related branches found
No related tags found
No related merge requests found
...@@ -20,6 +20,10 @@ ...@@ -20,6 +20,10 @@
namespace ftl { namespace ftl {
namespace net { namespace net {
struct Error {
int errno;
};
/** /**
* Represents a group of network peers and their resources, managing the * Represents a group of network peers and their resources, managing the
* searching of and sharing of resources across peers. Each universe can * searching of and sharing of resources across peers. Each universe can
...@@ -142,6 +146,14 @@ class Universe : public ftl::Configurable { ...@@ -142,6 +146,14 @@ class Universe : public ftl::Configurable {
void setLocalID(const ftl::UUID &u) { this_peer = u; }; void setLocalID(const ftl::UUID &u) { this_peer = u; };
const ftl::UUID &id() const { return this_peer; } const ftl::UUID &id() const { return this_peer; }
// --- Event Handlers ------------------------------------------------------
void onConnect(const std::string &, std::function<void(ftl::net::Peer*)>);
void onDisconnect(const std::string &, std::function<void(ftl::net::Peer*)>);
void onError(const std::string &, std::function<void(ftl::net::Peer*, const ftl::net::Error &)>);
void removeCallbacks(const std::string &);
private: private:
void _run(); void _run();
...@@ -149,7 +161,10 @@ class Universe : public ftl::Configurable { ...@@ -149,7 +161,10 @@ class Universe : public ftl::Configurable {
void _installBindings(); void _installBindings();
void _installBindings(Peer *); void _installBindings(Peer *);
bool _subscribe(const std::string &res); bool _subscribe(const std::string &res);
void _remove(Peer *); void _cleanupPeers();
void _notifyConnect(Peer *);
void _notifyDisconnect(Peer *);
void _notifyError(Peer *, const ftl::net::Error &);
static void __start(Universe *u); static void __start(Universe *u);
...@@ -167,6 +182,22 @@ class Universe : public ftl::Configurable { ...@@ -167,6 +182,22 @@ class Universe : public ftl::Configurable {
ftl::UUID id_; ftl::UUID id_;
ftl::net::Dispatcher disp_; ftl::net::Dispatcher disp_;
std::thread thread_; std::thread thread_;
struct ConnHandler {
std::string name;
std::function<void(ftl::net::Peer*)> h;
};
struct ErrHandler {
std::string name;
std::function<void(ftl::net::Peer*, const ftl::net::Error &)> h;
};
// Handlers
std::list<ConnHandler> on_connect_;
std::list<ConnHandler> on_disconnect_;
std::list<ErrHandler> on_error_;
// std::map<std::string, std::vector<ftl::net::Peer*>> subscriptions_; // std::map<std::string, std::vector<ftl::net::Peer*>> subscriptions_;
}; };
......
...@@ -51,6 +51,8 @@ Universe::Universe(nlohmann::json &config) : ...@@ -51,6 +51,8 @@ Universe::Universe(nlohmann::json &config) :
} }
Universe::~Universe() { Universe::~Universe() {
LOG(INFO) << "Cleanup Network ...";
active_ = false; active_ = false;
thread_.join(); thread_.join();
...@@ -88,6 +90,7 @@ Peer *Universe::connect(const string &addr) { ...@@ -88,6 +90,7 @@ Peer *Universe::connect(const string &addr) {
p->onConnect([this](Peer &p) { p->onConnect([this](Peer &p) {
peer_ids_[p.id()] = &p; peer_ids_[p.id()] = &p;
_notifyConnect(&p);
}); });
return p; return p;
...@@ -131,10 +134,9 @@ int Universe::_setDescriptors() { ...@@ -131,10 +134,9 @@ int Universe::_setDescriptors() {
FD_SET(s->_socket(), &sfdread_); FD_SET(s->_socket(), &sfdread_);
} }
FD_SET(s->_socket(), &sfderror_); FD_SET(s->_socket(), &sfderror_);
} else if (s) {
_remove(s);
} }
} }
_cleanupPeers();
return n; return n;
} }
...@@ -159,17 +161,24 @@ void Universe::_installBindings() { ...@@ -159,17 +161,24 @@ void Universe::_installBindings() {
} }
// Note: should be called inside a net lock // Note: should be called inside a net lock
void Universe::_remove(Peer *p) { void Universe::_cleanupPeers() {
LOG(INFO) << "Removing disconnected peer: " << p->id().to_string();
for (auto i=peers_.begin(); i!=peers_.end(); i++) { auto i = peers_.begin();
if ((*i) == p) { while (i != peers_.end()) {
peers_.erase(i); break; if (!(*i)->isValid()) {
Peer *p = *i;
LOG(INFO) << "Removing disconnected peer: " << p->id().to_string();
_notifyDisconnect(p);
auto ix = peer_ids_.find(p->id());
if (ix != peer_ids_.end()) peer_ids_.erase(ix);
delete p;
i = peers_.erase(i);
} else {
i++;
} }
} }
auto ix = peer_ids_.find(p->id());
if (ix != peer_ids_.end()) peer_ids_.erase(ix);
delete p;
} }
Peer *Universe::getPeer(const UUID &id) const { Peer *Universe::getPeer(const UUID &id) const {
...@@ -285,6 +294,7 @@ void Universe::_run() { ...@@ -285,6 +294,7 @@ void Universe::_run() {
_installBindings(p); _installBindings(p);
p->onConnect([this](Peer &p) { p->onConnect([this](Peer &p) {
peer_ids_[p.id()] = &p; peer_ids_[p.id()] = &p;
_notifyConnect(&p);
}); });
} }
} }
...@@ -302,11 +312,86 @@ void Universe::_run() { ...@@ -302,11 +312,86 @@ void Universe::_run() {
s->socketError(); s->socketError();
s->close(); s->close();
} }
} else if (s != NULL) {
// Erase it
_remove(s);
} }
} }
_cleanupPeers();
}
}
void Universe::onConnect(const std::string &name, std::function<void(ftl::net::Peer*)> cb) {
unique_lock<mutex> lk(net_mutex_);
on_connect_.push_back({name, cb});
}
void Universe::onDisconnect(const std::string &name, std::function<void(ftl::net::Peer*)> cb) {
unique_lock<mutex> lk(net_mutex_);
on_disconnect_.push_back({name, cb});
}
void Universe::onError(const std::string &name, std::function<void(ftl::net::Peer*, const ftl::net::Error &)> cb) {
unique_lock<mutex> lk(net_mutex_);
on_error_.push_back({name, cb});
}
void Universe::removeCallbacks(const std::string &name) {
unique_lock<mutex> lk(net_mutex_);
{
auto i = on_connect_.begin();
while (i != on_connect_.end()) {
if ((*i).name == name) {
i = on_connect_.erase(i);
} else {
i++;
}
}
}
{
auto i = on_disconnect_.begin();
while (i != on_disconnect_.end()) {
if ((*i).name == name) {
i = on_disconnect_.erase(i);
} else {
i++;
}
}
}
{
auto i = on_error_.begin();
while (i != on_error_.end()) {
if ((*i).name == name) {
i = on_error_.erase(i);
} else {
i++;
}
}
}
}
void Universe::_notifyConnect(Peer *p) {
unique_lock<mutex> lk(net_mutex_);
for (auto &i : on_connect_) {
try {
i.h(p);
} catch(...) {
LOG(ERROR) << "Exception inside OnConnect hander: " << i.name;
}
} }
} }
void Universe::_notifyDisconnect(Peer *p) {
//unique_lock<mutex> lk(net_mutex_);
LOG(INFO) << "NOTIFY DISCONNECT";
for (auto &i : on_disconnect_) {
try {
i.h(p);
} catch(...) {
LOG(ERROR) << "Exception inside OnDisconnect hander: " << i.name;
}
}
}
void Universe::_notifyError(Peer *p, const ftl::net::Error &e) {
}
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include <chrono> #include <chrono>
using ftl::net::Universe; using ftl::net::Universe;
using ftl::net::Peer;
using std::this_thread::sleep_for; using std::this_thread::sleep_for;
using std::chrono::milliseconds; using std::chrono::milliseconds;
...@@ -76,6 +77,74 @@ TEST_CASE("Universe::connect()", "[net]") { ...@@ -76,6 +77,74 @@ TEST_CASE("Universe::connect()", "[net]") {
//fin_server(); //fin_server();
} }
TEST_CASE("Universe::onConnect()", "[net]") {
Universe a;
Universe b;
a.listen("tcp://localhost:7077");
SECTION("single valid remote init connection") {
bool done = false;
a.onConnect("test", [&done](Peer *p) {
done = true;
});
b.connect("tcp://localhost:7077")->waitConnection();
sleep_for(milliseconds(100));
REQUIRE( done );
}
SECTION("single valid init connection") {
bool done = false;
b.onConnect("test", [&done](Peer *p) {
done = true;
});
b.connect("tcp://localhost:7077")->waitConnection();
sleep_for(milliseconds(100));
REQUIRE( done );
}
}
TEST_CASE("Universe::onDisconnect()", "[net]") {
Universe a;
Universe b;
a.listen("tcp://localhost:7077");
SECTION("single valid remote close") {
bool done = false;
a.onDisconnect("test", [&done](Peer *p) {
done = true;
});
Peer *p = b.connect("tcp://localhost:7077");
p->waitConnection();
sleep_for(milliseconds(100));
p->close();
sleep_for(milliseconds(1100));
REQUIRE( done );
}
SECTION("single valid close") {
bool done = false;
b.onDisconnect("test", [&done](Peer *p) {
done = true;
});
Peer *p = b.connect("tcp://localhost:7077");
p->waitConnection();
sleep_for(milliseconds(100));
p->close();
sleep_for(milliseconds(1100));
REQUIRE( done );
}
}
TEST_CASE("Universe::broadcast()", "[net]") { TEST_CASE("Universe::broadcast()", "[net]") {
Universe a; Universe a;
Universe b; Universe b;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment