diff --git a/p2p-rm/include/ftl/p2p-rm/cluster.hpp b/p2p-rm/include/ftl/p2p-rm/cluster.hpp index e906d2a39ad8e2207b5569345acd9730ef5bc9df..694170fcbb613705aa5505b19a7fb65f6a2df853 100644 --- a/p2p-rm/include/ftl/p2p-rm/cluster.hpp +++ b/p2p-rm/include/ftl/p2p-rm/cluster.hpp @@ -4,6 +4,7 @@ #include "ftl/p2p-rm/mapped_ptr.hpp" #include "ftl/p2p-rm/internal.hpp" #include <ftl/p2p-rm/protocol.hpp> +#include <ftl/p2p-rm/p2p.hpp> #include <ftl/uri.hpp> #include <ftl/uuid.hpp> @@ -27,7 +28,7 @@ namespace rm { class Blob; -class Cluster : public ftl::net::Protocol { +class Cluster : public ftl::net::p2p { public: Cluster(const ftl::URI &uri, std::shared_ptr<ftl::net::Listener> l); Cluster(const char *uri, std::shared_ptr<ftl::net::Listener> l); @@ -114,48 +115,23 @@ class Cluster : public ftl::net::Protocol { /** * Allow member functions to be used for RPC calls by binding with 'this'. */ - template <typename R, typename C, typename ...Args> - auto member(R(C::*f)(Args...)) { - return [this,f](Args... args) -> R { return (this->*f)(std::forward<Args>(args)...); }; - } - - ftl::UUID getOwner(const std::string &uri); + /*template <typename R, typename C, typename ...Args> + void bind_member(const std::string &name, R(C::*f)(Args...)) { + bind(name, [this,f](Args... args) -> R { return (this->*f)(std::forward<Args>(args)...); }); + }*/ - /** - * Make an RPC call to all connected peers and put into a results vector. - * This function blocks until all peers have responded or an error / - * timeout occurs. The return value indicates the number of failed peers, or - * is 0 if all returned. - */ - template <typename T, typename... ARGS> - int broadcastCall(const std::string &name, std::vector<T> &results, - ARGS... args) { - int count = 0; - auto f = [&count,&results](const T &r) { - std::cout << "broadcast return" << std::endl; - count--; - results.push_back(r); - }; - - for (auto p : peers_) { - count++; - p->asyncCall<T>(name, f, std::forward<ARGS>(args)...); - } - - ftl::net::wait([&count]() { return count == 0; }, 5.0); - return count; - } + std::optional<ftl::UUID> getOwner(const std::string &uri); private: UUID id_; std::string root_; std::shared_ptr<ftl::net::Listener> listener_; - std::vector<std::shared_ptr<ftl::net::Socket>> peers_; + //std::vector<std::shared_ptr<ftl::net::Socket>> peers_; std::map<std::string, ftl::rm::Blob*> blobs_; std::map<int,std::vector<std::tuple<std::shared_ptr<ftl::net::Socket>,std::string>>> rpc_results_; // Cache of seen requests. - std::unordered_map<ftl::UUID,long int> requests_; + //std::unordered_map<ftl::UUID,long int> requests_; ftl::rm::Blob *_lookup(const char *uri); Blob *_create(const char *uri, char *addr, size_t size, size_t count, @@ -163,7 +139,7 @@ class Cluster : public ftl::net::Protocol { void _registerRPC(); private: - std::tuple<ftl::UUID,uint32_t> getOwner_RPC(const ftl::UUID &u, int ttl, const std::string &uri); + //std::tuple<ftl::UUID,uint32_t> getOwner_RPC(const ftl::UUID &u, int ttl, const std::string &uri); }; }; diff --git a/p2p-rm/include/ftl/p2p-rm/p2p.hpp b/p2p-rm/include/ftl/p2p-rm/p2p.hpp index 35cf3b970f697445408d390c71ec2e476acca42f..5b3264fb07e449cf283a8817415ed579ea0157a2 100644 --- a/p2p-rm/include/ftl/p2p-rm/p2p.hpp +++ b/p2p-rm/include/ftl/p2p-rm/p2p.hpp @@ -9,6 +9,7 @@ #include <vector> #include <memory> #include <ftl/net/protocol.hpp> +#include <ftl/net/socket.hpp> #include <iostream> namespace ftl { @@ -17,6 +18,7 @@ namespace net { class p2p : public ftl::net::Protocol { public: p2p(const char *uri) : Protocol(uri) {} + p2p(const std::string &uri) : Protocol(uri) {} void addPeer(std::shared_ptr<ftl::net::Socket> s) { peers_.push_back(s); }; diff --git a/p2p-rm/src/cluster.cpp b/p2p-rm/src/cluster.cpp index af370e880f480f1e4e3c60975eefeac5635265b7..9268942d5fdf43e2be084e95e53b6a64a3b516b4 100644 --- a/p2p-rm/src/cluster.cpp +++ b/p2p-rm/src/cluster.cpp @@ -21,13 +21,15 @@ using std::vector; using std::tuple; using std::shared_ptr; using std::string; +using std::optional; using ftl::URI; using ftl::rm::Blob; using ftl::net::Socket; using ftl::UUID; +using ftl::net::p2p; using namespace std::chrono; -Cluster::Cluster(const URI &uri, shared_ptr<Listener> l) : Protocol(uri.getBaseURI()), listener_(l) { +Cluster::Cluster(const URI &uri, shared_ptr<Listener> l) : p2p(uri.getBaseURI()), listener_(l) { //auto me = this; root_ = uri.getHost(); @@ -42,7 +44,7 @@ Cluster::Cluster(const URI &uri, shared_ptr<Listener> l) : Protocol(uri.getBaseU LOG(INFO) << "Cluster UUID = " << id_.to_string(); } -Cluster::Cluster(const char *uri, shared_ptr<Listener> l) : Protocol(uri), listener_(l) { +Cluster::Cluster(const char *uri, shared_ptr<Listener> l) : p2p(uri), listener_(l) { URI u(uri); if (!u.isValid()) return; if (u.getScheme() != ftl::URI::SCHEME_FTL) return; @@ -74,9 +76,7 @@ void Cluster::reset() { } void Cluster::_registerRPC() { - bind("getowner", member(&Cluster::getOwner_RPC)); - - bind("nop", []() { return true; }); + bind_find_one("getowner", &Cluster::getOwner); bind(P2P_SYNC, [this](uint32_t msg, Socket &s) { LOG(INFO) << "Receive blob sync"; @@ -88,19 +88,16 @@ void Cluster::addPeer(shared_ptr<Socket> &p, bool incoming) { //p.setProtocol(this); - peers_.push_back(p); - //p2p::addPeer(p); + //peers_.push_back(p); + p2p::addPeer(p); if (!incoming) { - p->onConnect([this](Socket &s) { - UUID q; - int ttl = 10; - + p->onConnect([this](Socket &s) { for (auto b : blobs_) { - auto o = std::get<0>(s.call<tuple<UUID,uint32_t>>("getowner", q, ttl, b.first)); - if (o != id() && o != UUID(0)) { - b.second->owner_ = o; - LOG(INFO) << "Lost ownership of " << b.first.c_str() << " to " << o.to_string(); + auto o = find_one<UUID>("getowner", b.first); + if (o && *o != id()) { + b.second->owner_ = *o; + LOG(INFO) << "Lost ownership of " << b.first.c_str() << " to " << (*o).to_string(); } } }); @@ -131,27 +128,11 @@ Blob *Cluster::_lookup(const char *uri) { return b; } -tuple<UUID,uint32_t> Cluster::getOwner_RPC(const UUID &u, int ttl, const std::string &uri) { - if (requests_.count(u) > 0) return {UUID(0),0}; - requests_[u] = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count(); - +optional<UUID> Cluster::getOwner(const std::string &uri) { if (blobs_.count(uri) > 0) { - return {blobs_[uri]->owner_,0}; + return blobs_[uri]->owner_; } - - vector<tuple<UUID,uint32_t>> results; - broadcastCall("getowner", results, u, ttl-1, uri); - - // TODO Verify all results are equal or empty - if (results.size() == 0) return {UUID(0),0}; - return results[0]; -} - -UUID Cluster::getOwner(const std::string &uri) { - UUID u; - int ttl = 10; - - return std::get<0>(getOwner_RPC(u, ttl, uri)); + return {}; } Blob *Cluster::_create(const char *uri, char *addr, size_t size, size_t count, @@ -174,12 +155,12 @@ Blob *Cluster::_create(const char *uri, char *addr, size_t size, size_t count, b->uri_ = std::string(uri); b->owner_ = id(); // I am initial owner by default... - UUID o = getOwner(uri); - if (o == id() || o == UUID(0)) { - // I am the owner! - //b->owner_ = "me"; + auto o = find_one<UUID>("getowner", uri); + + if ((o && *o == id()) || !o) { + } else { - b->owner_ = o; + b->owner_ = *o; } LOG(INFO) << "Mapping address to " << uri;