From e91ca7483a092e29763e20ef348adc4fd48f27f6 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Mon, 8 Apr 2019 19:39:36 +0300
Subject: [PATCH] Allow shared dispatcher amongst peers

---
 net/cpp/include/ftl/net/dispatcher.hpp |  5 ++
 net/cpp/include/ftl/net/peer.hpp       | 89 ++++----------------------
 net/cpp/include/ftl/net/protocol.hpp   |  4 --
 net/cpp/include/ftl/net/universe.hpp   | 50 +++++++++++++++
 net/cpp/src/dispatcher.cpp             | 11 ++++
 net/cpp/src/peer.cpp                   | 27 +++++++-
 net/cpp/src/universe.cpp               | 10 ++-
 7 files changed, 109 insertions(+), 87 deletions(-)

diff --git a/net/cpp/include/ftl/net/dispatcher.hpp b/net/cpp/include/ftl/net/dispatcher.hpp
index 67840a78b..465058352 100644
--- a/net/cpp/include/ftl/net/dispatcher.hpp
+++ b/net/cpp/include/ftl/net/dispatcher.hpp
@@ -12,6 +12,9 @@
 #include <tuple>
 #include <functional>
 #include <iostream>
+#include <vector>
+#include <string>
+#include <unordered_map>
 
 namespace ftl {
 
@@ -116,6 +119,8 @@ class Dispatcher {
 		}));
 	}
 	
+	std::vector<std::string> getBindings() const;
+	
 	using adaptor_type = std::function<std::unique_ptr<msgpack::object_handle>(
         msgpack::object const &)>;
 
diff --git a/net/cpp/include/ftl/net/peer.hpp b/net/cpp/include/ftl/net/peer.hpp
index ec5562be3..0de0b4ea2 100644
--- a/net/cpp/include/ftl/net/peer.hpp
+++ b/net/cpp/include/ftl/net/peer.hpp
@@ -4,6 +4,7 @@
 #define GLOG_NO_ABBREVIATED_SEVERITIES
 #include <glog/logging.h>
 #include <ftl/net/protocol.hpp>
+#include <ftl/net/dispatcher.hpp>
 #include <ftl/uri.hpp>
 #include <ftl/uuid.hpp>
 
@@ -42,7 +43,7 @@ struct caller : virtual_caller {
 	std::function<void(const T&)> f_;
 };
 
-typedef std::tuple<const char*,size_t> array;
+//typedef std::tuple<const char*,size_t> array;
 /*struct compress{};
 struct encrypt{};
 struct decompress{};
@@ -61,8 +62,8 @@ class Peer {
 	};
 
 	public:
-	explicit Peer(const char *uri);
-	explicit Peer(int s);
+	explicit Peer(const char *uri, ftl::net::Dispatcher *d=nullptr);
+	explicit Peer(int s, ftl::net::Dispatcher *d=nullptr);
 	~Peer();
 	
 	/**
@@ -123,7 +124,9 @@ class Peer {
 	int send(const std::string &name, ARGS... args);
 	
 	/**
-	 * Bind a function to an RPC call name.
+	 * Bind a function to an RPC call name. Note: if an overriding dispatcher
+	 * is used then these bindings will propagate to all peers sharing that
+	 * dispatcher.
 	 */
 	template <typename F>
 	void bind(const std::string &name, F func);
