From 2e607bb60129c0371d5b2d9427299609884d4d5a Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Sat, 23 Feb 2019 20:06:40 +0200
Subject: [PATCH] Refactor to broadcastCall and allow member functions to be
 RPC functions

---
 p2p-rm/include/ftl/p2p-rm/blob.hpp    |  1 +
 p2p-rm/include/ftl/p2p-rm/cluster.hpp | 41 +++++++++++++--
 p2p-rm/src/blob.cpp                   |  7 ++-
 p2p-rm/src/cluster.cpp                | 73 +++++++++++++--------------
 p2p-rm/test/CMakeLists.txt            |  3 +-
 p2p-rm/test/mapped_ptr.cpp            |  3 ++
 p2p-rm/test/p2p-rm.cpp                | 13 ++---
 7 files changed, 90 insertions(+), 51 deletions(-)

diff --git a/p2p-rm/include/ftl/p2p-rm/blob.hpp b/p2p-rm/include/ftl/p2p-rm/blob.hpp
index 90ef69c4b..aa187047e 100644
--- a/p2p-rm/include/ftl/p2p-rm/blob.hpp
+++ b/p2p-rm/include/ftl/p2p-rm/blob.hpp
@@ -19,6 +19,7 @@ struct Blob {
 	char *data_;
 	size_t size_;
 	std::string uri_;
+	std::string owner_;
 	uint32_t blobid_;
 	
 	void finished();
diff --git a/p2p-rm/include/ftl/p2p-rm/cluster.hpp b/p2p-rm/include/ftl/p2p-rm/cluster.hpp
index 9a95c0583..313f29621 100644
--- a/p2p-rm/include/ftl/p2p-rm/cluster.hpp
+++ b/p2p-rm/include/ftl/p2p-rm/cluster.hpp
@@ -95,17 +95,51 @@ class Cluster {
 	/**
 	 * Connect to a new peer node using the specified socket.
 	 */
-	void addPeer(std::shared_ptr<ftl::net::Socket> &s);
+	void addPeer(std::shared_ptr<ftl::net::Socket> &s, bool incoming=false);
 
 	/**
 	 * Connect to a new peer using a URL string.
 	 */
 	void addPeer(const char *url);
 	
-	std::string getOwner(const char *uri);
+	/**
+	 * Allow member functions to be used for RPC calls by binding with 'this'.
+	 */
+	template <typename R, typename C, typename ...Args>
+	auto bind(R(C::*f)(Args...)) {
+	  return [this,f](Args... args) -> R { return (this->*f)(std::forward<Args>(args)...); };
+	}
+	
+	std::string getOwner(const std::string &uri);
+	
+	/**
+	 * Make an RPC call to all connected peers and put into a results vector.
+	 * This function blocks until all peers have responded or an error /
+	 * timeout occurs. The return value indicates the number of failed peers, or
+	 * is 0 if all returned.
+	 */
+	template <typename T, typename... ARGS>
+	int broadcastCall(const std::string &name, std::vector<T> &results,
+			ARGS... args) {
+		int count = 0;
+		auto f = [&count,&results](msgpack::object &r) {
+			count--;
+			results.push_back(r.as<T>());
+		};
+
+		for (auto p : peers_) {
+			count++;
+			p->async_call(name, f, std::forward<ARGS>(args)...);
+		}
+		
+		// TODO Limit in case of no return.
+		while (count > 0) {
+			ftl::net::wait();
+		}
+		return count;
+	}
 	
 	private:
-	static int rpcid_;
 	std::string root_;
 	std::shared_ptr<ftl::net::Listener> listener_;
 	std::vector<std::shared_ptr<ftl::net::Socket>> peers_;
@@ -115,6 +149,7 @@ class Cluster {
 	ftl::rm::Blob *_lookup(const char *uri);
 	Blob *_create(const char *uri, char *addr, size_t size, size_t count,
 		ftl::rm::flags_t flags, const std::string &tname);
+	void _registerRPC(ftl::net::Socket &s);
 };
 
 };
diff --git a/p2p-rm/src/blob.cpp b/p2p-rm/src/blob.cpp
index 600cff69a..db75e8f86 100644
--- a/p2p-rm/src/blob.cpp
+++ b/p2p-rm/src/blob.cpp
@@ -2,6 +2,8 @@
 #include <ftl/net/socket.hpp>
 #include <ftl/p2p-rm/protocol.hpp>
 
+#include <iostream>
+
 struct SyncHeader {
 	uint32_t blobid;
 	uint32_t offset;
@@ -10,7 +12,10 @@ struct SyncHeader {
 
 void ftl::rm::_sync(const Blob &blob, size_t offset, size_t size) {
 	// Sanity check
-	if (offset + size > blob.size_) throw -1;
+	if (offset + size > blob.size_) {
+		LOG(ERROR) << "Memory overrun during sync";
+		return;
+	}
 
 	// TODO Delay send to collate many write operations?
 
diff --git a/p2p-rm/src/cluster.cpp b/p2p-rm/src/cluster.cpp
index 5c8e4b982..31bda1b78 100644
--- a/p2p-rm/src/cluster.cpp
+++ b/p2p-rm/src/cluster.cpp
@@ -10,24 +10,25 @@
 #include <map>
 #include <string>
 #include <iostream>
+#include <vector>
 
 using ftl::rm::Cluster;
 using ftl::net::Listener;
 using std::map;
+using std::vector;
 using std::shared_ptr;
+using std::string;
 using ftl::URI;
 using ftl::rm::Blob;
 using ftl::net::Socket;
 
-int Cluster::rpcid_ = 0;
-
 Cluster::Cluster(const URI &uri, shared_ptr<Listener> l) : listener_(l) {
 	//auto me = this;
 	root_ = uri.getHost();
 
 	if (l != nullptr) {
 		l->onConnection([&](shared_ptr<Socket> &s) {
-			addPeer(s);		
+			addPeer(s, true);		
 		});
 	}
 }
@@ -43,21 +44,27 @@ void Cluster::reset() {
 	blobs_.clear();
 }
 
-void Cluster::addPeer(shared_ptr<Socket> &p) {
-	LOG(INFO) << "Peer added: " << p->getURI();
-	//auto me = this;
+void Cluster::_registerRPC(Socket &s) {
+	//s.bind("getowner", [this](const std::string &u) { getOwner(u.c_str()); });
+	s.bind("getowner", bind(&Cluster::getOwner));
+}
+
+void Cluster::addPeer(shared_ptr<Socket> &p, bool incoming) {
+	LOG(INFO) << ((incoming) ? "Incoming peer added: " : "Peer added: ") << p->getURI();
+
 	peers_.push_back(p);
-	/*p->onMessage([&](int service, const std::string &data) {
-		std::cout << "MSG " << service << std::endl;
-	});*/
-	p->bind("getowner", [](const std::string &uri) -> std::string {
-		std::cout << "GETOWNER" << std::endl;
-		return "";
-	});
+	_registerRPC(*p);
 	
-	// TODO Check ownership of my blobs.
-	for (auto b : blobs_) {
-		getOwner(b.first.c_str());
+	if (!incoming) {
+		p->onConnect([this](Socket &s) {
+			for (auto b : blobs_) {
+				auto o = s.call<string>("getowner", b.first);
+				if (o.size() > 0) {
+					b.second->owner_ = o;
+					LOG(INFO) << "Lost ownership of " << b.first.c_str() << " to " << o;
+				}
+			}
+		});
 	}
 }
 
@@ -77,34 +84,23 @@ Blob *Cluster::_lookup(const char *uri) {
 	std::cout << "Blob Found for " << u.getBaseURI() << " = " << (b != nullptr) << std::endl;
 
 	if (!b) {
-		// Whoops, need to map it first!
+		LOG(WARNING) << "Unmapped memory requested: " << uri;
 	}
 
 	return b;
 }
 
-std::string Cluster::getOwner(const char *uri) {
-	std::string result;
-	int count = 0;
-
-	auto f = [&](msgpack::object &r) {
-		count--;
-		std::string res = r.as<std::string>();
-		if (res.size() > 0) result = res;
-	};
-
-	for (auto p : peers_) {
-		count++;
-		LOG(INFO) << "Request owner of " << uri << " from " << p->getURI();
-		p->async_call("getowner", f, std::string(uri));
-	}
+std::string Cluster::getOwner(const std::string &uri) {
+	vector<string> results;
 	
-	// TODO Limit in case of no return.
-	while (count > 0) {
-		ftl::net::wait();
-	}
+	std::cout << "GETOWNER" << std::endl;
+	if (blobs_.count(uri) != 0) return blobs_[uri]->owner_;
+
+	broadcastCall("getowner", results, uri);
 	
-	return result;
+	// TODO Verify all results are equal or empty
+	if (results.size() == 0) return "";
+	return results[0];
 }
 
 Blob *Cluster::_create(const char *uri, char *addr, size_t size, size_t count,
@@ -121,14 +117,17 @@ Blob *Cluster::_create(const char *uri, char *addr, size_t size, size_t count,
 	b->data_ = addr;
 	b->size_ = size;
 	b->uri_ = std::string(uri);
+	b->owner_ = "";
 	blobs_[u.getBaseURI()] = b;
 
 	std::string o = getOwner(uri);
 	if (o.size() == 0) {
 		// I am the owner!
 		std::cout << "I own " << uri << std::endl;
+		b->owner_ = "me";
 	} else {
 		std::cout << "I do not own " << uri << std::endl;
+		b->owner_ = o;
 	}
 	
 	//std::cout << owners << std::endl;
diff --git a/p2p-rm/test/CMakeLists.txt b/p2p-rm/test/CMakeLists.txt
index 2ff0402ee..46cffacba 100644
--- a/p2p-rm/test/CMakeLists.txt
+++ b/p2p-rm/test/CMakeLists.txt
@@ -6,6 +6,7 @@ add_executable(mapped_ptr EXCLUDE_FROM_ALL
 	./mapped_ptr.cpp
 	../src/blob.cpp
 )
+target_link_libraries(mapped_ptr gflags glog)
 
 add_executable(p2p_rm EXCLUDE_FROM_ALL
 	./tests.cpp
@@ -15,7 +16,7 @@ add_executable(p2p_rm EXCLUDE_FROM_ALL
 	
 	./p2p-rm.cpp
 )
-target_link_libraries(p2p_rm uriparser)
+target_link_libraries(p2p_rm uriparser ftlnet gflags glog)
 
 add_executable(peer_test EXCLUDE_FROM_ALL
 	./peer.cpp
diff --git a/p2p-rm/test/mapped_ptr.cpp b/p2p-rm/test/mapped_ptr.cpp
index 5dfa74e69..dfea46c8a 100644
--- a/p2p-rm/test/mapped_ptr.cpp
+++ b/p2p-rm/test/mapped_ptr.cpp
@@ -22,6 +22,7 @@ SCENARIO( "Reading from a remote pointer", "[remote_ptr]" ) {
 	// Make a dummy blob
 	auto blob = new ftl::rm::Blob();
 	blob->data_ = (char*)(new int[5]);
+	blob->size_ = 5*sizeof(int);
 	((int*)(blob->data_))[0] = 55;
 	((int*)(blob->data_))[1] = 66;
 	
@@ -37,6 +38,7 @@ SCENARIO( "Writing to a remote pointer", "[remote_ptr]" ) {
 	// Make a dummy blob
 	auto blob = new ftl::rm::Blob();
 	blob->data_ = (char*)(new int[5]);
+	blob->size_ = 5*sizeof(int);
 	((int*)(blob->data_))[0] = 55;
 	((int*)(blob->data_))[1] = 66;
 	
@@ -63,6 +65,7 @@ SCENARIO( "Writing to readonly pointer fails", "[remote_ptr]" ) {
 	// Make a dummy blob
 	auto blob = new ftl::rm::Blob();
 	blob->data_ = (char*)(new int[5]);
+	blob->size_ = 5*sizeof(int);
 	((int*)(blob->data_))[0] = 55;
 	((int*)(blob->data_))[1] = 66;
 	
diff --git a/p2p-rm/test/p2p-rm.cpp b/p2p-rm/test/p2p-rm.cpp
index bf7979985..0c7a21c50 100644
--- a/p2p-rm/test/p2p-rm.cpp
+++ b/p2p-rm/test/p2p-rm.cpp
@@ -7,7 +7,7 @@
 
 // --- Mock Socket Send
 
-static std::vector<uint32_t> msgs;
+/*static std::vector<uint32_t> msgs;
 
 int ftl::net::Socket::send2(uint32_t service, const std::string &data1, const std::string &data2) {
 	msgs.push_back(service);
@@ -29,12 +29,6 @@ int ftl::net::Socket::send(uint32_t service, const std::string &data) {
 	msgs.push_back(service);
 	std::cout << "SEND (" << service << ")" << std::endl;
 
-	/*if (service == P2P_RPC_CALL) {
-		// UNPACK
-		// PACK RETURN
-		message(P2P_RPC_RETURN, rdata);
-	}*/
-
 	return 0;
 }
 
@@ -44,7 +38,7 @@ bool ftl::net::wait() {
 
 std::shared_ptr<ftl::net::Socket> ftl::net::connect(const char *url) {
 	return nullptr;
-}
+}*/
 
 // --- End Mock Socket Send
 
@@ -103,7 +97,8 @@ SCENARIO( "Cluster::map()", "[map]" ) {
 SCENARIO( "Getting a read_ref", "[get]" ) {
 	auto cluster = ftl::rm::cluster("ftl://utu.fi", nullptr);
 	// Add fake peer
-	cluster->addPeer(std::shared_ptr<ftl::net::Socket>(new ftl::net::Socket(0)));
+	auto p = std::make_shared<ftl::net::Socket>(0);
+	cluster->addPeer(p);
 	
 	int data = 89;
 	int data2 = 99;
-- 
GitLab