From d128912211a873d4eebd21673754f12abf9fc436 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Wed, 27 Feb 2019 15:23:28 +0200
Subject: [PATCH] Cluster ownership negotiation correct on mapping but not for
 change on write

---
 p2p-rm/include/ftl/p2p-rm/blob.hpp       |  7 ++-
 p2p-rm/include/ftl/p2p-rm/cluster.hpp    | 14 ++++--
 p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp |  7 +--
 p2p-rm/include/ftl/uuid.hpp              |  7 +++
 p2p-rm/src/blob.cpp                      |  7 +++
 p2p-rm/src/cluster.cpp                   | 62 +++++++++++++++---------
 p2p-rm/test/p2p_integration.cpp          | 20 ++++----
 7 files changed, 85 insertions(+), 39 deletions(-)

diff --git a/p2p-rm/include/ftl/p2p-rm/blob.hpp b/p2p-rm/include/ftl/p2p-rm/blob.hpp
index aa187047e..e9355b0e2 100644
--- a/p2p-rm/include/ftl/p2p-rm/blob.hpp
+++ b/p2p-rm/include/ftl/p2p-rm/blob.hpp
@@ -4,12 +4,15 @@
 #include <mutex>
 #include <shared_mutex>
 #include <ftl/net.hpp>
+#include <ftl/uuid.hpp>
 #include <string>
 #include <vector>
 
 namespace ftl {
 namespace rm {
 
+class Cluster;
+
 /* NOT TO BE USED DIRECTLY */
 struct Blob {
 	//Blob();
@@ -19,10 +22,12 @@ struct Blob {
 	char *data_;
 	size_t size_;
 	std::string uri_;
-	std::string owner_;
+	ftl::UUID owner_;
 	uint32_t blobid_;
+	Cluster *cluster_;
 	
 	void finished();
+	void becomeOwner();
 	//void write(size_t offset, const char *data, size_t size);
 	//void read(size_t offset, char *data, size_t size);
 	void sync(size_t offset, size_t size);
diff --git a/p2p-rm/include/ftl/p2p-rm/cluster.hpp b/p2p-rm/include/ftl/p2p-rm/cluster.hpp
index 4df425a67..e906d2a39 100644
--- a/p2p-rm/include/ftl/p2p-rm/cluster.hpp
+++ b/p2p-rm/include/ftl/p2p-rm/cluster.hpp
@@ -35,6 +35,12 @@ class Cluster : public ftl::net::Protocol {
 	
 	void reset();
 	inline void destroy() { reset(); }
+	const UUID &id() const { return id_; }
+	
+	template <typename T>
+	static bool is_owner(const ftl::mapped_ptr<T> &p) {
+		return (p.blob) ? p.blob->cluster_->id() == p.blob->owner_ : false;
+	}
 	
 	/**
 	 * Obtain a remote pointer from a URI. A nullptr is returned if the URI is
@@ -113,7 +119,7 @@ class Cluster : public ftl::net::Protocol {
 	  return [this,f](Args... args) -> R { return (this->*f)(std::forward<Args>(args)...); };
 	}
 	
-	std::string getOwner(const std::string &uri);
+	ftl::UUID getOwner(const std::string &uri);
 	
 	/**
 	 * Make an RPC call to all connected peers and put into a results vector.
@@ -126,6 +132,7 @@ class Cluster : public ftl::net::Protocol {
 			ARGS... args) {
 		int count = 0;
 		auto f = [&count,&results](const T &r) {
+			std::cout << "broadcast return" << std::endl;
 			count--;
 			results.push_back(r);
 		};
@@ -140,6 +147,7 @@ class Cluster : public ftl::net::Protocol {
 	}
 	
 	private:
+	UUID id_;
 	std::string root_;
 	std::shared_ptr<ftl::net::Listener> listener_;
 	std::vector<std::shared_ptr<ftl::net::Socket>> peers_;
@@ -152,10 +160,10 @@ class Cluster : public ftl::net::Protocol {
 	ftl::rm::Blob *_lookup(const char *uri);
 	Blob *_create(const char *uri, char *addr, size_t size, size_t count,
 		ftl::rm::flags_t flags, const std::string &tname);
-	void _registerRPC(ftl::net::Socket &s);
+	void _registerRPC();
 	
 	private:
-	std::tuple<std::string,uint32_t> getOwner_RPC(const ftl::UUID &u, int ttl, const std::string &uri);
+	std::tuple<ftl::UUID,uint32_t> getOwner_RPC(const ftl::UUID &u, int ttl, const std::string &uri);
 };
 
 };
diff --git a/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp b/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp
index 3aae5f0ad..7ae8d0d75 100644
--- a/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp
+++ b/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp
@@ -23,8 +23,6 @@ namespace ftl {
 		T *get() { return blob->data_; }
 		size_t size() const { return blob->size_; }
 		
-		bool is_owner() const { return false; }
-		
 		write_ref<T> operator*();
 		write_ref<T> operator[](ptrdiff_t idx);
 		write_ref<T> writable() { return ftl::write_ref<T>(*this); }
@@ -84,7 +82,10 @@ namespace ftl {
 		mapped_ptr<T> ptr_;
 		
 		// Constructor
-		write_ref(mapped_ptr<T> ptr) : ptr_(ptr), wlock_(ptr.blob->mutex_) {}
+		write_ref(mapped_ptr<T> ptr) : ptr_(ptr), wlock_(ptr.blob->mutex_) {
+			// Ensure ownership
+			ptr.blob->becomeOwner();
+		}
 		~write_ref() { ptr_.blob->finished(); }
 		
 		bool is_valid() const { return !ptr_.is_null(); }
diff --git a/p2p-rm/include/ftl/uuid.hpp b/p2p-rm/include/ftl/uuid.hpp
index 674c274e2..8ec0931e6 100644
--- a/p2p-rm/include/ftl/uuid.hpp
+++ b/p2p-rm/include/ftl/uuid.hpp
@@ -11,6 +11,7 @@ namespace ftl {
 	class UUID {
 		public:
 		UUID() { uuid_generate(uuid_); };
+		UUID(int u) { memset(uuid_,u,16); };
 		UUID(const UUID &u) { memcpy(uuid_,u.uuid_,16); }
 		
 		bool operator==(const UUID &u) const { return memcmp(uuid_,u.uuid_,16) == 0; }
@@ -19,6 +20,12 @@ namespace ftl {
 		std::string str() const { return std::string((char*)uuid_,16); };
 		const unsigned char *raw() const { return &uuid_[0]; }
 		
+		std::string to_string() const {
+			char b[37];
+			uuid_unparse(uuid_, b);
+			return std::string(b);
+		}
+		
 		MSGPACK_DEFINE(uuid_);
 		
 		private:
diff --git a/p2p-rm/src/blob.cpp b/p2p-rm/src/blob.cpp
index c322aeefc..cdcdbb7e7 100644
--- a/p2p-rm/src/blob.cpp
+++ b/p2p-rm/src/blob.cpp
@@ -2,6 +2,7 @@
 #include <ftl/p2p-rm/blob.hpp>
 #include <ftl/net/socket.hpp>
 #include <ftl/p2p-rm/protocol.hpp>
+#include <ftl/p2p-rm/cluster.hpp>
 
 #include <iostream>
 
@@ -35,6 +36,12 @@ void ftl::rm::_sync(const Blob &blob, size_t offset, size_t size) {
 	}
 }
 
+void ftl::rm::Blob::becomeOwner() {
+	if (cluster_->id() == owner_) return;
+	
+	std::cout << "NOT OWNED BUT WRITING" << std::endl;
+}
+
 void ftl::rm::Blob::finished() {
 
 }
diff --git a/p2p-rm/src/cluster.cpp b/p2p-rm/src/cluster.cpp
index f309b2be9..b7ee117bc 100644
--- a/p2p-rm/src/cluster.cpp
+++ b/p2p-rm/src/cluster.cpp
@@ -30,12 +30,16 @@ using namespace std::chrono;
 Cluster::Cluster(const URI &uri, shared_ptr<Listener> l) : Protocol(uri.getBaseURI()), listener_(l) {
 	//auto me = this;
 	root_ = uri.getHost();
+	
+	_registerRPC();
 
 	if (l != nullptr) {
 		l->onConnection([&](shared_ptr<Socket> &s) {
 			addPeer(s, true);		
 		});
 	}
+	
+	LOG(INFO) << "Cluster UUID = " << id_.to_string();
 }
 
 Cluster::Cluster(const char *uri, shared_ptr<Listener> l) : Protocol(uri), listener_(l) {
@@ -45,12 +49,17 @@ Cluster::Cluster(const char *uri, shared_ptr<Listener> l) : Protocol(uri), liste
 	if (u.getPath().size() > 0) return;
 
 	root_ = u.getHost();
+	
+	_registerRPC();
 
 	if (l != nullptr) {
+		l->setProtocol(this);
 		l->onConnection([&](shared_ptr<Socket> &s) {
 			addPeer(s, true);		
 		});
 	}
+	
+	LOG(INFO) << "Cluster UUID = " << id_.to_string();
 }
 
 Cluster::~Cluster() {
@@ -64,9 +73,9 @@ void Cluster::reset() {
 	blobs_.clear();
 }
 
-void Cluster::_registerRPC(Socket &s) {
-	//s.bind("getowner", [this](const std::string &u) { getOwner(u.c_str()); });
-	bind("getowner", member(&Cluster::getOwner));
+void Cluster::_registerRPC() {
+	bind("getowner", [this](const UUID &u, int ttl, const std::string &uri) { return getOwner_RPC(u,ttl,uri); });
+	//bind("getowner", member(&Cluster::getOwner_RPC));
 	
 	bind("nop", []() { return true; });
 	
@@ -81,15 +90,17 @@ void Cluster::addPeer(shared_ptr<Socket> &p, bool incoming) {
 	//p.setProtocol(this);
 
 	peers_.push_back(p);
-	_registerRPC(*p);
 	
 	if (!incoming) {
 		p->onConnect([this](Socket &s) {
+			UUID q;
+			int ttl = 10;
+			
 			for (auto b : blobs_) {
-				auto o = s.call<string>("getowner", b.first);
-				if (o.size() > 0) {
+				auto o = std::get<0>(s.call<tuple<UUID,uint32_t>>("getowner", q, ttl, b.first));
+				if (o != id() && o != UUID(0)) {
 					b.second->owner_ = o;
-					LOG(INFO) << "Lost ownership of " << b.first.c_str() << " to " << o;
+					LOG(INFO) << "Lost ownership of " << b.first.c_str() << " to " << o.to_string();
 				}
 			}
 		});
@@ -98,6 +109,7 @@ void Cluster::addPeer(shared_ptr<Socket> &p, bool incoming) {
 
 shared_ptr<Socket> Cluster::addPeer(const char *url) {
 	auto sock = ftl::net::connect(url);
+	sock->setProtocol(this);
 	addPeer(sock);
 	return sock;
 }
@@ -119,22 +131,23 @@ Blob *Cluster::_lookup(const char *uri) {
 	return b;
 }
 
-tuple<string,uint32_t> Cluster::getOwner_RPC(const UUID &u, int ttl, const std::string &uri) {
-	if (requests_.count(u) > 0) return {"",0};
+tuple<UUID,uint32_t> Cluster::getOwner_RPC(const UUID &u, int ttl, const std::string &uri) {
+	if (requests_.count(u) > 0) return {UUID(0),0};
 	requests_[u] = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
 	
-	std::cout << "GETOWNER" << std::endl;
-	if (blobs_.count(uri) != 0) return {blobs_[uri]->owner_,0};
+	if (blobs_.count(uri) > 0) {
+		return {blobs_[uri]->owner_,0};
+	}
 	
-	vector<tuple<string,uint32_t>> results;
+	vector<tuple<UUID,uint32_t>> results;
 	broadcastCall("getowner", results, u, ttl-1, uri);
 	
 	// TODO Verify all results are equal or empty
-	if (results.size() == 0) return {"",0};
+	if (results.size() == 0) return {UUID(0),0};
 	return results[0];
 }
 
-std::string Cluster::getOwner(const std::string &uri) {
+UUID Cluster::getOwner(const std::string &uri) {
 	UUID u;
 	int ttl = 10;
 
@@ -148,26 +161,31 @@ Blob *Cluster::_create(const char *uri, char *addr, size_t size, size_t count,
 	if (u.getScheme() != ftl::URI::SCHEME_FTL) return NULL;
 	if (u.getHost() != root_) { std::cerr << "Non matching host : " << u.getHost() << " - " << root_ << std::endl; return NULL; }
 	
-	if (blobs_[u.getBaseURI()] != NULL) return NULL;
+	if (blobs_.count(u.getBaseURI()) > 0) {
+		LOG(WARNING) << "Mapping already exists for " << uri;
+		return blobs_[u.getBaseURI()];
+	}
 	
 	Blob *b = new Blob;
 
+	b->cluster_ = this;
 	b->data_ = addr;
 	b->size_ = size;
 	b->uri_ = std::string(uri);
-	b->owner_ = "";
-	blobs_[u.getBaseURI()] = b;
+	b->owner_ = id(); // I am initial owner by default...
 
-	std::string o = getOwner(uri);
-	if (o.size() == 0) {
+	UUID o = getOwner(uri);
+	if (o == id() || o == UUID(0)) {
 		// I am the owner!
-		std::cout << "I own " << uri << std::endl;
-		b->owner_ = "me";
+		//b->owner_ = "me";
 	} else {
-		std::cout << "I do not own " << uri << std::endl;
 		b->owner_ = o;
 	}
 	
+	LOG(INFO) << "Mapping address to " << uri;
+	
+	blobs_[u.getBaseURI()] = b;
+	
 	//std::cout << owners << std::endl;
 	
 	return b;
diff --git a/p2p-rm/test/p2p_integration.cpp b/p2p-rm/test/p2p_integration.cpp
index 4b4a4e8fb..64fe8d1cb 100644
--- a/p2p-rm/test/p2p_integration.cpp
+++ b/p2p-rm/test/p2p_integration.cpp
@@ -14,8 +14,6 @@ SCENARIO( "Pre-connection ownership resolution", "[ownership]" ) {
 	l->setProtocol(&c1);
 	Cluster c2("ftl://utu.fi", l);
 	
-	auto s = c1.addPeer("tcp://localhost:9000");
-	
 	int data1 = 89;
 	int data2 = 99;
 	
@@ -24,10 +22,12 @@ SCENARIO( "Pre-connection ownership resolution", "[ownership]" ) {
 	REQUIRE( m1.is_valid() );
 	REQUIRE( m2.is_valid() );
 	
+	auto s = c1.addPeer("tcp://localhost:9000");
+	
 	ftl::net::wait([&s]() { return s->isConnected(); });
 	
-	REQUIRE( m2.is_owner() );
-	REQUIRE( !m1.is_owner() );
+	REQUIRE( Cluster::is_owner(m2) );
+	REQUIRE( !Cluster::is_owner(m1) );
 	
 	l->close();
 	ftl::net::stop();
@@ -52,8 +52,8 @@ SCENARIO( "Post-connection ownership resolution", "[ownership]" ) {
 	REQUIRE( m1.is_valid() );
 	REQUIRE( m2.is_valid() );
 	
-	REQUIRE( !m2.is_owner() );
-	REQUIRE( m1.is_owner() );
+	REQUIRE( !Cluster::is_owner(m2) );
+	REQUIRE( Cluster::is_owner(m1) );
 	
 	l->close();
 	ftl::net::stop();
@@ -77,13 +77,13 @@ SCENARIO( "Write change ownership", "[ownership]" ) {
 	REQUIRE( m1.is_valid() );
 	REQUIRE( m2.is_valid() );
 	
-	REQUIRE( !m2.is_owner() );
-	REQUIRE( m1.is_owner() );
+	REQUIRE( Cluster::is_owner(m1) );
+	REQUIRE( !Cluster::is_owner(m2) );
 	
 	*m2 = 676;
 	
-	REQUIRE( m2.is_owner() );
-	REQUIRE( !m1.is_owner() );
+	REQUIRE( Cluster::is_owner(m2) );
+	REQUIRE( !Cluster::is_owner(m1) );
 	
 	l->close();
 	ftl::net::stop();
-- 
GitLab