From 86d7dfc6e96a796ff420fef6d5863ae7d244849a Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Thu, 28 Feb 2019 12:51:36 +0200
Subject: [PATCH] P2P Protocol and unit tests

---
 p2p-rm/include/ftl/p2p-rm/p2p.hpp |  73 +++++++++++++
 p2p-rm/src/cluster.cpp            |   6 +-
 p2p-rm/test/CMakeLists.txt        |  11 +-
 p2p-rm/test/p2p_unit.cpp          | 168 ++++++++++++++++++++++++++++++
 4 files changed, 253 insertions(+), 5 deletions(-)
 create mode 100644 p2p-rm/include/ftl/p2p-rm/p2p.hpp
 create mode 100644 p2p-rm/test/p2p_unit.cpp

diff --git a/p2p-rm/include/ftl/p2p-rm/p2p.hpp b/p2p-rm/include/ftl/p2p-rm/p2p.hpp
new file mode 100644
index 000000000..badde7ca7
--- /dev/null
+++ b/p2p-rm/include/ftl/p2p-rm/p2p.hpp
@@ -0,0 +1,73 @@
+#ifndef _FTL_RM_P2P_HPP_
+#define _FTL_RM_P2P_HPP_
+
+#include <ftl/uuid.hpp>
+#include <optional>
+#include <string>
+#include <map>
+#include <chrono>
+#include <vector>
+#include <memory>
+#include <ftl/net/protocol.hpp>
+#include <iostream>
+
+namespace ftl {
+namespace net {
+
+class p2p : public ftl::net::Protocol {
+	public:
+	p2p(const char *uri) : Protocol(uri) {}
+	
+	void addPeer(std::shared_ptr<ftl::net::Socket> s) { peers_.push_back(s); };
+	
+	template <typename R, typename C, typename... Args>
+	void bind_find_one(const std::string &name, std::optional<R>(C::*f)(Args...)) {
+		bind(name, [this,name,f](const ftl::UUID &u, int ttl, Args... args) -> std::optional<R> {
+			if (requests_.count(u) > 0) return {};
+			requests_[u] = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
+			
+			auto result = (static_cast<C*>(this)->*f)(std::forward<Args>(args)...);
+			if (result) return result;
+			
+			// Otherwise we must search again
+			if (ttl == 0) return {};
+			
+			return _find_one<R>(name, u, ttl-1, args...);
+		});
+	}
+	
+	/*template <typename R, typename C, typename ...Args>
+	void bind_find_all(const std::string &name, R(C::*f)(Args...)) {
+	
+	}*/
+	
+	template <typename R, typename... Args>
+	std::optional<R> find_one(const std::string &name, Args... args) {
+		ftl::UUID req;
+		int ttl = 10;
+		return _find_one<R>(name, req, ttl, args...);
+	}
+	
+	template <typename R, typename... Args>
+	std::optional<R> _find_one(const std::string &name, const ftl::UUID &u, const int &ttl, Args... args) {
+		for (auto p : peers_) {
+			auto res = p->call<std::optional<R>>(name, u, ttl, args...);
+			if (res) return res;
+		}
+		return {};
+	}
+	
+	/*R find_all(const std::string &name, Args... args) {
+	
+	}*/
+	
+	private:
+	std::unordered_map<ftl::UUID,long int> requests_;
+	std::vector<std::shared_ptr<ftl::net::Socket>> peers_;
+};
+
+}; // namespace net
+}; // namespace ftl
+
+#endif // _FTL_RM_P2P_HPP_
+
diff --git a/p2p-rm/src/cluster.cpp b/p2p-rm/src/cluster.cpp
index b7ee117bc..af370e880 100644
--- a/p2p-rm/src/cluster.cpp
+++ b/p2p-rm/src/cluster.cpp
@@ -74,8 +74,7 @@ void Cluster::reset() {
 }
 
 void Cluster::_registerRPC() {
-	bind("getowner", [this](const UUID &u, int ttl, const std::string &uri) { return getOwner_RPC(u,ttl,uri); });
-	//bind("getowner", member(&Cluster::getOwner_RPC));
+	bind("getowner", member(&Cluster::getOwner_RPC));
 	
 	bind("nop", []() { return true; });
 	
@@ -90,6 +89,7 @@ void Cluster::addPeer(shared_ptr<Socket> &p, bool incoming) {
 	//p.setProtocol(this);
 
 	peers_.push_back(p);
+	//p2p::addPeer(p);
 	
 	if (!incoming) {
 		p->onConnect([this](Socket &s) {
@@ -119,7 +119,7 @@ Blob *Cluster::_lookup(const char *uri) {
 	if (!u.isValid()) return NULL;
 	if (u.getScheme() != ftl::URI::SCHEME_FTL) return NULL;
 	//if (u.getPathSegment(0) != "memory") return NULL;
-	if (u.getHost() != root_) { LOG(ERROR) << "Non matching host : " << u.getHost() << " - " << root_ << std::endl; return NULL; }
+	if (u.getHost() != root_) { LOG(ERROR) << "Non matching URI base : " << u.getHost() << " - " << root_ << std::endl; return NULL; }
 
 	auto b = blobs_[u.getBaseURI()];
 	std::cout << "Blob Found for " << u.getBaseURI() << " = " << (b != nullptr) << std::endl;
diff --git a/p2p-rm/test/CMakeLists.txt b/p2p-rm/test/CMakeLists.txt
index 659c52341..fd423726c 100644
--- a/p2p-rm/test/CMakeLists.txt
+++ b/p2p-rm/test/CMakeLists.txt
@@ -3,7 +3,7 @@ add_executable(mapped_ptr_unit
 	./mapped_ptr_unit.cpp
 	../src/blob.cpp
 )
-target_link_libraries(mapped_ptr_unit gflags glog)
+target_link_libraries(mapped_ptr_unit gflags glog uuid)
 
 add_executable(cluster_unit
 	./tests.cpp
@@ -12,6 +12,12 @@ add_executable(cluster_unit
 )
 target_link_libraries(cluster_unit uriparser ftlnet gflags glog uuid)
 
+add_executable(p2p_unit
+	./tests.cpp
+	./p2p_unit.cpp
+)
+target_link_libraries(p2p_unit ftlnet uriparser gflags glog uuid)
+
 add_executable(p2p_integration
 	./tests.cpp
 	../src/cluster.cpp
@@ -27,10 +33,11 @@ add_executable(peer_test
 	../src/cluster.cpp
 	../src/blob.cpp
 )
-target_link_libraries(peer_test uriparser ftlnet gflags glog)
+target_link_libraries(peer_test uriparser ftlnet gflags glog uuid)
 
 add_test(Mapped_ptrUnitTest mapped_ptr_unit)
 add_test(ClusterUnitTest cluster_unit)
+add_test(P2PUnitTest p2p_unit)
 
 add_custom_target(tests)
 add_dependencies(tests mapped_ptr_unit cluster_unit peer_test p2p_integration)
diff --git a/p2p-rm/test/p2p_unit.cpp b/p2p-rm/test/p2p_unit.cpp
new file mode 100644
index 000000000..62c0732c2
--- /dev/null
+++ b/p2p-rm/test/p2p_unit.cpp
@@ -0,0 +1,168 @@
+#include "catch.hpp"
+#include <ftl/net/dispatcher.hpp>
+#include <ftl/net/protocol.hpp>
+#include <ftl/net/socket.hpp>
+#include <memory>
+#include <iostream>
+
+#include <sys/select.h>
+
+using ftl::net::Dispatcher;
+using ftl::net::Protocol;
+using ftl::net::Socket;
+
+// --- Mock --------------------------------------------------------------------
+
+static std::string last_send;
+
+using ftl::net::Socket;
+
+class MockSocket : public Socket {
+	public:	
+	MockSocket() : Socket(0) {}
+	void mock_dispatchRPC(const std::string &d) { protocol()->dispatchRPC(*this,d); }
+	
+	void mock_data() { data(); }
+
+};
+
+extern int select(int nfds, fd_set *readfds, fd_set *writefds,
+                  fd_set *exceptfds, struct timeval *timeout) {
+    std::cout << "SELECT CALLED" << std::endl;
+	return 1;      
+}
+
+extern ssize_t recv(int sd, void *buf, size_t n, int f) {	
+	std::cout << "Recv called : " << last_send.size() << std::endl;
+	int l = last_send.size();
+	if (l == 0) return 0;
+	std::memcpy(buf, last_send.c_str(), l);
+	last_send = "";
+	return l;
+}
+
+extern ssize_t writev(int sd, const struct iovec *v, int cnt) {
+	size_t len = 0; //v[0].iov_len+v[1].iov_len;
+	char buf[1000];
+	char *bufp = &buf[0];
+	
+	for (auto i=0; i<cnt; i++) {
+		std::memcpy(bufp,v[i].iov_base,v[i].iov_len);
+		len += v[i].iov_len;
+		bufp += v[i].iov_len;
+	}
+	
+	last_send = std::string(&buf[0], len);
+	return len;
+}
+
+extern std::vector<std::shared_ptr<ftl::net::Socket>> sockets;
+
+// --- Support -----------------------------------------------------------------
+
+Dispatcher::response_t get_response() {
+	auto h = (ftl::net::Header*)last_send.data();
+	const char *data = last_send.data() + sizeof(ftl::net::Header);
+	auto unpacked = msgpack::unpack(data, h->size-4);
+	Dispatcher::response_t the_result;
+	unpacked.get().convert(the_result);
+	return the_result;
+}
+
+// --- Tests -------------------------------------------------------------------
+
+#include <ftl/p2p-rm/p2p.hpp>
+
+using ftl::net::p2p;
+
+SCENARIO("p2p::bind_find_one()", "[find_one]") {
+	class Mock_p2p : public p2p {
+		public:
+		Mock_p2p() : p2p("mock://") {
+			bind_find_one("test", &Mock_p2p::test);
+		}
+		
+		std::optional<int> test(int a) {
+			if (a == 2) return 44;
+			else return {};
+		}
+	};
+	
+	Mock_p2p p;
+	std::shared_ptr<MockSocket> s = std::shared_ptr<MockSocket>(new MockSocket());
+	s->setProtocol(&p);
+	p.addPeer(s);
+	
+	GIVEN("a query that expects a valid result") {
+		// Create a mock RPC message with expected result
+		ftl::UUID req;
+		int ttl = 10;
+		auto args_obj = std::make_tuple(req, ttl, 2);
+		auto call_obj = std::make_tuple(0,0,"test",args_obj);
+		std::stringstream buf;
+		msgpack::pack(buf, call_obj);
+		
+		s->mock_dispatchRPC(buf.str());
+		
+		// Make sure we get a response
+		auto [kind,id,err,res] = get_response();
+		REQUIRE( *(res.as<std::optional<int>>()) == 44 );
+		REQUIRE( kind == 1 );
+		REQUIRE( id == 0 );
+		REQUIRE( err.type == 0 );
+	}
+	
+	GIVEN("a query that expects no result") {
+		// Create a mock RPC message with expected result
+		ftl::UUID req;
+		int ttl = 10;
+		auto args_obj = std::make_tuple(req, ttl, 3);
+		auto call_obj = std::make_tuple(0,0,"test",args_obj);
+		std::stringstream buf;
+		msgpack::pack(buf, call_obj);
+		s->mock_dispatchRPC(buf.str());
+		
+		// Make sure we get a response
+		auto [kind,id,err,res] = get_response();
+		REQUIRE( !res.as<std::optional<int>>() );
+		REQUIRE( kind == 1 );
+		REQUIRE( id == 0 );
+		REQUIRE( err.type == 0 );
+	}
+	
+	ftl::net::stop();
+}
+
+SCENARIO("p2p::find_one()", "[find_one]") {
+	class Mock_p2p : public p2p {
+		public:
+		Mock_p2p() : p2p("mock://") {
+			bind_find_one("test", &Mock_p2p::test);
+		}
+		
+		std::optional<int> test(int a) {
+			if (a == 2) return 44;
+			else return {};
+		}
+	};
+	
+	Mock_p2p p;
+	std::shared_ptr<MockSocket> s = std::shared_ptr<MockSocket>(new MockSocket());
+	sockets.push_back(s);
+	s->setProtocol(&p);
+	p.addPeer(s);
+	
+	GIVEN("a query that expects a valid result") {
+		auto res = p.find_one<int>("test", 2);		
+		REQUIRE( res.has_value() );
+		REQUIRE( *res == 44 );
+	}
+	
+	GIVEN("a query that expects no result") {
+		auto res = p.find_one<int>("test", 3);		
+		REQUIRE( !res.has_value() );
+	}
+	
+	ftl::net::stop();
+}
+
-- 
GitLab