From 14894bcbd8b97ca089aafce9c2761016de8c25cb Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Wed, 27 Feb 2019 11:14:38 +0200
Subject: [PATCH] Add initial ownership tests and send request uuid on owner
 search

---
 p2p-rm/include/ftl/p2p-rm/cluster.hpp    |   9 +-
 p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp |   2 +
 p2p-rm/include/ftl/p2p-rm/protocol.hpp   |  14 +++
 p2p-rm/include/ftl/uuid.hpp              |  38 ++++++++
 p2p-rm/src/blob.cpp                      |   7 +-
 p2p-rm/src/cluster.cpp                   |  35 ++++++--
 p2p-rm/test/CMakeLists.txt               |  19 ++--
 p2p-rm/test/p2p_integration.cpp          | 105 +++++++++++++++++++++++
 8 files changed, 215 insertions(+), 14 deletions(-)
 create mode 100644 p2p-rm/include/ftl/uuid.hpp

diff --git a/p2p-rm/include/ftl/p2p-rm/cluster.hpp b/p2p-rm/include/ftl/p2p-rm/cluster.hpp
index dfeb5ffcd..4df425a67 100644
--- a/p2p-rm/include/ftl/p2p-rm/cluster.hpp
+++ b/p2p-rm/include/ftl/p2p-rm/cluster.hpp
@@ -6,6 +6,7 @@
 #include <ftl/p2p-rm/protocol.hpp>
 
 #include <ftl/uri.hpp>
+#include <ftl/uuid.hpp>
 #include <ftl/net/socket.hpp>
 #include <ftl/net/protocol.hpp>
 
