Skip to content
Snippets Groups Projects
Commit d1289122 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Cluster ownership negotiation correct on mapping but not for change on write

parent 25275150
No related branches found
No related tags found
No related merge requests found
......@@ -4,12 +4,15 @@
#include <mutex>
#include <shared_mutex>
#include <ftl/net.hpp>
#include <ftl/uuid.hpp>
#include <string>
#include <vector>
namespace ftl {
namespace rm {
class Cluster;
/* NOT TO BE USED DIRECTLY */
struct Blob {
//Blob();
......@@ -19,10 +22,12 @@ struct Blob {
char *data_;
size_t size_;
std::string uri_;
std::string owner_;
ftl::UUID owner_;
uint32_t blobid_;
Cluster *cluster_;
void finished();
void becomeOwner();
//void write(size_t offset, const char *data, size_t size);
//void read(size_t offset, char *data, size_t size);
void sync(size_t offset, size_t size);
......
......@@ -35,6 +35,12 @@ class Cluster : public ftl::net::Protocol {
void reset();
inline void destroy() { reset(); }
const UUID &id() const { return id_; }
template <typename T>
static bool is_owner(const ftl::mapped_ptr<T> &p) {
return (p.blob) ? p.blob->cluster_->id() == p.blob->owner_ : false;
}
/**
* Obtain a remote pointer from a URI. A nullptr is returned if the URI is
......@@ -113,7 +119,7 @@ class Cluster : public ftl::net::Protocol {
return [this,f](Args... args) -> R { return (this->*f)(std::forward<Args>(args)...); };
}
std::string getOwner(const std::string &uri);
ftl::UUID getOwner(const std::string &uri);
/**
* Make an RPC call to all connected peers and put into a results vector.
......@@ -126,6 +132,7 @@ class Cluster : public ftl::net::Protocol {
ARGS... args) {
int count = 0;
auto f = [&count,&results](const T &r) {
std::cout << "broadcast return" << std::endl;
count--;
results.push_back(r);
};
......@@ -140,6 +147,7 @@ class Cluster : public ftl::net::Protocol {
}
private:
UUID id_;
std::string root_;
std::shared_ptr<ftl::net::Listener> listener_;
std::vector<std::shared_ptr<ftl::net::Socket>> peers_;
......@@ -152,10 +160,10 @@ class Cluster : public ftl::net::Protocol {
ftl::rm::Blob *_lookup(const char *uri);
Blob *_create(const char *uri, char *addr, size_t size, size_t count,
ftl::rm::flags_t flags, const std::string &tname);
void _registerRPC(ftl::net::Socket &s);
void _registerRPC();
private:
std::tuple<std::string,uint32_t> getOwner_RPC(const ftl::UUID &u, int ttl, const std::string &uri);
std::tuple<ftl::UUID,uint32_t> getOwner_RPC(const ftl::UUID &u, int ttl, const std::string &uri);
};
};
......
......@@ -23,8 +23,6 @@ namespace ftl {
T *get() { return blob->data_; }
size_t size() const { return blob->size_; }
bool is_owner() const { return false; }
write_ref<T> operator*();
write_ref<T> operator[](ptrdiff_t idx);
write_ref<T> writable() { return ftl::write_ref<T>(*this); }
......@@ -84,7 +82,10 @@ namespace ftl {
mapped_ptr<T> ptr_;
// Constructor
write_ref(mapped_ptr<T> ptr) : ptr_(ptr), wlock_(ptr.blob->mutex_) {}
write_ref(mapped_ptr<T> ptr) : ptr_(ptr), wlock_(ptr.blob->mutex_) {
// Ensure ownership
ptr.blob->becomeOwner();
}
~write_ref() { ptr_.blob->finished(); }
bool is_valid() const { return !ptr_.is_null(); }
......
......@@ -11,6 +11,7 @@ namespace ftl {
class UUID {
public:
UUID() { uuid_generate(uuid_); };
UUID(int u) { memset(uuid_,u,16); };
UUID(const UUID &u) { memcpy(uuid_,u.uuid_,16); }
bool operator==(const UUID &u) const { return memcmp(uuid_,u.uuid_,16) == 0; }
......@@ -19,6 +20,12 @@ namespace ftl {
std::string str() const { return std::string((char*)uuid_,16); };
const unsigned char *raw() const { return &uuid_[0]; }
std::string to_string() const {
char b[37];
uuid_unparse(uuid_, b);
return std::string(b);
}
MSGPACK_DEFINE(uuid_);
private:
......
......@@ -2,6 +2,7 @@
#include <ftl/p2p-rm/blob.hpp>
#include <ftl/net/socket.hpp>
#include <ftl/p2p-rm/protocol.hpp>
#include <ftl/p2p-rm/cluster.hpp>
#include <iostream>
......@@ -35,6 +36,12 @@ void ftl::rm::_sync(const Blob &blob, size_t offset, size_t size) {
}
}
void ftl::rm::Blob::becomeOwner() {
if (cluster_->id() == owner_) return;
std::cout << "NOT OWNED BUT WRITING" << std::endl;
}
void ftl::rm::Blob::finished() {
}
......
......@@ -30,12 +30,16 @@ using namespace std::chrono;
Cluster::Cluster(const URI &uri, shared_ptr<Listener> l) : Protocol(uri.getBaseURI()), listener_(l) {
//auto me = this;
root_ = uri.getHost();
_registerRPC();
if (l != nullptr) {
l->onConnection([&](shared_ptr<Socket> &s) {
addPeer(s, true);
});
}
LOG(INFO) << "Cluster UUID = " << id_.to_string();
}
Cluster::Cluster(const char *uri, shared_ptr<Listener> l) : Protocol(uri), listener_(l) {
......@@ -45,12 +49,17 @@ Cluster::Cluster(const char *uri, shared_ptr<Listener> l) : Protocol(uri), liste
if (u.getPath().size() > 0) return;
root_ = u.getHost();
_registerRPC();
if (l != nullptr) {
l->setProtocol(this);
l->onConnection([&](shared_ptr<Socket> &s) {
addPeer(s, true);
});
}
LOG(INFO) << "Cluster UUID = " << id_.to_string();
}
Cluster::~Cluster() {
......@@ -64,9 +73,9 @@ void Cluster::reset() {
blobs_.clear();
}
void Cluster::_registerRPC(Socket &s) {
//s.bind("getowner", [this](const std::string &u) { getOwner(u.c_str()); });
bind("getowner", member(&Cluster::getOwner));
void Cluster::_registerRPC() {
bind("getowner", [this](const UUID &u, int ttl, const std::string &uri) { return getOwner_RPC(u,ttl,uri); });
//bind("getowner", member(&Cluster::getOwner_RPC));
bind("nop", []() { return true; });
......@@ -81,15 +90,17 @@ void Cluster::addPeer(shared_ptr<Socket> &p, bool incoming) {
//p.setProtocol(this);
peers_.push_back(p);
_registerRPC(*p);
if (!incoming) {
p->onConnect([this](Socket &s) {
UUID q;
int ttl = 10;
for (auto b : blobs_) {
auto o = s.call<string>("getowner", b.first);
if (o.size() > 0) {
auto o = std::get<0>(s.call<tuple<UUID,uint32_t>>("getowner", q, ttl, b.first));
if (o != id() && o != UUID(0)) {
b.second->owner_ = o;
LOG(INFO) << "Lost ownership of " << b.first.c_str() << " to " << o;
LOG(INFO) << "Lost ownership of " << b.first.c_str() << " to " << o.to_string();
}
}
});
......@@ -98,6 +109,7 @@ void Cluster::addPeer(shared_ptr<Socket> &p, bool incoming) {
shared_ptr<Socket> Cluster::addPeer(const char *url) {
auto sock = ftl::net::connect(url);
sock->setProtocol(this);
addPeer(sock);
return sock;
}
......@@ -119,22 +131,23 @@ Blob *Cluster::_lookup(const char *uri) {
return b;
}
tuple<string,uint32_t> Cluster::getOwner_RPC(const UUID &u, int ttl, const std::string &uri) {
if (requests_.count(u) > 0) return {"",0};
tuple<UUID,uint32_t> Cluster::getOwner_RPC(const UUID &u, int ttl, const std::string &uri) {
if (requests_.count(u) > 0) return {UUID(0),0};
requests_[u] = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
std::cout << "GETOWNER" << std::endl;
if (blobs_.count(uri) != 0) return {blobs_[uri]->owner_,0};
if (blobs_.count(uri) > 0) {
return {blobs_[uri]->owner_,0};
}
vector<tuple<string,uint32_t>> results;
vector<tuple<UUID,uint32_t>> results;
broadcastCall("getowner", results, u, ttl-1, uri);
// TODO Verify all results are equal or empty
if (results.size() == 0) return {"",0};
if (results.size() == 0) return {UUID(0),0};
return results[0];
}
std::string Cluster::getOwner(const std::string &uri) {
UUID Cluster::getOwner(const std::string &uri) {
UUID u;
int ttl = 10;
......@@ -148,26 +161,31 @@ Blob *Cluster::_create(const char *uri, char *addr, size_t size, size_t count,
if (u.getScheme() != ftl::URI::SCHEME_FTL) return NULL;
if (u.getHost() != root_) { std::cerr << "Non matching host : " << u.getHost() << " - " << root_ << std::endl; return NULL; }
if (blobs_[u.getBaseURI()] != NULL) return NULL;
if (blobs_.count(u.getBaseURI()) > 0) {
LOG(WARNING) << "Mapping already exists for " << uri;
return blobs_[u.getBaseURI()];
}
Blob *b = new Blob;
b->cluster_ = this;
b->data_ = addr;
b->size_ = size;
b->uri_ = std::string(uri);
b->owner_ = "";
blobs_[u.getBaseURI()] = b;
b->owner_ = id(); // I am initial owner by default...
std::string o = getOwner(uri);
if (o.size() == 0) {
UUID o = getOwner(uri);
if (o == id() || o == UUID(0)) {
// I am the owner!
std::cout << "I own " << uri << std::endl;
b->owner_ = "me";
//b->owner_ = "me";
} else {
std::cout << "I do not own " << uri << std::endl;
b->owner_ = o;
}
LOG(INFO) << "Mapping address to " << uri;
blobs_[u.getBaseURI()] = b;
//std::cout << owners << std::endl;
return b;
......
......@@ -14,8 +14,6 @@ SCENARIO( "Pre-connection ownership resolution", "[ownership]" ) {
l->setProtocol(&c1);
Cluster c2("ftl://utu.fi", l);
auto s = c1.addPeer("tcp://localhost:9000");
int data1 = 89;
int data2 = 99;
......@@ -24,10 +22,12 @@ SCENARIO( "Pre-connection ownership resolution", "[ownership]" ) {
REQUIRE( m1.is_valid() );
REQUIRE( m2.is_valid() );
auto s = c1.addPeer("tcp://localhost:9000");
ftl::net::wait([&s]() { return s->isConnected(); });
REQUIRE( m2.is_owner() );
REQUIRE( !m1.is_owner() );
REQUIRE( Cluster::is_owner(m2) );
REQUIRE( !Cluster::is_owner(m1) );
l->close();
ftl::net::stop();
......@@ -52,8 +52,8 @@ SCENARIO( "Post-connection ownership resolution", "[ownership]" ) {
REQUIRE( m1.is_valid() );
REQUIRE( m2.is_valid() );
REQUIRE( !m2.is_owner() );
REQUIRE( m1.is_owner() );
REQUIRE( !Cluster::is_owner(m2) );
REQUIRE( Cluster::is_owner(m1) );
l->close();
ftl::net::stop();
......@@ -77,13 +77,13 @@ SCENARIO( "Write change ownership", "[ownership]" ) {
REQUIRE( m1.is_valid() );
REQUIRE( m2.is_valid() );
REQUIRE( !m2.is_owner() );
REQUIRE( m1.is_owner() );
REQUIRE( Cluster::is_owner(m1) );
REQUIRE( !Cluster::is_owner(m2) );
*m2 = 676;
REQUIRE( m2.is_owner() );
REQUIRE( !m1.is_owner() );
REQUIRE( Cluster::is_owner(m2) );
REQUIRE( !Cluster::is_owner(m1) );
l->close();
ftl::net::stop();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment