diff --git a/p2p-rm/include/ftl/p2p-rm/blob.hpp b/p2p-rm/include/ftl/p2p-rm/blob.hpp index aa187047e6fc08ca223dff2d4e59f9147ed969d1..e9355b0e27dd6261814328a00e981b508edb575a 100644 --- a/p2p-rm/include/ftl/p2p-rm/blob.hpp +++ b/p2p-rm/include/ftl/p2p-rm/blob.hpp @@ -4,12 +4,15 @@ #include <mutex> #include <shared_mutex> #include <ftl/net.hpp> +#include <ftl/uuid.hpp> #include <string> #include <vector> namespace ftl { namespace rm { +class Cluster; + /* NOT TO BE USED DIRECTLY */ struct Blob { //Blob(); @@ -19,10 +22,12 @@ struct Blob { char *data_; size_t size_; std::string uri_; - std::string owner_; + ftl::UUID owner_; uint32_t blobid_; + Cluster *cluster_; void finished(); + void becomeOwner(); //void write(size_t offset, const char *data, size_t size); //void read(size_t offset, char *data, size_t size); void sync(size_t offset, size_t size); diff --git a/p2p-rm/include/ftl/p2p-rm/cluster.hpp b/p2p-rm/include/ftl/p2p-rm/cluster.hpp index 4df425a6743713c92c96a29407f53e7fa12ca856..e906d2a39ad8e2207b5569345acd9730ef5bc9df 100644 --- a/p2p-rm/include/ftl/p2p-rm/cluster.hpp +++ b/p2p-rm/include/ftl/p2p-rm/cluster.hpp @@ -35,6 +35,12 @@ class Cluster : public ftl::net::Protocol { void reset(); inline void destroy() { reset(); } + const UUID &id() const { return id_; } + + template <typename T> + static bool is_owner(const ftl::mapped_ptr<T> &p) { + return (p.blob) ? p.blob->cluster_->id() == p.blob->owner_ : false; + } /** * Obtain a remote pointer from a URI. A nullptr is returned if the URI is @@ -113,7 +119,7 @@ class Cluster : public ftl::net::Protocol { return [this,f](Args... args) -> R { return (this->*f)(std::forward<Args>(args)...); }; } - std::string getOwner(const std::string &uri); + ftl::UUID getOwner(const std::string &uri); /** * Make an RPC call to all connected peers and put into a results vector. @@ -126,6 +132,7 @@ class Cluster : public ftl::net::Protocol { ARGS... args) { int count = 0; auto f = [&count,&results](const T &r) { + std::cout << "broadcast return" << std::endl; count--; results.push_back(r); }; @@ -140,6 +147,7 @@ class Cluster : public ftl::net::Protocol { } private: + UUID id_; std::string root_; std::shared_ptr<ftl::net::Listener> listener_; std::vector<std::shared_ptr<ftl::net::Socket>> peers_; @@ -152,10 +160,10 @@ class Cluster : public ftl::net::Protocol { ftl::rm::Blob *_lookup(const char *uri); Blob *_create(const char *uri, char *addr, size_t size, size_t count, ftl::rm::flags_t flags, const std::string &tname); - void _registerRPC(ftl::net::Socket &s); + void _registerRPC(); private: - std::tuple<std::string,uint32_t> getOwner_RPC(const ftl::UUID &u, int ttl, const std::string &uri); + std::tuple<ftl::UUID,uint32_t> getOwner_RPC(const ftl::UUID &u, int ttl, const std::string &uri); }; }; diff --git a/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp b/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp index 3aae5f0ad919924113c50b8251aad9b1be541909..7ae8d0d756325c373322a3661edad8f291e40693 100644 --- a/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp +++ b/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp @@ -23,8 +23,6 @@ namespace ftl { T *get() { return blob->data_; } size_t size() const { return blob->size_; } - bool is_owner() const { return false; } - write_ref<T> operator*(); write_ref<T> operator[](ptrdiff_t idx); write_ref<T> writable() { return ftl::write_ref<T>(*this); } @@ -84,7 +82,10 @@ namespace ftl { mapped_ptr<T> ptr_; // Constructor - write_ref(mapped_ptr<T> ptr) : ptr_(ptr), wlock_(ptr.blob->mutex_) {} + write_ref(mapped_ptr<T> ptr) : ptr_(ptr), wlock_(ptr.blob->mutex_) { + // Ensure ownership + ptr.blob->becomeOwner(); + } ~write_ref() { ptr_.blob->finished(); } bool is_valid() const { return !ptr_.is_null(); } diff --git a/p2p-rm/include/ftl/uuid.hpp b/p2p-rm/include/ftl/uuid.hpp index 674c274e22e52a8f7d8e55b1f283019a2ed82246..8ec0931e6deb0c2fb56d32e680eaafaa5afbb47f 100644 --- a/p2p-rm/include/ftl/uuid.hpp +++ b/p2p-rm/include/ftl/uuid.hpp @@ -11,6 +11,7 @@ namespace ftl { class UUID { public: UUID() { uuid_generate(uuid_); }; + UUID(int u) { memset(uuid_,u,16); }; UUID(const UUID &u) { memcpy(uuid_,u.uuid_,16); } bool operator==(const UUID &u) const { return memcmp(uuid_,u.uuid_,16) == 0; } @@ -19,6 +20,12 @@ namespace ftl { std::string str() const { return std::string((char*)uuid_,16); }; const unsigned char *raw() const { return &uuid_[0]; } + std::string to_string() const { + char b[37]; + uuid_unparse(uuid_, b); + return std::string(b); + } + MSGPACK_DEFINE(uuid_); private: diff --git a/p2p-rm/src/blob.cpp b/p2p-rm/src/blob.cpp index c322aeefc01919c875d925471d0320347b26c6c5..cdcdbb7e74375fd12687e34ab5096a94f4bfb5c7 100644 --- a/p2p-rm/src/blob.cpp +++ b/p2p-rm/src/blob.cpp @@ -2,6 +2,7 @@ #include <ftl/p2p-rm/blob.hpp> #include <ftl/net/socket.hpp> #include <ftl/p2p-rm/protocol.hpp> +#include <ftl/p2p-rm/cluster.hpp> #include <iostream> @@ -35,6 +36,12 @@ void ftl::rm::_sync(const Blob &blob, size_t offset, size_t size) { } } +void ftl::rm::Blob::becomeOwner() { + if (cluster_->id() == owner_) return; + + std::cout << "NOT OWNED BUT WRITING" << std::endl; +} + void ftl::rm::Blob::finished() { } diff --git a/p2p-rm/src/cluster.cpp b/p2p-rm/src/cluster.cpp index f309b2be924dc263aa8becf3a4b4ce98dbbe2533..b7ee117bc39fe2170a540837e46033afecf32e25 100644 --- a/p2p-rm/src/cluster.cpp +++ b/p2p-rm/src/cluster.cpp @@ -30,12 +30,16 @@ using namespace std::chrono; Cluster::Cluster(const URI &uri, shared_ptr<Listener> l) : Protocol(uri.getBaseURI()), listener_(l) { //auto me = this; root_ = uri.getHost(); + + _registerRPC(); if (l != nullptr) { l->onConnection([&](shared_ptr<Socket> &s) { addPeer(s, true); }); } + + LOG(INFO) << "Cluster UUID = " << id_.to_string(); } Cluster::Cluster(const char *uri, shared_ptr<Listener> l) : Protocol(uri), listener_(l) { @@ -45,12 +49,17 @@ Cluster::Cluster(const char *uri, shared_ptr<Listener> l) : Protocol(uri), liste if (u.getPath().size() > 0) return; root_ = u.getHost(); + + _registerRPC(); if (l != nullptr) { + l->setProtocol(this); l->onConnection([&](shared_ptr<Socket> &s) { addPeer(s, true); }); } + + LOG(INFO) << "Cluster UUID = " << id_.to_string(); } Cluster::~Cluster() { @@ -64,9 +73,9 @@ void Cluster::reset() { blobs_.clear(); } -void Cluster::_registerRPC(Socket &s) { - //s.bind("getowner", [this](const std::string &u) { getOwner(u.c_str()); }); - bind("getowner", member(&Cluster::getOwner)); +void Cluster::_registerRPC() { + bind("getowner", [this](const UUID &u, int ttl, const std::string &uri) { return getOwner_RPC(u,ttl,uri); }); + //bind("getowner", member(&Cluster::getOwner_RPC)); bind("nop", []() { return true; }); @@ -81,15 +90,17 @@ void Cluster::addPeer(shared_ptr<Socket> &p, bool incoming) { //p.setProtocol(this); peers_.push_back(p); - _registerRPC(*p); if (!incoming) { p->onConnect([this](Socket &s) { + UUID q; + int ttl = 10; + for (auto b : blobs_) { - auto o = s.call<string>("getowner", b.first); - if (o.size() > 0) { + auto o = std::get<0>(s.call<tuple<UUID,uint32_t>>("getowner", q, ttl, b.first)); + if (o != id() && o != UUID(0)) { b.second->owner_ = o; - LOG(INFO) << "Lost ownership of " << b.first.c_str() << " to " << o; + LOG(INFO) << "Lost ownership of " << b.first.c_str() << " to " << o.to_string(); } } }); @@ -98,6 +109,7 @@ void Cluster::addPeer(shared_ptr<Socket> &p, bool incoming) { shared_ptr<Socket> Cluster::addPeer(const char *url) { auto sock = ftl::net::connect(url); + sock->setProtocol(this); addPeer(sock); return sock; } @@ -119,22 +131,23 @@ Blob *Cluster::_lookup(const char *uri) { return b; } -tuple<string,uint32_t> Cluster::getOwner_RPC(const UUID &u, int ttl, const std::string &uri) { - if (requests_.count(u) > 0) return {"",0}; +tuple<UUID,uint32_t> Cluster::getOwner_RPC(const UUID &u, int ttl, const std::string &uri) { + if (requests_.count(u) > 0) return {UUID(0),0}; requests_[u] = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count(); - std::cout << "GETOWNER" << std::endl; - if (blobs_.count(uri) != 0) return {blobs_[uri]->owner_,0}; + if (blobs_.count(uri) > 0) { + return {blobs_[uri]->owner_,0}; + } - vector<tuple<string,uint32_t>> results; + vector<tuple<UUID,uint32_t>> results; broadcastCall("getowner", results, u, ttl-1, uri); // TODO Verify all results are equal or empty - if (results.size() == 0) return {"",0}; + if (results.size() == 0) return {UUID(0),0}; return results[0]; } -std::string Cluster::getOwner(const std::string &uri) { +UUID Cluster::getOwner(const std::string &uri) { UUID u; int ttl = 10; @@ -148,26 +161,31 @@ Blob *Cluster::_create(const char *uri, char *addr, size_t size, size_t count, if (u.getScheme() != ftl::URI::SCHEME_FTL) return NULL; if (u.getHost() != root_) { std::cerr << "Non matching host : " << u.getHost() << " - " << root_ << std::endl; return NULL; } - if (blobs_[u.getBaseURI()] != NULL) return NULL; + if (blobs_.count(u.getBaseURI()) > 0) { + LOG(WARNING) << "Mapping already exists for " << uri; + return blobs_[u.getBaseURI()]; + } Blob *b = new Blob; + b->cluster_ = this; b->data_ = addr; b->size_ = size; b->uri_ = std::string(uri); - b->owner_ = ""; - blobs_[u.getBaseURI()] = b; + b->owner_ = id(); // I am initial owner by default... - std::string o = getOwner(uri); - if (o.size() == 0) { + UUID o = getOwner(uri); + if (o == id() || o == UUID(0)) { // I am the owner! - std::cout << "I own " << uri << std::endl; - b->owner_ = "me"; + //b->owner_ = "me"; } else { - std::cout << "I do not own " << uri << std::endl; b->owner_ = o; } + LOG(INFO) << "Mapping address to " << uri; + + blobs_[u.getBaseURI()] = b; + //std::cout << owners << std::endl; return b; diff --git a/p2p-rm/test/p2p_integration.cpp b/p2p-rm/test/p2p_integration.cpp index 4b4a4e8fb861f815aeee3fdfdd114216e6c8941b..64fe8d1cb76823f0fa3a4270581fbaac3eca8ed8 100644 --- a/p2p-rm/test/p2p_integration.cpp +++ b/p2p-rm/test/p2p_integration.cpp @@ -14,8 +14,6 @@ SCENARIO( "Pre-connection ownership resolution", "[ownership]" ) { l->setProtocol(&c1); Cluster c2("ftl://utu.fi", l); - auto s = c1.addPeer("tcp://localhost:9000"); - int data1 = 89; int data2 = 99; @@ -24,10 +22,12 @@ SCENARIO( "Pre-connection ownership resolution", "[ownership]" ) { REQUIRE( m1.is_valid() ); REQUIRE( m2.is_valid() ); + auto s = c1.addPeer("tcp://localhost:9000"); + ftl::net::wait([&s]() { return s->isConnected(); }); - REQUIRE( m2.is_owner() ); - REQUIRE( !m1.is_owner() ); + REQUIRE( Cluster::is_owner(m2) ); + REQUIRE( !Cluster::is_owner(m1) ); l->close(); ftl::net::stop(); @@ -52,8 +52,8 @@ SCENARIO( "Post-connection ownership resolution", "[ownership]" ) { REQUIRE( m1.is_valid() ); REQUIRE( m2.is_valid() ); - REQUIRE( !m2.is_owner() ); - REQUIRE( m1.is_owner() ); + REQUIRE( !Cluster::is_owner(m2) ); + REQUIRE( Cluster::is_owner(m1) ); l->close(); ftl::net::stop(); @@ -77,13 +77,13 @@ SCENARIO( "Write change ownership", "[ownership]" ) { REQUIRE( m1.is_valid() ); REQUIRE( m2.is_valid() ); - REQUIRE( !m2.is_owner() ); - REQUIRE( m1.is_owner() ); + REQUIRE( Cluster::is_owner(m1) ); + REQUIRE( !Cluster::is_owner(m2) ); *m2 = 676; - REQUIRE( m2.is_owner() ); - REQUIRE( !m1.is_owner() ); + REQUIRE( Cluster::is_owner(m2) ); + REQUIRE( !Cluster::is_owner(m1) ); l->close(); ftl::net::stop();