From 6797a90e76e505923a605a453777166a3b2bd7b9 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Thu, 11 Apr 2019 13:13:10 +0300
Subject: [PATCH] Start adding resource subscription code

---
 net/cpp/include/ftl/net/peer.hpp     |  2 ++
 net/cpp/include/ftl/net/universe.hpp | 43 ++++++++++++++++++++++++++++
 net/cpp/include/ftl/uuid.hpp         |  3 ++
 net/cpp/src/peer.cpp                 | 12 +++++---
 net/cpp/src/universe.cpp             | 40 ++++++++++++++++++++++++--
 5 files changed, 93 insertions(+), 7 deletions(-)

diff --git a/net/cpp/include/ftl/net/peer.hpp b/net/cpp/include/ftl/net/peer.hpp
index 0b9b81228..5355ceb47 100644
--- a/net/cpp/include/ftl/net/peer.hpp
+++ b/net/cpp/include/ftl/net/peer.hpp
@@ -34,6 +34,8 @@ extern int setDescriptors();
 namespace ftl {
 namespace net {
 
+extern ftl::UUID this_peer;
+
 class Universe;
 
 struct virtual_caller {
diff --git a/net/cpp/include/ftl/net/universe.hpp b/net/cpp/include/ftl/net/universe.hpp
index 8f84287fe..20fdfa21f 100644
--- a/net/cpp/include/ftl/net/universe.hpp
+++ b/net/cpp/include/ftl/net/universe.hpp
@@ -54,6 +54,8 @@ class Universe {
 	
 	int numberOfPeers() const { return peers_.size(); }
 	
+	Peer *getPeer(const ftl::UUID &pid) const;
+	
 	/**
 	 * Bind a function to an RPC or service call name. This will implicitely
 	 * be called by any peer making the request.
@@ -76,6 +78,12 @@ class Universe {
 	template <typename... ARGS>
 	void broadcast(const std::string &name, ARGS... args);
 	
+	template <typename R, typename... ARGS>
+	R call(const UUID &pid, const std::string &name, ARGS... args);
+	
+	template <typename R, typename... ARGS>
+	std::optional<R> findOne(const std::string &name, ARGS... args);
+	
 	/**
 	 * Send a non-blocking RPC call with no return value to all subscribers
 	 * of a resource. There may be no subscribers.
@@ -88,7 +96,10 @@ class Universe {
 	private:
 	void _run();
 	int _setDescriptors();
+	void _installBindings();
 	void _installBindings(Peer *);
+	bool _subscribe(const std::string &res);
+	std::optional<ftl::UUID> _findOwner(const std::string &res);
 	
 	static void __start(Universe *u);
 	
@@ -100,6 +111,8 @@ class Universe {
 	fd_set sfdread_;
 	std::vector<ftl::net::Listener*> listeners_;
 	std::vector<ftl::net::Peer*> peers_;
+	std::map<std::string, std::vector<ftl::net::Peer*>> subscribers_;
+	std::map<ftl::UUID, ftl::net::Peer*> peer_ids_;
 	ftl::UUID id_;
 	ftl::net::Dispatcher disp_;
 	
@@ -115,6 +128,13 @@ void Universe::bind(const std::string &name, F func) {
 	    typename ftl::internal::func_kind_info<F>::args_kind());
 }
 
+template <typename F>
+bool Universe::subscribe(const std::string &res, F func) {
+	bind(res, func);
+	_subscribe(res);
+	return true;
+}
+
 template <typename... ARGS>
 void Universe::broadcast(const std::string &name, ARGS... args) {
 	for (auto p : peers_) {
@@ -122,6 +142,29 @@ void Universe::broadcast(const std::string &name, ARGS... args) {
 	}
 }
 
+template <typename R, typename... ARGS>
+std::optional<R> Universe::findOne(const std::string &name, ARGS... args) {
+	for (auto p : peers_) {
+		p->send(name, args...);
+	}
+	return {};
+}
+
+template <typename R, typename... ARGS>
+R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) {
+	Peer *p = getPeer(pid);
+	if (p == nullptr) throw -1;
+	return p->call<R>(name, args...);
+}
+
+template <typename... ARGS>
+void Universe::publish(const std::string &res, ARGS... args) {
+	auto subs = subscribers_[res];
+	for (auto p : subs) {
+		p->send(res, args...);
+	}
+}
+
 };  // namespace net
 };  // namespace ftl
 
diff --git a/net/cpp/include/ftl/uuid.hpp b/net/cpp/include/ftl/uuid.hpp
index 76fae7629..77b448bee 100644
--- a/net/cpp/include/ftl/uuid.hpp
+++ b/net/cpp/include/ftl/uuid.hpp
@@ -37,6 +37,9 @@ namespace ftl {
 		bool operator!=(const UUID &u) const {
 			return memcmp(&uuid_,&u.uuid_,16) != 0;
 		}
+		bool operator<(const UUID &u) const {
+			return strncmp((const char*)uuid_, (const char *)u.uuid_, 16) < 0;
+		}
 		
 		/**
 		 * Get a raw data string.
diff --git a/net/cpp/src/peer.cpp b/net/cpp/src/peer.cpp
index bc969f91b..bb0eb10b0 100644
--- a/net/cpp/src/peer.cpp
+++ b/net/cpp/src/peer.cpp
@@ -51,6 +51,9 @@ using ftl::net::Dispatcher;
 
 int Peer::rpcid__ = 0;
 
+// Global peer UUID
+ftl::UUID ftl::net::this_peer;
+
 static ctpl::thread_pool pool(5);
 
 // TODO(nick) Move to tcp_internal.cpp
@@ -145,12 +148,13 @@ Peer::Peer(int s, Dispatcher *d) : sock_(s) {
 			} else {
 				status_ = kConnected;
 				version_ = version;
+				peerid_ = pid;
 			}
 		});
 	
-		ftl::UUID uuid;
+		//ftl::UUID uuid;
 
-		send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, uuid); 
+		send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer); 
 	}
 }
 
@@ -210,8 +214,8 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) {
 			} else {
 				status_ = kConnected;
 				version_ = version;
-				ftl::UUID uuid;
-				send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, uuid);
+				peerid_ = pid;
+				send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer);
 			}
 		}); 
 	}
diff --git a/net/cpp/src/universe.cpp b/net/cpp/src/universe.cpp
index 006131c4e..6624a0a9a 100644
--- a/net/cpp/src/universe.cpp
+++ b/net/cpp/src/universe.cpp
@@ -12,8 +12,12 @@ using ftl::net::Peer;
 using ftl::net::Listener;
 using ftl::net::Universe;
 using nlohmann::json;
+using ftl::UUID;
+using std::optional;
 
-Universe::Universe() : active_(true), thread_(Universe::__start, this) {}
+Universe::Universe() : active_(true), thread_(Universe::__start, this) {
+	_installBindings();
+}
 
 Universe::Universe(nlohmann::json &config) :
 		active_(true), config_(config), thread_(Universe::__start, this) {
@@ -30,6 +34,8 @@ Universe::Universe(nlohmann::json &config) :
 			connect(p);
 		}
 	}
+	
+	_installBindings();
 }
 
 Universe::~Universe() {
@@ -104,13 +110,41 @@ int Universe::_setDescriptors() {
 }
 
 void Universe::_installBindings(Peer *p) {
-	p->bind("__subscribe__", [this](const string &uri) {
-		// Add this peer to subscription list for uri resource
+
+}
+
+void Universe::_installBindings() {
+	bind("__subscribe__", [this](const UUID &id, const string &uri) -> bool {
+		
 	});
 	
 	
 }
 
+Peer *Universe::getPeer(const UUID &id) const {
+	auto ix = peer_ids_.find(id);
+	if (ix == peer_ids_.end()) return nullptr;
+	else return ix->second;
+}
+
+optional<UUID> Universe::_findOwner(const string &res) {
+	// TODO(nick) cache this information
+	return findOne<UUID>("__owner__", res);
+}
+
+bool Universe::_subscribe(const std::string &res) {
+	// Need to find who owns the resource
+	optional<UUID> pid = _findOwner(res);
+	
+	if (pid) {
+		return call<bool>(*pid, "__subscribe__", id_, res);
+	} else {
+		// No resource found
+		LOG(WARNING) << "Subscribe to unknown resource: " << res;
+		return false;
+	}
+}
+
 void Universe::__start(Universe * u) {
 	u->_run();
 }
-- 
GitLab