From 283e4bcc26ed6e4580fc6c235eb194668d527de1 Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nwpope@utu.fi> Date: Thu, 6 Jun 2019 09:51:37 +0300 Subject: [PATCH] WIP Add net callbacks and improve stability --- .../net/cpp/include/ftl/net/universe.hpp | 33 ++++- components/net/cpp/src/universe.cpp | 113 +++++++++++++++--- components/net/cpp/test/net_integration.cpp | 69 +++++++++++ 3 files changed, 200 insertions(+), 15 deletions(-) diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index 169eff589..d849f9b0e 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -20,6 +20,10 @@ namespace ftl { namespace net { +struct Error { + int errno; +}; + /** * Represents a group of network peers and their resources, managing the * searching of and sharing of resources across peers. Each universe can @@ -142,6 +146,14 @@ class Universe : public ftl::Configurable { void setLocalID(const ftl::UUID &u) { this_peer = u; }; const ftl::UUID &id() const { return this_peer; } + + // --- Event Handlers ------------------------------------------------------ + + void onConnect(const std::string &, std::function<void(ftl::net::Peer*)>); + void onDisconnect(const std::string &, std::function<void(ftl::net::Peer*)>); + void onError(const std::string &, std::function<void(ftl::net::Peer*, const ftl::net::Error &)>); + + void removeCallbacks(const std::string &); private: void _run(); @@ -149,7 +161,10 @@ class Universe : public ftl::Configurable { void _installBindings(); void _installBindings(Peer *); bool _subscribe(const std::string &res); - void _remove(Peer *); + void _cleanupPeers(); + void _notifyConnect(Peer *); + void _notifyDisconnect(Peer *); + void _notifyError(Peer *, const ftl::net::Error &); static void __start(Universe *u); @@ -167,6 +182,22 @@ class Universe : public ftl::Configurable { ftl::UUID id_; ftl::net::Dispatcher disp_; std::thread thread_; + + struct ConnHandler { + std::string name; + std::function<void(ftl::net::Peer*)> h; + }; + + struct ErrHandler { + std::string name; + std::function<void(ftl::net::Peer*, const ftl::net::Error &)> h; + }; + + // Handlers + std::list<ConnHandler> on_connect_; + std::list<ConnHandler> on_disconnect_; + std::list<ErrHandler> on_error_; + // std::map<std::string, std::vector<ftl::net::Peer*>> subscriptions_; }; diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index 28a6b0056..578570083 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -51,6 +51,8 @@ Universe::Universe(nlohmann::json &config) : } Universe::~Universe() { + LOG(INFO) << "Cleanup Network ..."; + active_ = false; thread_.join(); @@ -88,6 +90,7 @@ Peer *Universe::connect(const string &addr) { p->onConnect([this](Peer &p) { peer_ids_[p.id()] = &p; + _notifyConnect(&p); }); return p; @@ -131,10 +134,9 @@ int Universe::_setDescriptors() { FD_SET(s->_socket(), &sfdread_); } FD_SET(s->_socket(), &sfderror_); - } else if (s) { - _remove(s); } } + _cleanupPeers(); return n; } @@ -159,17 +161,24 @@ void Universe::_installBindings() { } // Note: should be called inside a net lock -void Universe::_remove(Peer *p) { - LOG(INFO) << "Removing disconnected peer: " << p->id().to_string(); - for (auto i=peers_.begin(); i!=peers_.end(); i++) { - if ((*i) == p) { - peers_.erase(i); break; +void Universe::_cleanupPeers() { + + auto i = peers_.begin(); + while (i != peers_.end()) { + if (!(*i)->isValid()) { + Peer *p = *i; + LOG(INFO) << "Removing disconnected peer: " << p->id().to_string(); + _notifyDisconnect(p); + + auto ix = peer_ids_.find(p->id()); + if (ix != peer_ids_.end()) peer_ids_.erase(ix); + delete p; + + i = peers_.erase(i); + } else { + i++; } } - - auto ix = peer_ids_.find(p->id()); - if (ix != peer_ids_.end()) peer_ids_.erase(ix); - delete p; } Peer *Universe::getPeer(const UUID &id) const { @@ -285,6 +294,7 @@ void Universe::_run() { _installBindings(p); p->onConnect([this](Peer &p) { peer_ids_[p.id()] = &p; + _notifyConnect(&p); }); } } @@ -302,11 +312,86 @@ void Universe::_run() { s->socketError(); s->close(); } - } else if (s != NULL) { - // Erase it - _remove(s); } } + _cleanupPeers(); + } +} + +void Universe::onConnect(const std::string &name, std::function<void(ftl::net::Peer*)> cb) { + unique_lock<mutex> lk(net_mutex_); + on_connect_.push_back({name, cb}); +} + +void Universe::onDisconnect(const std::string &name, std::function<void(ftl::net::Peer*)> cb) { + unique_lock<mutex> lk(net_mutex_); + on_disconnect_.push_back({name, cb}); +} + +void Universe::onError(const std::string &name, std::function<void(ftl::net::Peer*, const ftl::net::Error &)> cb) { + unique_lock<mutex> lk(net_mutex_); + on_error_.push_back({name, cb}); +} + +void Universe::removeCallbacks(const std::string &name) { + unique_lock<mutex> lk(net_mutex_); + { + auto i = on_connect_.begin(); + while (i != on_connect_.end()) { + if ((*i).name == name) { + i = on_connect_.erase(i); + } else { + i++; + } + } + } + + { + auto i = on_disconnect_.begin(); + while (i != on_disconnect_.end()) { + if ((*i).name == name) { + i = on_disconnect_.erase(i); + } else { + i++; + } + } + } + + { + auto i = on_error_.begin(); + while (i != on_error_.end()) { + if ((*i).name == name) { + i = on_error_.erase(i); + } else { + i++; + } + } + } +} + +void Universe::_notifyConnect(Peer *p) { + unique_lock<mutex> lk(net_mutex_); + for (auto &i : on_connect_) { + try { + i.h(p); + } catch(...) { + LOG(ERROR) << "Exception inside OnConnect hander: " << i.name; + } } } +void Universe::_notifyDisconnect(Peer *p) { + //unique_lock<mutex> lk(net_mutex_); + LOG(INFO) << "NOTIFY DISCONNECT"; + for (auto &i : on_disconnect_) { + try { + i.h(p); + } catch(...) { + LOG(ERROR) << "Exception inside OnDisconnect hander: " << i.name; + } + } +} + +void Universe::_notifyError(Peer *p, const ftl::net::Error &e) { + +} diff --git a/components/net/cpp/test/net_integration.cpp b/components/net/cpp/test/net_integration.cpp index 816edca92..58aaedd22 100644 --- a/components/net/cpp/test/net_integration.cpp +++ b/components/net/cpp/test/net_integration.cpp @@ -5,6 +5,7 @@ #include <chrono> using ftl::net::Universe; +using ftl::net::Peer; using std::this_thread::sleep_for; using std::chrono::milliseconds; @@ -76,6 +77,74 @@ TEST_CASE("Universe::connect()", "[net]") { //fin_server(); } +TEST_CASE("Universe::onConnect()", "[net]") { + Universe a; + Universe b; + + a.listen("tcp://localhost:7077"); + + SECTION("single valid remote init connection") { + bool done = false; + + a.onConnect("test", [&done](Peer *p) { + done = true; + }); + + b.connect("tcp://localhost:7077")->waitConnection(); + sleep_for(milliseconds(100)); + REQUIRE( done ); + } + + SECTION("single valid init connection") { + bool done = false; + + b.onConnect("test", [&done](Peer *p) { + done = true; + }); + + b.connect("tcp://localhost:7077")->waitConnection(); + sleep_for(milliseconds(100)); + REQUIRE( done ); + } +} + +TEST_CASE("Universe::onDisconnect()", "[net]") { + Universe a; + Universe b; + + a.listen("tcp://localhost:7077"); + + SECTION("single valid remote close") { + bool done = false; + + a.onDisconnect("test", [&done](Peer *p) { + done = true; + }); + + Peer *p = b.connect("tcp://localhost:7077"); + p->waitConnection(); + sleep_for(milliseconds(100)); + p->close(); + sleep_for(milliseconds(1100)); + REQUIRE( done ); + } + + SECTION("single valid close") { + bool done = false; + + b.onDisconnect("test", [&done](Peer *p) { + done = true; + }); + + Peer *p = b.connect("tcp://localhost:7077"); + p->waitConnection(); + sleep_for(milliseconds(100)); + p->close(); + sleep_for(milliseconds(1100)); + REQUIRE( done ); + } +} + TEST_CASE("Universe::broadcast()", "[net]") { Universe a; Universe b; -- GitLab