diff --git a/include/ftl/protocol/self.hpp b/include/ftl/protocol/self.hpp index 0837878df271ab26758979b7c49110ebe4409fcf..2aadeaebcabb653c2592e89effe9a19db39c1248 100644 --- a/include/ftl/protocol/self.hpp +++ b/include/ftl/protocol/self.hpp @@ -8,6 +8,7 @@ #include <ftl/uuid.hpp> #include <ftl/uri.hpp> +#include <ftl/handle.hpp> #include <memory> @@ -18,6 +19,10 @@ class Universe; namespace protocol { +struct Error { + int errno; +}; + class Self { public: /** Peer for outgoing connection: resolve address and connect */ @@ -58,6 +63,10 @@ class Self { /** get webservice peer pointer, returns nullptr if not connected to webservice */ std::shared_ptr<ftl::protocol::Node> getWebService() const; + ftl::Handle onConnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)>&); + ftl::Handle onDisconnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)>&); + ftl::Handle onError(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&, const ftl::protocol::Error &)>&); + protected: std::shared_ptr<ftl::net::Universe> universe_; }; diff --git a/src/peer.cpp b/src/peer.cpp index e2a3c647bed33df684a53c1c028d2ef782248e20..91c2375bc14ec75367eb061dd9eabb650db7b1a9 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -498,14 +498,14 @@ bool Peer::waitConnection() { std::unique_lock<std::mutex> lk(m); std::condition_variable cv; - Callback h = net_->onConnect([this, &cv](const std::shared_ptr<Peer> &p) { + auto h = net_->onConnect([this, &cv](const std::shared_ptr<Peer> &p) { if (p.get() == this) { cv.notify_one(); } + return true; }); cv.wait_for(lk, seconds(1), [this]() { return status_ == NodeStatus::kConnected;}); - net_->removeCallback(h); return status_ == NodeStatus::kConnected; } diff --git a/src/self.cpp b/src/self.cpp index 603c9363251fdac07e7f12975c32c8b6f03c9945..63bdf7257542f351b1d2af35d0e53e9cac602db0 100644 --- a/src/self.cpp +++ b/src/self.cpp @@ -46,3 +46,22 @@ std::shared_ptr<ftl::protocol::Node> Self::getNode(const ftl::UUID &pid) const { std::shared_ptr<ftl::protocol::Node> Self::getWebService() const { return std::make_shared<ftl::protocol::Node>(universe_->getWebService()); } + +ftl::Handle Self::onConnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)> &cb) { + return universe_->onConnect([cb](const std::shared_ptr<ftl::net::Peer> &p) { + return cb(std::make_shared<ftl::protocol::Node>(p)); + }); +} + +ftl::Handle Self::onDisconnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)> &cb) { + return universe_->onDisconnect([cb](const std::shared_ptr<ftl::net::Peer> &p) { + return cb(std::make_shared<ftl::protocol::Node>(p)); + }); +} + +ftl::Handle Self::onError(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&, const ftl::protocol::Error &)> &cb) { + return universe_->onError([cb](const std::shared_ptr<ftl::net::Peer> &p, const ftl::net::Error &err) { + ftl::protocol::Error perr = {}; + return cb(std::make_shared<ftl::protocol::Node>(p), perr); + }); +} diff --git a/src/universe.cpp b/src/universe.cpp index f8e2fa18c4ae9533ac47d0c2224edd907d0db054..bdaac2db4d8ca177bf0dbc9c046a9e68e312e426 100644 --- a/src/universe.cpp +++ b/src/universe.cpp @@ -69,7 +69,6 @@ struct NetImplDetail { #define WS_SEND_BUFFER_SIZE (1024*1024) #define WS_RECEIVE_BUFFER_SIZE (62*1024) -Callback ftl::net::Universe::cbid__ = 0; std::shared_ptr<Universe> Universe::instance_ = nullptr; Universe::Universe() : @@ -508,61 +507,16 @@ void Universe::_run() { } } -Callback Universe::onConnect(const std::function<void(const std::shared_ptr<Peer>&)> &cb) { - UNIQUE_LOCK(handler_mutex_,lk); - Callback id = cbid__++; - on_connect_.push_back({id, cb}); - return id; +ftl::Handle Universe::onConnect(const std::function<bool(const std::shared_ptr<Peer>&)> &cb) { + return on_connect_.on(cb); } -Callback Universe::onDisconnect(const std::function<void(const std::shared_ptr<Peer>&)> &cb) { - UNIQUE_LOCK(handler_mutex_,lk); - Callback id = cbid__++; - on_disconnect_.push_back({id, cb}); - return id; +ftl::Handle Universe::onDisconnect(const std::function<bool(const std::shared_ptr<Peer>&)> &cb) { + return on_disconnect_.on(cb); } -Callback Universe::onError(const std::function<void(const std::shared_ptr<Peer>&, const ftl::net::Error &)> &cb) { - UNIQUE_LOCK(handler_mutex_,lk); - Callback id = cbid__++; - on_error_.push_back({id, cb}); - return id; -} - -void Universe::removeCallback(Callback cbid) { - UNIQUE_LOCK(handler_mutex_,lk); - { - auto i = on_connect_.begin(); - while (i != on_connect_.end()) { - if ((*i).id == cbid) { - i = on_connect_.erase(i); - } else { - i++; - } - } - } - - { - auto i = on_disconnect_.begin(); - while (i != on_disconnect_.end()) { - if ((*i).id == cbid) { - i = on_disconnect_.erase(i); - } else { - i++; - } - } - } - - { - auto i = on_error_.begin(); - while (i != on_error_.end()) { - if ((*i).id == cbid) { - i = on_error_.erase(i); - } else { - i++; - } - } - } +ftl::Handle Universe::onError(const std::function<bool(const std::shared_ptr<Peer>&, const ftl::net::Error &)> &cb) { + return on_error_.on(cb); } static std::shared_ptr<Peer> findPeer(const std::vector<std::shared_ptr<Peer>> &peers, const Peer *p) { @@ -578,27 +532,21 @@ void Universe::_notifyConnect(Peer *p) { peer_ids_[ptr->id()] = ptr->localID(); - for (auto &i : on_connect_) { - try { - i.h(ptr); - } catch(...) { - LOG(ERROR) << "Exception inside OnConnect hander: " << i.id; - } + try { + on_connect_.trigger(ptr); + } catch(const std::exception &e) { + LOG(ERROR) << "Exception inside OnConnect hander: " << e.what(); } } void Universe::_notifyDisconnect(Peer *p) { - // In all cases, should already be locked outside this function call - //unique_lock<mutex> lk(net_mutex_); UNIQUE_LOCK(handler_mutex_,lk); const auto ptr = findPeer(peers_, p); - for (auto &i : on_disconnect_) { - try { - i.h(ptr); - } catch(...) { - LOG(ERROR) << "Exception inside OnDisconnect hander: " << i.id; - } + try { + on_disconnect_.trigger(ptr); + } catch(const std::exception &e) { + LOG(ERROR) << "Exception inside OnDisconnect hander: " << e.what(); } } diff --git a/src/universe.hpp b/src/universe.hpp index c688238b07ebf04b3d7293ac8bfb92d9fbf99464..dd15b16e03c24b63acaf392cdaacad5781c0bea4 100644 --- a/src/universe.hpp +++ b/src/universe.hpp @@ -161,11 +161,9 @@ public: // --- Event Handlers ------------------------------------------------------ - Callback onConnect(const std::function<void(const std::shared_ptr<ftl::net::Peer>&)>&); - Callback onDisconnect(const std::function<void(const std::shared_ptr<ftl::net::Peer>&)>&); - Callback onError(const std::function<void(const std::shared_ptr<Peer>&, const ftl::net::Error &)>&); - - void removeCallback(Callback cbid); + ftl::Handle onConnect(const std::function<bool(const std::shared_ptr<ftl::net::Peer>&)>&); + ftl::Handle onDisconnect(const std::function<bool(const std::shared_ptr<ftl::net::Peer>&)>&); + ftl::Handle onError(const std::function<bool(const std::shared_ptr<ftl::net::Peer>&, const ftl::net::Error &)>&); size_t getSendBufferSize(ftl::URI::scheme_t s); size_t getRecvBufferSize(ftl::URI::scheme_t s); @@ -209,22 +207,10 @@ private: double periodic_time_; int reconnect_attempts_; - struct ConnHandler { - Callback id; - std::function<void(const std::shared_ptr<ftl::net::Peer>&)> h; - }; - - struct ErrHandler { - Callback id; - std::function<void(const std::shared_ptr<ftl::net::Peer>&, const ftl::net::Error &)> h; - }; - - // Handlers - std::list<ConnHandler> on_connect_; - std::list<ConnHandler> on_disconnect_; - std::list<ErrHandler> on_error_; + ftl::Handler<const std::shared_ptr<ftl::net::Peer>&> on_connect_; + ftl::Handler<const std::shared_ptr<ftl::net::Peer>&> on_disconnect_; + ftl::Handler<const std::shared_ptr<ftl::net::Peer>&, const ftl::net::Error &> on_error_; - static Callback cbid__; static std::shared_ptr<Universe> instance_; // NOTE: Must always be last member diff --git a/test/net_integration.cpp b/test/net_integration.cpp index 94f6963fbfb7fda122594f5223e57258fd0ab182..4b880363b64a0e61258b41fd8b0a1e265ffcff1f 100644 --- a/test/net_integration.cpp +++ b/test/net_integration.cpp @@ -66,58 +66,64 @@ TEST_CASE("Listen and Connect", "[net]") { REQUIRE(throws); } - /*SECTION("automatic reconnect, after clean disconnect") { + SECTION("automatic reconnect, after clean disconnect") { std::mutex mtx; std::condition_variable cv; std::unique_lock<std::mutex> lk(mtx); - auto p_connecting = b.connect(uri); + auto uri = "tcp://localhost:" + std::to_string(self->getListeningURIs().front().getPort()); + + auto p_connecting = ftl::createNode(uri); REQUIRE(p_connecting); bool disconnected_once = false; - a.onConnect([&](ftl::net::Peer* p_listening) { + auto h = ftl::getSelf()->onConnect([&](const std::shared_ptr<ftl::protocol::Node> &p_listening) { if (!disconnected_once) { // remote closes on first connection disconnected_once = true; - p_listening->close(); + p_listening->close(true); LOG(INFO) << "disconnected"; } else { // notify on second cv.notify_one(); } + return true; }); REQUIRE(cv.wait_for(lk, std::chrono::seconds(5)) == std::cv_status::no_timeout); REQUIRE(p_connecting->isConnected()); } - SECTION("automatic reconnect, socket close") { + SECTION("automatic reconnect from originating connection") { std::mutex mtx; std::condition_variable cv; std::unique_lock<std::mutex> lk(mtx); - auto p_connecting = b.connect(uri); + auto uri = "tcp://localhost:" + std::to_string(self->getListeningURIs().front().getPort()); + + auto p_connecting = ftl::createNode(uri); REQUIRE(p_connecting); bool disconnected_once = false; - a.onConnect([&](ftl::net::Peer* p_listening) { + auto h = ftl::getSelf()->onConnect([&](const std::shared_ptr<ftl::protocol::Node> &p_listening) { if (!disconnected_once) { // disconnect on first connection disconnected_once = true; - p_listening->rawClose(); + p_connecting->close(true); LOG(INFO) << "disconnected"; } else { // notify on second cv.notify_one(); } + return true; }); REQUIRE(cv.wait_for(lk, std::chrono::seconds(5)) == std::cv_status::no_timeout); REQUIRE(p_connecting->isConnected()); - }*/ + } } /*TEST_CASE("Universe::onConnect()", "[net]") {