From 26097e65fefbf60ca31fbf46e929061a7242d1b1 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Tue, 26 Feb 2019 13:37:45 +0200
Subject: [PATCH] Rework p2p to work with net ftlnet

---
 p2p-rm/CMakeLists.txt                         |  2 ++
 p2p-rm/include/ftl/p2p-rm/cluster.hpp         | 17 +++++----
 p2p-rm/include/ftl/p2p-rm/protocol.hpp        | 14 ++++----
 p2p-rm/src/blob.cpp                           |  2 +-
 p2p-rm/src/cluster.cpp                        | 21 +++++++++--
 p2p-rm/src/main.cpp                           |  0
 p2p-rm/test/CMakeLists.txt                    | 26 ++++++--------
 p2p-rm/test/{blob.cpp => blob_unit.cpp}       |  0
 p2p-rm/test/{p2p-rm.cpp => cluster_unit.cpp}  | 36 ++++++++++---------
 .../{mapped_ptr.cpp => mapped_ptr_unit.cpp}   |  7 ++--
 p2p-rm/test/p2p_integration.cpp               |  0
 p2p-rm/test/{peer.cpp => peer_cli.cpp}        |  0
 12 files changed, 70 insertions(+), 55 deletions(-)
 create mode 100644 p2p-rm/src/main.cpp
 rename p2p-rm/test/{blob.cpp => blob_unit.cpp} (100%)
 rename p2p-rm/test/{p2p-rm.cpp => cluster_unit.cpp} (66%)
 rename p2p-rm/test/{mapped_ptr.cpp => mapped_ptr_unit.cpp} (89%)
 create mode 100644 p2p-rm/test/p2p_integration.cpp
 rename p2p-rm/test/{peer.cpp => peer_cli.cpp} (100%)

diff --git a/p2p-rm/CMakeLists.txt b/p2p-rm/CMakeLists.txt
index a374b86f6..e0c3e6db5 100644
--- a/p2p-rm/CMakeLists.txt
+++ b/p2p-rm/CMakeLists.txt
@@ -4,6 +4,8 @@ include (CheckIncludeFile)
 include (CheckIncludeFileCXX)
 include (CheckFunctionExists)
 project (ftlp2prm)
+include(CTest)
+enable_testing()
 
 #find_package(PkgConfig)
 #pkg_check_modules(GTKMM gtkmm-3.0)
diff --git a/p2p-rm/include/ftl/p2p-rm/cluster.hpp b/p2p-rm/include/ftl/p2p-rm/cluster.hpp
index 313f29621..dfeb5ffcd 100644
--- a/p2p-rm/include/ftl/p2p-rm/cluster.hpp
+++ b/p2p-rm/include/ftl/p2p-rm/cluster.hpp
@@ -7,6 +7,7 @@
 
 #include <ftl/uri.hpp>
 #include <ftl/net/socket.hpp>
+#include <ftl/net/protocol.hpp>
 
 #include <type_traits>
 #include <memory>