@@ -102,7 +103,7 @@ class Cluster : public ftl::net::Protocol {
 	/**
 	 * Connect to a new peer using a URL string.
 	 */
-	void addPeer(const char *url);
+	std::shared_ptr<ftl::net::Socket> addPeer(const char *url);
 	
 	/**
 	 * Allow member functions to be used for RPC calls by binding with 'this'.
@@ -145,10 +146,16 @@ class Cluster : public ftl::net::Protocol {
 	std::map<std::string, ftl::rm::Blob*> blobs_;
 	std::map<int,std::vector<std::tuple<std::shared_ptr<ftl::net::Socket>,std::string>>> rpc_results_;
 
+	// Cache of seen requests.
+	std::unordered_map<ftl::UUID,long int> requests_;
+
 	ftl::rm::Blob *_lookup(const char *uri);
 	Blob *_create(const char *uri, char *addr, size_t size, size_t count,
 		ftl::rm::flags_t flags, const std::string &tname);
 	void _registerRPC(ftl::net::Socket &s);
+	
+	private:
+	std::tuple<std::string,uint32_t> getOwner_RPC(const ftl::UUID &u, int ttl, const std::string &uri);
 };
 
 };
diff --git a/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp b/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp
index ef3419d66..3aae5f0ad 100644
--- a/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp
+++ b/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp
@@ -23,6 +23,8 @@ namespace ftl {
 		T *get() { return blob->data_; }
 		size_t size() const { return blob->size_; }
 		
+		bool is_owner() const { return false; }
+		
 		write_ref<T> operator*();
 		write_ref<T> operator[](ptrdiff_t idx);
 		write_ref<T> writable() { return ftl::write_ref<T>(*this); }
diff --git a/p2p-rm/include/ftl/p2p-rm/protocol.hpp b/p2p-rm/include/ftl/p2p-rm/protocol.hpp
index 743e48dc0..392154f36 100644
--- a/p2p-rm/include/ftl/p2p-rm/protocol.hpp
+++ b/p2p-rm/include/ftl/p2p-rm/protocol.hpp
@@ -12,5 +12,19 @@
 #define P2P_PEERSEARCH			(FTL_PROTOCOL_FREE + 6)
 #define P2P_RPC_CALL			(FTL_PROTOCOL_FREE + 7)
 
+namespace ftl {
+namespace rm {
+	struct P2PQuery {
+		char guid[16];
+		uint8_t ttl;
+	};
+	
+	struct MemOwner {
+		char peer[16];
+		uint64_t age;
+	};
+};
+};
+
 #endif // _FTL_P2P_RM_PROTOCOL_HPP_
 
diff --git a/p2p-rm/include/ftl/uuid.hpp b/p2p-rm/include/ftl/uuid.hpp
new file mode 100644
index 000000000..674c274e2
--- /dev/null
+++ b/p2p-rm/include/ftl/uuid.hpp
@@ -0,0 +1,38 @@
+#ifndef _FTL_UUID_HPP_
+#define _FTL_UUID_HPP_
+
+#include <uuid/uuid.h>
+#include <memory>
+#include <string>
+#include <functional>
+#include <msgpack.hpp>
+
+namespace ftl {
+	class UUID {
+		public:
+		UUID() { uuid_generate(uuid_); };
+		UUID(const UUID &u) { memcpy(uuid_,u.uuid_,16); }
+		
+		bool operator==(const UUID &u) const { return memcmp(uuid_,u.uuid_,16) == 0; }
+		bool operator!=(const UUID &u) const { return memcmp(uuid_,u.uuid_,16) != 0; }
+		
+		std::string str() const { return std::string((char*)uuid_,16); };
+		const unsigned char *raw() const { return &uuid_[0]; }
+		
+		MSGPACK_DEFINE(uuid_);
+		
+		private:
+		unsigned char uuid_[16];
+	};
+};
+
+namespace std {
+	template <> struct hash<ftl::UUID> {
+		size_t operator()(const ftl::UUID & x) const {
+			return std::hash<std::string>{}(x.str());
+		}
+	};
+};
+
+#endif // _FTL_UUID_HPP_
+
diff --git a/p2p-rm/src/blob.cpp b/p2p-rm/src/blob.cpp
index e4b3b60b1..c322aeefc 100644
--- a/p2p-rm/src/blob.cpp
+++ b/p2p-rm/src/blob.cpp
@@ -1,9 +1,12 @@
+#include <glog/logging.h>
 #include <ftl/p2p-rm/blob.hpp>
 #include <ftl/net/socket.hpp>
 #include <ftl/p2p-rm/protocol.hpp>
 
 #include <iostream>
 
+using ftl::net::array;
+
 struct SyncHeader {
 	uint32_t blobid;
 	uint32_t offset;
@@ -18,6 +21,8 @@ void ftl::rm::_sync(const Blob &blob, size_t offset, size_t size) {
 	}
 
 	// TODO Delay send to collate many write operations?
+	
+	LOG(INFO) << "Synchronise blob " << blob.blobid_;
 
 	if (blob.sockets_.size() > 0) {
 		SyncHeader header{blob.blobid_,static_cast<uint32_t>(offset),static_cast<uint32_t>(size)};
@@ -25,7 +30,7 @@ void ftl::rm::_sync(const Blob &blob, size_t offset, size_t size) {
 		for (auto s : blob.sockets_) {
 			// Send over network
 			s->send(P2P_SYNC, std::string((const char*)&header,sizeof(header)),
-				std::string(&blob.data_[offset],size));
+				array{&blob.data_[offset],size});
 		}
 	}
 }
diff --git a/p2p-rm/src/cluster.cpp b/p2p-rm/src/cluster.cpp
index 44063b861..f309b2be9 100644
--- a/p2p-rm/src/cluster.cpp
+++ b/p2p-rm/src/cluster.cpp
@@ -1,3 +1,4 @@
+#include <glog/logging.h>
 #include "ftl/p2p-rm.hpp"
 #include "ftl/p2p-rm/blob.hpp"
 #include "ftl/p2p-rm/protocol.hpp"
@@ -11,16 +12,20 @@
 #include <string>
 #include <iostream>
 #include <vector>
+#include <chrono>
 
 using ftl::rm::Cluster;
 using ftl::net::Listener;
 using std::map;
 using std::vector;
+using std::tuple;
 using std::shared_ptr;
 using std::string;
 using ftl::URI;
 using ftl::rm::Blob;
 using ftl::net::Socket;
+using ftl::UUID;
+using namespace std::chrono;
 
 Cluster::Cluster(const URI &uri, shared_ptr<Listener> l) : Protocol(uri.getBaseURI()), listener_(l) {
 	//auto me = this;
@@ -62,6 +67,12 @@ void Cluster::reset() {
 void Cluster::_registerRPC(Socket &s) {
 	//s.bind("getowner", [this](const std::string &u) { getOwner(u.c_str()); });
 	bind("getowner", member(&Cluster::getOwner));
+	
+	bind("nop", []() { return true; });
+	
+	bind(P2P_SYNC, [this](uint32_t msg, Socket &s) {
+		LOG(INFO) << "Receive blob sync";
+	});
 }
 
 void Cluster::addPeer(shared_ptr<Socket> &p, bool incoming) {
@@ -85,9 +96,10 @@ void Cluster::addPeer(shared_ptr<Socket> &p, bool incoming) {
 	}
 }
 
-void Cluster::addPeer(const char *url) {
+shared_ptr<Socket> Cluster::addPeer(const char *url) {
 	auto sock = ftl::net::connect(url);
 	addPeer(sock);
+	return sock;
 }
 
 Blob *Cluster::_lookup(const char *uri) {
@@ -107,19 +119,28 @@ Blob *Cluster::_lookup(const char *uri) {
 	return b;
 }
 
-std::string Cluster::getOwner(const std::string &uri) {
-	vector<string> results;
+tuple<string,uint32_t> Cluster::getOwner_RPC(const UUID &u, int ttl, const std::string &uri) {
+	if (requests_.count(u) > 0) return {"",0};
+	requests_[u] = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
 	
 	std::cout << "GETOWNER" << std::endl;
-	if (blobs_.count(uri) != 0) return blobs_[uri]->owner_;
-
-	broadcastCall("getowner", results, uri);
+	if (blobs_.count(uri) != 0) return {blobs_[uri]->owner_,0};
+	
+	vector<tuple<string,uint32_t>> results;
+	broadcastCall("getowner", results, u, ttl-1, uri);
 	
 	// TODO Verify all results are equal or empty
-	if (results.size() == 0) return "";
+	if (results.size() == 0) return {"",0};
 	return results[0];
 }
 
+std::string Cluster::getOwner(const std::string &uri) {
+	UUID u;
+	int ttl = 10;
+
+	return std::get<0>(getOwner_RPC(u, ttl, uri));
+}
+
 Blob *Cluster::_create(const char *uri, char *addr, size_t size, size_t count,
 		ftl::rm::flags_t flags, const std::string &tname) {
 	URI u(uri);
diff --git a/p2p-rm/test/CMakeLists.txt b/p2p-rm/test/CMakeLists.txt
index 9cb7a8049..659c52341 100644
--- a/p2p-rm/test/CMakeLists.txt
+++ b/p2p-rm/test/CMakeLists.txt
@@ -1,18 +1,27 @@
-add_executable(mapped_ptr_unit EXCLUDE_FROM_ALL
+add_executable(mapped_ptr_unit
 	./tests.cpp
 	./mapped_ptr_unit.cpp
 	../src/blob.cpp
 )
 target_link_libraries(mapped_ptr_unit gflags glog)
 
-add_executable(cluster_unit EXCLUDE_FROM_ALL
+add_executable(cluster_unit
 	./tests.cpp
 	../src/cluster.cpp
 	./cluster_unit.cpp
 )
-target_link_libraries(cluster_unit uriparser ftlnet gflags glog)
+target_link_libraries(cluster_unit uriparser ftlnet gflags glog uuid)
 
-add_executable(peer_test EXCLUDE_FROM_ALL
+add_executable(p2p_integration
+	./tests.cpp
+	../src/cluster.cpp
+	../src/p2prm.cpp
+	../src/blob.cpp
+	./p2p_integration.cpp
+)
+target_link_libraries(p2p_integration uriparser ftlnet gflags glog uuid)
+
+add_executable(peer_test
 	./peer_cli.cpp
 	../src/p2prm.cpp
 	../src/cluster.cpp
@@ -24,5 +33,5 @@ add_test(Mapped_ptrUnitTest mapped_ptr_unit)
 add_test(ClusterUnitTest cluster_unit)
 
 add_custom_target(tests)
-add_dependencies(tests mapped_ptr_unit cluster_unit peer_test)
+add_dependencies(tests mapped_ptr_unit cluster_unit peer_test p2p_integration)
 
diff --git a/p2p-rm/test/p2p_integration.cpp b/p2p-rm/test/p2p_integration.cpp
index e69de29bb..4b4a4e8fb 100644
--- a/p2p-rm/test/p2p_integration.cpp
+++ b/p2p-rm/test/p2p_integration.cpp
@@ -0,0 +1,105 @@
+#include "catch.hpp"
+#include <ftl/p2p-rm/cluster.hpp>
+#include <ftl/net/socket.hpp>
+#include <ftl/net/listener.hpp>
+#include <ftl/net.hpp>
+#include <vector>
+#include <iostream>
+
+using ftl::rm::Cluster;
+
+SCENARIO( "Pre-connection ownership resolution", "[ownership]" ) {
+	Cluster c1("ftl://utu.fi", nullptr);
+	auto l = ftl::net::listen("tcp://localhost:9000");
+	l->setProtocol(&c1);
+	Cluster c2("ftl://utu.fi", l);
+	
+	auto s = c1.addPeer("tcp://localhost:9000");
+	
+	int data1 = 89;
+	int data2 = 99;
+	
+	auto m1 = c1.map<int>("ftl://utu.fi/memory/r1", &data1);
+	auto m2 = c2.map<int>("ftl://utu.fi/memory/r1", &data2);
+	REQUIRE( m1.is_valid() );
+	REQUIRE( m2.is_valid() );
+	
+	ftl::net::wait([&s]() { return s->isConnected(); });
+	
+	REQUIRE( m2.is_owner() );
+	REQUIRE( !m1.is_owner() );
+	
+	l->close();
+	ftl::net::stop();
+}
+
+	
+SCENARIO( "Post-connection ownership resolution", "[ownership]" ) {
+	Cluster c1("ftl://utu.fi", nullptr);
+	auto l = ftl::net::listen("tcp://localhost:9000");
+	l->setProtocol(&c1);
+	Cluster c2("ftl://utu.fi", l);
+	
+	auto s = c1.addPeer("tcp://localhost:9000");
+	
+	int data1 = 89;
+	int data2 = 99;
+	
+	ftl::net::wait([&s]() { return s->isConnected(); });
+	
+	auto m1 = c1.map<int>("ftl://utu.fi/memory/r1", &data1);
+	auto m2 = c2.map<int>("ftl://utu.fi/memory/r1", &data2);
+	REQUIRE( m1.is_valid() );
+	REQUIRE( m2.is_valid() );
+	
+	REQUIRE( !m2.is_owner() );
+	REQUIRE( m1.is_owner() );
+	
+	l->close();
+	ftl::net::stop();
+}
+	
+SCENARIO( "Write change ownership", "[ownership]" ) {
+	Cluster c1("ftl://utu.fi", nullptr);
+	auto l = ftl::net::listen("tcp://localhost:9000");
+	l->setProtocol(&c1);
+	Cluster c2("ftl://utu.fi", l);
+	
+	auto s = c1.addPeer("tcp://localhost:9000");
+	
+	int data1 = 89;
+	int data2 = 99;
+	
+	ftl::net::wait([&s]() { return s->isConnected(); });
+	
+	auto m1 = c1.map<int>("ftl://utu.fi/memory/r1", &data1);
+	auto m2 = c2.map<int>("ftl://utu.fi/memory/r1", &data2);
+	REQUIRE( m1.is_valid() );
+	REQUIRE( m2.is_valid() );
+	
+	REQUIRE( !m2.is_owner() );
+	REQUIRE( m1.is_owner() );
+	
+	*m2 = 676;
+	
+	REQUIRE( m2.is_owner() );
+	REQUIRE( !m1.is_owner() );
+	
+	l->close();
+	ftl::net::stop();
+}
+
+	
+	/*
+	
+	REQUIRE( *m2 == 99 );
+	REQUIRE( *m1 == 89 );
+	
+	*m2 = 77;
+	ftl::net::wait();
+	REQUIRE( *m2 == 77 );
+	REQUIRE( *m1 == 77 );
+	
+	*m1 = 66;
+	REQUIRE( *m2 == 66 );
+	REQUIRE( *m1 == 66 );*/
-- 
GitLab