@@ -162,7 +165,7 @@ class Peer {
 	
 	int _send();
 	
-	template <typename... ARGS>
+	/*template <typename... ARGS>
 	int _send(const std::string &t, ARGS... args);
 	
 	template <typename... ARGS>
@@ -176,13 +179,14 @@ class Peer {
 	
 	template <typename T, typename... ARGS,
 			ENABLE_IF(std::is_trivial<T>::value && !std::is_pointer<T>::value)>
-	int _send(const T &t, ARGS... args);
+	int _send(const T &t, ARGS... args);*/
 
 	private: // Data
 	Status status_;
 	int sock_;
 	ftl::URI::scheme_t scheme_;
 	uint32_t version_;
+	bool destroy_disp_;
 	
 	// Receive buffers
 	msgpack::unpacker recv_buf_;
@@ -193,7 +197,7 @@ class Peer {
 	std::string uri_;
 	ftl::UUID peerid_;
 	
-	ftl::net::Dispatcher disp_;
+	ftl::net::Dispatcher *disp_;
 	std::vector<std::function<void()>> open_handlers_;
 	//std::vector<std::function<void(const ftl::net::Error &)>> error_handlers_
 	std::vector<std::function<void()>> close_handlers_;
@@ -208,7 +212,6 @@ template <typename... ARGS>
 int Peer::send(const std::string &s, ARGS... args) {
 	// Leave a blank entry for websocket header
 	if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0);
-	//msgpack::pack(send_buf_, std::make_tuple(s, std::make_tuple(args...)));
 	auto args_obj = std::make_tuple(args...);
 	auto call_obj = std::make_tuple(0,s,args_obj);
 	msgpack::pack(send_buf_, call_obj);
@@ -217,79 +220,11 @@ int Peer::send(const std::string &s, ARGS... args) {
 
 template <typename F>
 void Peer::bind(const std::string &name, F func) {
-	disp_.bind(name, func,
+	disp_->bind(name, func,
 		typename ftl::internal::func_kind_info<F>::result_kind(),
 	    typename ftl::internal::func_kind_info<F>::args_kind());
 }
 
-/*template <typename T>
-int Socket::read(T *b, size_t count) {
-	static_assert(std::is_trivial<T>::value, "Can only read trivial types");
-	return read((char*)b, sizeof(T)*count);
-}
-
-template <typename T>
-int Socket::read(std::vector<T> &b, size_t count) {
-	count = (count == 0) ? size()/sizeof(T) : count; // TODO Round this!
-	if (b.size() != count) b.resize(count);
-	return read((char*)b.data(), sizeof(T)*count);
-}
-
-template <typename T>
-int Socket::read(T &b) {
-	if (std::is_array<T>::value) return read(&b,std::extent<T>::value);
-	else return read(&b);
-}
-
-template <typename T>
-Socket &Socket::operator>>(T &t) {
-	if (std::is_array<T>::value) read(&t,std::extent<T>::value);
-	else read(&t);
-	return *this;
-}*/
-
-/*template <typename... ARGS>
-int Peer::_send(const std::string &t, ARGS... args) {
-	//send_vec_.push_back({const_cast<char*>(t.data()),t.size()});
-	//header_w_->size += t.size();
-	msgpack::pack(send_buf_, t);
-	return _send(args...)+t.size();
-}
-
-template <typename... ARGS>
-int Peer::_send(const ftl::net::array &b, ARGS... args) {
-	//send_vec_.push_back({const_cast<char*>(std::get<0>(b)),std::get<1>(b)});
-	//header_w_->size += std::get<1>(b);
-	msgpack::pack(send_buf_, msgpack::type::raw_ref(std::get<0>(b), std::get<1>(b)));
-	return std::get<1>(b)+_send(args...);
-}
-
-template <typename T, typename... ARGS>
-int Peer::_send(const std::vector<T> &t, ARGS... args) {
-	//send_vec_.push_back({const_cast<char*>(t.data()),t.size()});
-	//header_w_->size += t.size();
-	msgpack::pack(send_buf_, t);
-	return _send(args...)+t.size();
-}
-
-template <typename... Types, typename... ARGS>
-int Peer::_send(const std::tuple<Types...> &t, ARGS... args) {
-	//send_vec_.push_back({const_cast<char*>((char*)&t),sizeof(t)});
-	//header_w_->size += sizeof(t);
-	msgpack::pack(send_buf_, t);
-	return sizeof(t)+_send(args...);
-}
-
-template <typename T, typename... ARGS,
-		ENABLE_IF(std::is_trivial<T>::value && !std::is_pointer<T>::value)>
-int Peer::_send(const T &t, ARGS... args) {
-	//send_vec_.push_back({const_cast<T*>(&t),sizeof(T)});
-	//header_w_->size += sizeof(T);
-	msgpack::pack(send_buf_, t);
-	return sizeof(T)+_send(args...);
-}*/
-
-//template <typename T, typename... ARGS>
 template <typename R, typename... ARGS>
 R Peer::call(const std::string &name, ARGS... args) {
 	bool hasreturned = false;
diff --git a/net/cpp/include/ftl/net/protocol.hpp b/net/cpp/include/ftl/net/protocol.hpp
index 0fc0f7083..b9effae82 100644
--- a/net/cpp/include/ftl/net/protocol.hpp
+++ b/net/cpp/include/ftl/net/protocol.hpp
@@ -2,13 +2,9 @@
 #define _FTL_NET_PROTOCOL_HPP_
 
 #include <ftl/uuid.hpp>
-#include <ftl/net/func_traits.hpp>
-#include <ftl/net/dispatcher.hpp>
 #include <ftl/config.h>
 #include <tuple>
 
-
-
 namespace ftl {
 namespace net {
 
diff --git a/net/cpp/include/ftl/net/universe.hpp b/net/cpp/include/ftl/net/universe.hpp
index aae8a6f07..a60785654 100644
--- a/net/cpp/include/ftl/net/universe.hpp
+++ b/net/cpp/include/ftl/net/universe.hpp
@@ -3,6 +3,8 @@
 
 #include <ftl/net/peer.hpp>
 #include <ftl/net/listener.hpp>
+#include <ftl/net/dispatcher.hpp>
+#include <ftl/uuid.hpp>
 #include <vector>
 #include <string>
 #include <thread>
@@ -10,14 +12,59 @@
 namespace ftl {
 namespace net {
 
+/**
+ * Represents a group of network peers and their resources, managing the
+ * searching of and sharing of resources across peers. Each universe can
+ * listen on multiple ports/interfaces for connecting peers, and can connect
+ * to any number of peers. The creation of a Universe object also creates a
+ * new thread to manage the networking, therefore it is threadsafe but
+ * callbacks will execute in a different thread so must also be threadsafe in
+ * their actions.
+ */
 class Universe {
 	public:
+	/**
+	 * Constructor with a URI base. The base uri is used as a base to validate
+	 * resource identifiers. (it may be removed). This creates a new thread
+	 * to monitor network sockets.
+	 */
 	explicit Universe(const std::string &base);
+
+	/**
+	 * The destructor will terminate the network thread before completing.
+	 */
 	~Universe();
 	
+	/**
+	 * Open a new listening port on a given interfaces.
+	 *   eg. "tcp://localhost:9000"
+	 * @param addr URI giving protocol, interface and port
+	 */
 	bool listen(const std::string &addr);
+	
+	/**
+	 * Create a new peer connection.
+	 *   eg. "tcp://10.0.0.2:9000"
+	 * Supported protocols include tcp and ws.
+	 *
+	 * @param addr URI giving protocol, interface and port
+	 */
 	bool connect(const std::string &addr);
 	
+	/**
+	 * Bind a function to an RPC or service call name. This will implicitely
+	 * be called by any peer making the request.
+	 */
+	template <typename F>
+	void bind(const std::string &name, F func);
+	
+	/**
+	 * Send a non-blocking RPC call with no return value to all connected
+	 * peers.
+	 */
+	template <typename... ARGS>
+	void broadcast(const std::string &name, ARGS... args);
+	
 	private:
 	void _run();
 	int _setDescriptors();
@@ -33,9 +80,12 @@ class Universe {
 	fd_set sfdread_;
 	std::vector<ftl::net::Listener*> listeners_;
 	std::vector<ftl::net::Peer*> peers_;
+	ftl::UUID id_;
+	ftl::net::Dispatcher disp_;
 };
 
 };  // namespace net
 };  // namespace ftl
 
 #endif  // _FTL_NET_UNIVERSE_HPP_
+
diff --git a/net/cpp/src/dispatcher.cpp b/net/cpp/src/dispatcher.cpp
index adc4758d0..394307e19 100644
--- a/net/cpp/src/dispatcher.cpp
+++ b/net/cpp/src/dispatcher.cpp
@@ -5,6 +5,9 @@
 #include <iostream>
 
 using ftl::net::Peer;
+using ftl::net::Dispatcher;
+using std::vector;
+using std::string;
 
 /*static std::string hexStr(const std::string &s)
 {
@@ -23,6 +26,14 @@ using ftl::net::Peer;
 //    dispatch(s, unpacked.get());
 //}
 
+vector<string> Dispatcher::getBindings() const {
+	vector<string> res;
+	for (auto x : funcs_) {
+		res.push_back(x.first);
+	}
+	return res;
+}
+
 void ftl::net::Dispatcher::dispatch(Peer &s, const msgpack::object &msg) {
     switch (msg.via.array.size) {
     case 3:
diff --git a/net/cpp/src/peer.cpp b/net/cpp/src/peer.cpp
index a6efd0692..b01ac81d9 100644
--- a/net/cpp/src/peer.cpp
+++ b/net/cpp/src/peer.cpp
@@ -36,6 +36,7 @@ using std::get;
 using ftl::net::Peer;
 using ftl::URI;
 using ftl::net::ws_connect;
+using ftl::net::Dispatcher;
 
 /*static std::string hexStr(const std::string &s)
 {
@@ -123,10 +124,18 @@ static int tcpConnect(URI &uri) {
 	return csocket;
 }
 
-Peer::Peer(int s) : sock_(s) {
+Peer::Peer(int s, Dispatcher *d) : sock_(s) {
 	status_ = (s == INVALID_SOCKET) ? kInvalid : kConnecting;
 	_updateURI();
 	
+	if (d != nullptr) {
+		disp_ = d;
+		destroy_disp_ = false;
+	} else {
+		disp_ = new Dispatcher();
+		destroy_disp_ = true;
+	}
+	
 	// Send the initiating handshake if valid
 	if (status_ == kConnecting) {
 		// Install return handshake handler.
@@ -146,11 +155,19 @@ Peer::Peer(int s) : sock_(s) {
 	}
 }
 
-Peer::Peer(const char *pUri) : uri_(pUri) {	
+Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) {	
 	URI uri(pUri);
 	
 	status_ = kInvalid;
 	sock_ = INVALID_SOCKET;
+	
+	if (d != nullptr) {
+		disp_ = d;
+		destroy_disp_ = false;
+	} else {
+		disp_ = new Dispatcher();
+		destroy_disp_ = true;
+	}
 
 	scheme_ = uri.getProtocol();
 	if (uri.getProtocol() == URI::SCHEME_TCP) {
@@ -301,7 +318,7 @@ bool Peer::data() {
 				return false;
 			}
 		}
-		disp_.dispatch(*this, obj);
+		disp_->dispatch(*this, obj);
 	}
 	return false;
 }
@@ -498,5 +515,9 @@ int Peer::_send() {
 
 Peer::~Peer() {
 	close();
+	
+	if (destroy_disp_) {
+		delete disp_;
+	}
 }
 
diff --git a/net/cpp/src/universe.cpp b/net/cpp/src/universe.cpp
index c7e52d652..2e4659a7e 100644
--- a/net/cpp/src/universe.cpp
+++ b/net/cpp/src/universe.cpp
@@ -36,7 +36,7 @@ bool Universe::listen(const string &addr) {
 }
 
 bool Universe::connect(const string &addr) {
-	auto p = new Peer(addr.c_str());
+	auto p = new Peer(addr.c_str(), &disp_);
 	if (!p) return false;
 	
 	if (p->status() != Peer::kInvalid) {
@@ -81,7 +81,11 @@ int Universe::_setDescriptors() {
 }
 
 void Universe::_installBindings(Peer *p) {
-
+	p->bind("__subscribe__", [this](const string &uri) {
+		// Add this peer to subscription list for uri resource
+	});
+	
+	
 }
 
 void Universe::__start(Universe * u) {
@@ -127,7 +131,7 @@ void Universe::_run() {
 						int csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize);
 
 						if (csock != INVALID_SOCKET) {
-							auto p = new Peer(csock);
+							auto p = new Peer(csock, &disp_);
 							peers_.push_back(p);
 							
 							_installBindings(p);
-- 
GitLab