@@ -25,9 +26,10 @@ namespace rm {
 
 class Blob;
 
-class Cluster {
+class Cluster : public ftl::net::Protocol {
 	public:
 	Cluster(const ftl::URI &uri, std::shared_ptr<ftl::net::Listener> l);
+	Cluster(const char *uri, std::shared_ptr<ftl::net::Listener> l);
 	~Cluster();
 	
 	void reset();
@@ -106,7 +108,7 @@ class Cluster {
 	 * Allow member functions to be used for RPC calls by binding with 'this'.
 	 */
 	template <typename R, typename C, typename ...Args>
-	auto bind(R(C::*f)(Args...)) {
+	auto member(R(C::*f)(Args...)) {
 	  return [this,f](Args... args) -> R { return (this->*f)(std::forward<Args>(args)...); };
 	}
 	
@@ -122,20 +124,17 @@ class Cluster {
 	int broadcastCall(const std::string &name, std::vector<T> &results,
 			ARGS... args) {
 		int count = 0;
-		auto f = [&count,&results](msgpack::object &r) {
+		auto f = [&count,&results](const T &r) {
 			count--;
-			results.push_back(r.as<T>());
+			results.push_back(r);
 		};
 
 		for (auto p : peers_) {
 			count++;
-			p->async_call(name, f, std::forward<ARGS>(args)...);
+			p->asyncCall<T>(name, f, std::forward<ARGS>(args)...);
 		}
 		
-		// TODO Limit in case of no return.
-		while (count > 0) {
-			ftl::net::wait();
-		}
+		ftl::net::wait([&count]() { return count == 0; }, 5.0);
 		return count;
 	}
 	
diff --git a/p2p-rm/include/ftl/p2p-rm/protocol.hpp b/p2p-rm/include/ftl/p2p-rm/protocol.hpp
index fda11ae67..743e48dc0 100644
--- a/p2p-rm/include/ftl/p2p-rm/protocol.hpp
+++ b/p2p-rm/include/ftl/p2p-rm/protocol.hpp
@@ -4,13 +4,13 @@
 /* To get the service space for p2p */
 #include <ftl/net/protocol.hpp>
 
-#define P2P_SYNC				(FTL_PROTOCOL_P2P + 1)
-#define P2P_REQUESTOWNERSHIP	(FTL_PROTOCOL_P2P + 2)
-#define P2P_FINDOWNER			(FTL_PROTOCOL_P2P + 3)
-#define P2P_NOTIFYOWNERSHIP		(FTL_PROTOCOL_P2P + 4)
-#define P2P_URISEARCH			(FTL_PROTOCOL_P2P + 5)
-#define P2P_PEERSEARCH			(FTL_PROTOCOL_P2P + 6)
-#define P2P_RPC_CALL			(FTL_PROTOCOL_P2P + 7)
+#define P2P_SYNC				(FTL_PROTOCOL_FREE + 1)
+#define P2P_REQUESTOWNERSHIP	(FTL_PROTOCOL_FREE + 2)
+#define P2P_FINDOWNER			(FTL_PROTOCOL_FREE + 3)
+#define P2P_NOTIFYOWNERSHIP		(FTL_PROTOCOL_FREE + 4)
+#define P2P_URISEARCH			(FTL_PROTOCOL_FREE + 5)
+#define P2P_PEERSEARCH			(FTL_PROTOCOL_FREE + 6)
+#define P2P_RPC_CALL			(FTL_PROTOCOL_FREE + 7)
 
 #endif // _FTL_P2P_RM_PROTOCOL_HPP_
 
diff --git a/p2p-rm/src/blob.cpp b/p2p-rm/src/blob.cpp
index db75e8f86..e4b3b60b1 100644
--- a/p2p-rm/src/blob.cpp
+++ b/p2p-rm/src/blob.cpp
@@ -24,7 +24,7 @@ void ftl::rm::_sync(const Blob &blob, size_t offset, size_t size) {
 	
 		for (auto s : blob.sockets_) {
 			// Send over network
-			s->send2(P2P_SYNC, std::string((const char*)&header,sizeof(header)),
+			s->send(P2P_SYNC, std::string((const char*)&header,sizeof(header)),
 				std::string(&blob.data_[offset],size));
 		}
 	}
diff --git a/p2p-rm/src/cluster.cpp b/p2p-rm/src/cluster.cpp
index 31bda1b78..44063b861 100644
--- a/p2p-rm/src/cluster.cpp
+++ b/p2p-rm/src/cluster.cpp
@@ -22,7 +22,7 @@ using ftl::URI;
 using ftl::rm::Blob;
 using ftl::net::Socket;
 
-Cluster::Cluster(const URI &uri, shared_ptr<Listener> l) : listener_(l) {
+Cluster::Cluster(const URI &uri, shared_ptr<Listener> l) : Protocol(uri.getBaseURI()), listener_(l) {
 	//auto me = this;
 	root_ = uri.getHost();
 
@@ -33,6 +33,21 @@ Cluster::Cluster(const URI &uri, shared_ptr<Listener> l) : listener_(l) {
 	}
 }
 
+Cluster::Cluster(const char *uri, shared_ptr<Listener> l) : Protocol(uri), listener_(l) {
+	URI u(uri);
+	if (!u.isValid()) return;
+	if (u.getScheme() != ftl::URI::SCHEME_FTL) return;
+	if (u.getPath().size() > 0) return;
+
+	root_ = u.getHost();
+
+	if (l != nullptr) {
+		l->onConnection([&](shared_ptr<Socket> &s) {
+			addPeer(s, true);		
+		});
+	}
+}
+
 Cluster::~Cluster() {
 	reset();
 }
@@ -46,12 +61,14 @@ void Cluster::reset() {
 
 void Cluster::_registerRPC(Socket &s) {
 	//s.bind("getowner", [this](const std::string &u) { getOwner(u.c_str()); });
-	s.bind("getowner", bind(&Cluster::getOwner));
+	bind("getowner", member(&Cluster::getOwner));
 }
 
 void Cluster::addPeer(shared_ptr<Socket> &p, bool incoming) {
 	LOG(INFO) << ((incoming) ? "Incoming peer added: " : "Peer added: ") << p->getURI();
 
+	//p.setProtocol(this);
+
 	peers_.push_back(p);
 	_registerRPC(*p);
 	
diff --git a/p2p-rm/src/main.cpp b/p2p-rm/src/main.cpp
new file mode 100644
index 000000000..e69de29bb
diff --git a/p2p-rm/test/CMakeLists.txt b/p2p-rm/test/CMakeLists.txt
index 46cffacba..9cb7a8049 100644
--- a/p2p-rm/test/CMakeLists.txt
+++ b/p2p-rm/test/CMakeLists.txt
@@ -1,34 +1,28 @@
-include(CTest)
-enable_testing()
-
-add_executable(mapped_ptr EXCLUDE_FROM_ALL
+add_executable(mapped_ptr_unit EXCLUDE_FROM_ALL
 	./tests.cpp
-	./mapped_ptr.cpp
+	./mapped_ptr_unit.cpp
 	../src/blob.cpp
 )
-target_link_libraries(mapped_ptr gflags glog)
+target_link_libraries(mapped_ptr_unit gflags glog)
 
-add_executable(p2p_rm EXCLUDE_FROM_ALL
+add_executable(cluster_unit EXCLUDE_FROM_ALL
 	./tests.cpp
-	../src/p2prm.cpp
 	../src/cluster.cpp
-	../src/blob.cpp
-	
-	./p2p-rm.cpp
+	./cluster_unit.cpp
 )
-target_link_libraries(p2p_rm uriparser ftlnet gflags glog)
+target_link_libraries(cluster_unit uriparser ftlnet gflags glog)
 
 add_executable(peer_test EXCLUDE_FROM_ALL
-	./peer.cpp
+	./peer_cli.cpp
 	../src/p2prm.cpp
 	../src/cluster.cpp
 	../src/blob.cpp
 )
 target_link_libraries(peer_test uriparser ftlnet gflags glog)
 
-add_test(Mapped_ptr mapped_ptr)
-add_test(RM_API p2p_rm)
+add_test(Mapped_ptrUnitTest mapped_ptr_unit)
+add_test(ClusterUnitTest cluster_unit)
 
 add_custom_target(tests)
-add_dependencies(tests mapped_ptr p2p_rm peer_test)
+add_dependencies(tests mapped_ptr_unit cluster_unit peer_test)
 
diff --git a/p2p-rm/test/blob.cpp b/p2p-rm/test/blob_unit.cpp
similarity index 100%
rename from p2p-rm/test/blob.cpp
rename to p2p-rm/test/blob_unit.cpp
diff --git a/p2p-rm/test/p2p-rm.cpp b/p2p-rm/test/cluster_unit.cpp
similarity index 66%
rename from p2p-rm/test/p2p-rm.cpp
rename to p2p-rm/test/cluster_unit.cpp
index 0c7a21c50..02edfd52f 100644
--- a/p2p-rm/test/p2p-rm.cpp
+++ b/p2p-rm/test/cluster_unit.cpp
@@ -1,10 +1,12 @@
 #include "catch.hpp"
-#include <ftl/p2p-rm.hpp>
+#include <ftl/p2p-rm/cluster.hpp>
 #include <ftl/net/socket.hpp>
 #include <ftl/net.hpp>
 #include <vector>
 #include <iostream>
 
+using ftl::rm::Cluster;
+
 // --- Mock Socket Send
 
 /*static std::vector<uint32_t> msgs;
@@ -43,78 +45,78 @@ std::shared_ptr<ftl::net::Socket> ftl::net::connect(const char *url) {
 // --- End Mock Socket Send
 
 SCENARIO( "Cluster::map()", "[map]" ) {
-	auto cluster = ftl::rm::cluster("ftl://utu.fi", nullptr);
+	Cluster cluster("ftl://utu.fi", nullptr);
 	
 	GIVEN( "a valid URI and array datatype" ) {
 		int data[10];
-		auto m = cluster->map<int[10]>("ftl://utu.fi/memory/test0", &data);
+		auto m = cluster.map<int[10]>("ftl://utu.fi/memory/test0", &data);
 		REQUIRE( m.is_valid() );
 
-		auto r = cluster->get<int[10]>("ftl://utu.fi/memory/test0");
+		auto r = cluster.get<int[10]>("ftl://utu.fi/memory/test0");
 		REQUIRE( r.is_valid() );
 		REQUIRE( r.size() == 10*sizeof(int) );
 		REQUIRE( r.is_local() );
 	}
 	
 	GIVEN( "a valid URI and invalid data" ) {
-		auto m = cluster->map<int>("ftl://utu.fi/memory/test0", NULL);
+		auto m = cluster.map<int>("ftl://utu.fi/memory/test0", NULL);
 		REQUIRE( !m.is_valid() );
 	}
 	
 	GIVEN( "an empty URI" ) {
 		int data;
-		auto m = cluster->map<int>("", &data);
+		auto m = cluster.map<int>("", &data);
 		REQUIRE( !m.is_valid() );
 	}
 	
 	GIVEN( "an invalid URI" ) {
 		int data;
-		auto m = cluster->map<int>("noschema/test", &data);
+		auto m = cluster.map<int>("noschema/test", &data);
 		REQUIRE( !m.is_valid() );
 	}
 	
 	GIVEN( "an invalid URI schema" ) {
 		int data;
-		auto m = cluster->map<int>("http://utu.fi/memory/test0", &data);
+		auto m = cluster.map<int>("http://utu.fi/memory/test0", &data);
 		REQUIRE( !m.is_valid() );
 	}
 	
 	GIVEN( "an invalid URI host" ) {
 		int data;
-		auto m = cluster->map<int>("ftl://yle.fi/wrong/test0", &data);
+		auto m = cluster.map<int>("ftl://yle.fi/wrong/test0", &data);
 		REQUIRE( !m.is_valid() );
 	}
 	
 	GIVEN( "a duplicate URI" ) {
 		int data;
-		auto a = cluster->map<int>("ftl://utu.fi/memory/test0", &data);
-		auto b = cluster->map<int>("ftl://utu.fi/memory/test0", &data);
+		auto a = cluster.map<int>("ftl://utu.fi/memory/test0", &data);
+		auto b = cluster.map<int>("ftl://utu.fi/memory/test0", &data);
 		REQUIRE( !b.is_valid() );
 		REQUIRE( a.is_valid() );
 	}
 }
 
 SCENARIO( "Getting a read_ref", "[get]" ) {
-	auto cluster = ftl::rm::cluster("ftl://utu.fi", nullptr);
+	Cluster cluster("ftl://utu.fi", nullptr);
 	// Add fake peer
 	auto p = std::make_shared<ftl::net::Socket>(0);
-	cluster->addPeer(p);
+	cluster.addPeer(p);
 	
 	int data = 89;
 	int data2 = 99;
-	auto m = cluster->map<int>("ftl://utu.fi/memory/test1", &data);
-	cluster->map<int>("ftl://utu.fi/memory/remote0", &data2);
+	auto m = cluster.map<int>("ftl://utu.fi/memory/test1", &data);
+	cluster.map<int>("ftl://utu.fi/memory/remote0", &data2);
 	REQUIRE( m.is_valid() );
 	
 	GIVEN( "a valid URI to local memory" ) {
-		const auto r = cluster->getReadable<int>("ftl://utu.fi/memory/test1");
+		const auto r = cluster.getReadable<int>("ftl://utu.fi/memory/test1");
 		REQUIRE( r.is_valid() );
 		REQUIRE( r.pointer().is_local() );
 		REQUIRE( r == 89 );
 	}
 	
 	GIVEN( "a valid URI to remote memory" ) {
-		const auto r = cluster->getReadable<int>("ftl://utu.fi/memory/remote0");
+		const auto r = cluster.getReadable<int>("ftl://utu.fi/memory/remote0");
 		REQUIRE( r.is_valid() );
 		//REQUIRE( !r.pointer().is_local() );
 		REQUIRE( r == 888 );
diff --git a/p2p-rm/test/mapped_ptr.cpp b/p2p-rm/test/mapped_ptr_unit.cpp
similarity index 89%
rename from p2p-rm/test/mapped_ptr.cpp
rename to p2p-rm/test/mapped_ptr_unit.cpp
index dfea46c8a..e6b56328d 100644
--- a/p2p-rm/test/mapped_ptr.cpp
+++ b/p2p-rm/test/mapped_ptr_unit.cpp
@@ -5,7 +5,7 @@
 
 #include <iostream>
 
-// Mock the BLOB
+// --- Mocks -------------------------------------------------------------------
 
 static bool blob_sync = false;
 
@@ -13,11 +13,12 @@ void ftl::rm::Blob::sync(size_t offset, size_t size) {
 	blob_sync = true;
 }
 
-int ftl::net::Socket::send2(uint32_t service, const std::string &data1, const std::string &data2) {
-	std::cout << "SEND2 (" << service << ")" << std::endl;
+int ftl::net::Socket::_send() {
 	return 0;
 }
 
+// --- Tests -------------------------------------------------------------------
+
 SCENARIO( "Reading from a remote pointer", "[remote_ptr]" ) {
 	// Make a dummy blob
 	auto blob = new ftl::rm::Blob();
diff --git a/p2p-rm/test/p2p_integration.cpp b/p2p-rm/test/p2p_integration.cpp
new file mode 100644
index 000000000..e69de29bb
diff --git a/p2p-rm/test/peer.cpp b/p2p-rm/test/peer_cli.cpp
similarity index 100%
rename from p2p-rm/test/peer.cpp
rename to p2p-rm/test/peer_cli.cpp
-- 
GitLab