From 32795d0cb24042626cc16a43b53c7250f32e6705 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Thu, 11 Apr 2019 17:46:45 +0300
Subject: [PATCH] Add subscribe tests and get working. Fix for onConnect
 trigger in peer

---
 net/cpp/include/ftl/net/peer.hpp     | 17 ++++++++++++-----
 net/cpp/include/ftl/net/universe.hpp | 10 +++++++---
 net/cpp/src/peer.cpp                 | 15 +++++++++------
 net/cpp/src/universe.cpp             | 22 ++++++++++++++++++++--
 net/cpp/test/net_integration.cpp     | 28 ++++++++++++++++++++++++++++
 5 files changed, 76 insertions(+), 16 deletions(-)

diff --git a/net/cpp/include/ftl/net/peer.hpp b/net/cpp/include/ftl/net/peer.hpp
index 0681cf693..cd0fe67d0 100644
--- a/net/cpp/include/ftl/net/peer.hpp
+++ b/net/cpp/include/ftl/net/peer.hpp
@@ -140,9 +140,9 @@ class Peer {
 	template <typename F>
 	void bind(const std::string &name, F func);
 
-	//void onError(std::function<void(Socket&, int err, const char *msg)> &f) {}
-	void onConnect(std::function<void()> &f);
-	void onDisconnect(std::function<void()> &f) {}
+	// void onError(std::function<void(Peer &, int err, const char *msg)> &f) {}
+	void onConnect(const std::function<void(Peer &)> &f);
+	void onDisconnect(std::function<void(Peer &)> &f) {}
 	
 	bool isWaiting() const { return is_waiting_; }
 	
@@ -180,6 +180,13 @@ class Peer {
 	
 	int _send();
 	
+	template<typename... ARGS>
+	void _trigger(const std::vector<std::function<void(Peer &, ARGS...)>> &hs, ARGS... args) {
+		for (auto h : hs) {
+			h(*this, args...);
+		}
+	}
+	
 	/*template <typename... ARGS>
 	int _send(const std::string &t, ARGS... args);
 	
@@ -215,9 +222,9 @@ class Peer {
 	ftl::UUID peerid_;
 	
 	ftl::net::Dispatcher *disp_;
-	std::vector<std::function<void()>> open_handlers_;
+	std::vector<std::function<void(Peer &)>> open_handlers_;
 	//std::vector<std::function<void(const ftl::net::Error &)>> error_handlers_
-	std::vector<std::function<void()>> close_handlers_;
+	std::vector<std::function<void(Peer &)>> close_handlers_;
 	std::map<int, std::unique_ptr<virtual_caller>> callbacks_;
 	
 	static int rpcid__;
diff --git a/net/cpp/include/ftl/net/universe.hpp b/net/cpp/include/ftl/net/universe.hpp
index bfda37aee..11557f923 100644
--- a/net/cpp/include/ftl/net/universe.hpp
+++ b/net/cpp/include/ftl/net/universe.hpp
@@ -57,6 +57,8 @@ class Universe {
 	
 	Peer *getPeer(const ftl::UUID &pid) const;
 	
+	int numberOfSubscribers(const std::string &res) const;
+	
 	/**
 	 * Bind a function to an RPC or service call name. This will implicitely
 	 * be called by any peer making the request.
@@ -138,8 +140,7 @@ void Universe::bind(const std::string &name, F func) {
 template <typename F>
 bool Universe::subscribe(const std::string &res, F func) {
 	bind(res, func);
-	_subscribe(res);
-	return true;
+	return _subscribe(res);
 }
 
 template <typename... ARGS>
@@ -190,7 +191,10 @@ std::optional<R> Universe::findOne(const std::string &name, ARGS... args) {
 template <typename R, typename... ARGS>
 R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) {
 	Peer *p = getPeer(pid);
-	if (p == nullptr) throw -1;
+	if (p == nullptr) {
+		LOG(ERROR) << "Attempting to call an unknown peer : " << pid.to_string();
+		throw -1;
+	}
 	return p->call<R>(name, args...);
 }
 
diff --git a/net/cpp/src/peer.cpp b/net/cpp/src/peer.cpp
index f0dd6784d..a782bc67c 100644
--- a/net/cpp/src/peer.cpp
+++ b/net/cpp/src/peer.cpp
@@ -149,6 +149,8 @@ Peer::Peer(int s, Dispatcher *d) : sock_(s) {
 				status_ = kConnected;
 				version_ = version;
 				peerid_ = pid;
+				
+				_trigger(open_handlers_);
 			}
 		});
 	
@@ -216,6 +218,8 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) {
 				version_ = version;
 				peerid_ = pid;
 				send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer);
+				
+				_trigger(open_handlers_);
 			}
 		}); 
 	}
@@ -262,6 +266,8 @@ void Peer::close(bool retry) {
 		
 		//auto i = find(sockets.begin(),sockets.end(),this);
 		//sockets.erase(i);
+		
+		_trigger(close_handlers_);
 	}
 }
 
@@ -481,9 +487,9 @@ void Peer::_sendResponse(uint32_t id, const msgpack::object &res) {
 	_send();
 }
 
-void Peer::onConnect(std::function<void()> &f) {
+void Peer::onConnect(const std::function<void(Peer&)> &f) {
 	if (status_ == kConnected) {
-		f();
+		f(*this);
 	} else {
 		open_handlers_.push_back(f);
 	}
@@ -491,10 +497,7 @@ void Peer::onConnect(std::function<void()> &f) {
 
 void Peer::_connected() {
 	status_ = kConnected;
-	for (auto h : open_handlers_) {
-		h();
-	}
-	//connect_handlers_.clear();
+
 }
 
 int Peer::_send() {
diff --git a/net/cpp/src/universe.cpp b/net/cpp/src/universe.cpp
index 13036d103..e8d075c50 100644
--- a/net/cpp/src/universe.cpp
+++ b/net/cpp/src/universe.cpp
@@ -72,6 +72,10 @@ bool Universe::connect(const string &addr) {
 	
 	_installBindings(p);
 	
+	p->onConnect([this](Peer &p) {
+		peer_ids_[p.id()] = &p;
+	});
+	
 	return p->status() == Peer::kConnecting;
 }
 
@@ -116,6 +120,8 @@ void Universe::_installBindings(Peer *p) {
 void Universe::_installBindings() {
 	bind("__subscribe__", [this](const UUID &id, const string &uri) -> bool {
 		LOG(INFO) << "Subscription to " << uri << " by " << id.to_string();
+		subscribers_[uri].push_back(id);
+		return true;
 	});
 	
 	bind("__owner__", [this](const std::string &res) -> optional<UUID> {
@@ -138,15 +144,25 @@ optional<UUID> Universe::findOwner(const string &res) {
 
 bool Universe::createResource(const std::string &uri) {
 	owned_.insert(uri);
+	subscribers_[uri];
 	return true;
 }
 
+int Universe::numberOfSubscribers(const std::string &res) const {
+	auto s = subscribers_.find(res);
+	if (s != subscribers_.end()) {
+		return s->second.size();
+	} else {
+		return -1;
+	}
+}
+
 bool Universe::_subscribe(const std::string &res) {
 	// Need to find who owns the resource
 	optional<UUID> pid = findOwner(res);
 	
 	if (pid) {
-		return call<bool>(*pid, "__subscribe__", id_, res);
+		return call<bool>(*pid, "__subscribe__", ftl::net::this_peer, res);
 	} else {
 		// No resource found
 		LOG(WARNING) << "Subscribe to unknown resource: " << res;
@@ -199,8 +215,10 @@ void Universe::_run() {
 						if (csock != INVALID_SOCKET) {
 							auto p = new Peer(csock, &disp_);
 							peers_.push_back(p);
-							
 							_installBindings(p);
+							p->onConnect([this](Peer &p) {
+								peer_ids_[p.id()] = &p;
+							});
 						}
 					//}
 				}
diff --git a/net/cpp/test/net_integration.cpp b/net/cpp/test/net_integration.cpp
index 925776833..19a585423 100644
--- a/net/cpp/test/net_integration.cpp
+++ b/net/cpp/test/net_integration.cpp
@@ -162,6 +162,34 @@ TEST_CASE("Universe::findOwner()", "") {
 		a.createResource("ftl://test");
 		REQUIRE( *(b.findOwner("ftl://test")) == ftl::net::this_peer );
 	}
+	
+	SECTION("three peers and one owner") {
+		Universe c;
+		c.connect("tcp://localhost:7077");
+		while (a.numberOfPeers() < 2) sleep_for(milliseconds(20));
+
+		b.createResource("ftl://test");
+		REQUIRE( *(a.findOwner("ftl://test")) == ftl::net::this_peer );
+	}
+}
+
+TEST_CASE("Universe::subscribe()", "") {
+	Universe a;
+	Universe b;
+	a.listen("tcp://localhost:7077");
+	b.connect("tcp://localhost:7077");
+	while (a.numberOfPeers() == 0) sleep_for(milliseconds(20));
+
+	SECTION("no resource exists") {
+		REQUIRE( !b.subscribe("ftl://test", []() {}) );
+	}
+
+	SECTION("one resource exists") {
+		a.createResource("ftl://test");
+		REQUIRE( b.subscribe("ftl://test", []() {}) );
+		sleep_for(milliseconds(50));
+		REQUIRE( a.numberOfSubscribers("ftl://test") == 1);
+	}
 }
 
 /*TEST_CASE("net::listen()", "[net]") {
-- 
GitLab