From ec560ae92b2d320890caccc8cf845439ac53e06f Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Wed, 24 Apr 2019 10:15:10 +0300
Subject: [PATCH] Update URI class and pub sub mechanism

---
 net/cpp/CMakeLists.txt               |   1 +
 net/cpp/include/ftl/net/universe.hpp |  41 ++++++++--
 net/cpp/include/ftl/uri.hpp          |  87 ++++++++-------------
 net/cpp/src/universe.cpp             |  12 ++-
 net/cpp/src/uri.cpp                  | 113 +++++++++++++++++++++++++++
 net/cpp/test/CMakeLists.txt          |   2 +
 net/cpp/test/uri_unit.cpp            |  30 +++++++
 7 files changed, 226 insertions(+), 60 deletions(-)
 create mode 100644 net/cpp/src/uri.cpp

diff --git a/net/cpp/CMakeLists.txt b/net/cpp/CMakeLists.txt
index d6c361ca7..d246cec8f 100644
--- a/net/cpp/CMakeLists.txt
+++ b/net/cpp/CMakeLists.txt
@@ -4,6 +4,7 @@ include_directories(${PROJECT_SOURCE_DIR}/net/cpp/include)
 
 
 add_library(ftlnet
+	src/uri.cpp
 	src/listener.cpp
 	src/peer.cpp
 	src/dispatcher.cpp
diff --git a/net/cpp/include/ftl/net/universe.hpp b/net/cpp/include/ftl/net/universe.hpp
index a1cfce0c8..f87694407 100644
--- a/net/cpp/include/ftl/net/universe.hpp
+++ b/net/cpp/include/ftl/net/universe.hpp
@@ -58,6 +58,9 @@ class Universe {
 	Peer *getPeer(const ftl::UUID &pid) const;
 	
 	int numberOfSubscribers(const std::string &res) const;
+
+	bool hasSubscribers(const std::string &res) const;
+	bool hasSubscribers(const ftl::URI &res) const;
 	
 	/**
 	 * Bind a function to an RPC or service call name. This will implicitely
@@ -73,6 +76,14 @@ class Universe {
 	 */
 	template <typename F>
 	bool subscribe(const std::string &res, F func);
+
+	/**
+	 * Subscribe a function to a resource. The subscribed function is
+	 * triggered whenever that resource is published to. It is akin to
+	 * RPC broadcast (no return value) to a subgroup of peers.
+	 */
+	template <typename F>
+	bool subscribe(const ftl::URI &res, F func);
 	
 	/**
 	 * Send a non-blocking RPC call with no return value to all connected
@@ -89,10 +100,19 @@ class Universe {
 	
 	/**
 	 * Send a non-blocking RPC call with no return value to all subscribers
-	 * of a resource. There may be no subscribers.
+	 * of a resource. There may be no subscribers. Note that query parameter
+	 * order in the URI string is not important.
 	 */
 	template <typename... ARGS>
 	void publish(const std::string &res, ARGS... args);
+
+	/**
+	 * Send a non-blocking RPC call with no return value to all subscribers
+	 * of a resource. There may be no subscribers. This overload accepts a
+	 * URI object directly to enable more efficient modification of parameters.
+	 */
+	template <typename... ARGS>
+	void publish(const ftl::URI &res, ARGS... args);
 	
 	/**
 	 * Register your ownership of a new resource. This must be called before
@@ -139,8 +159,13 @@ void Universe::bind(const std::string &name, F func) {
 
 template <typename F>
 bool Universe::subscribe(const std::string &res, F func) {
-	bind(res, func);
-	return _subscribe(res);
+	return subscribe(ftl::URI(res), func);
+}
+
+template <typename F>
+bool Universe::subscribe(const ftl::URI &res, F func) {
+	bind(res.to_string(), func);
+	return _subscribe(res.to_string());
 }
 
 template <typename... ARGS>
@@ -200,11 +225,17 @@ R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) {
 
 template <typename... ARGS>
 void Universe::publish(const std::string &res, ARGS... args) {
-	auto subs = subscribers_[res];
+	ftl::URI uri(res);
+	publish(uri, args...);
+}
+
+template <typename... ARGS>
+void Universe::publish(const ftl::URI &res, ARGS... args) {
+	auto subs = subscribers_[res.getBaseURI()];
 	for (auto p : subs) {
 		auto peer = getPeer(p);
 		if (peer) {
-			peer->send(res, args...);
+			peer->send(res.getBaseURI(), args...);
 		}
 	}
 }
diff --git a/net/cpp/include/ftl/uri.hpp b/net/cpp/include/ftl/uri.hpp
index dbbb47dab..1931c8196 100644
--- a/net/cpp/include/ftl/uri.hpp
+++ b/net/cpp/include/ftl/uri.hpp
@@ -4,65 +4,20 @@
 #include <uriparser/Uri.h>
 #include <string>
 #include <vector>
+#include <map>
 
 namespace ftl {
 
 	typedef const char * uri_t;
 
+	/**
+	 * Universal Resource Identifier. Parse, modify, represent and generate URIs.
+	 */
 	class URI {
 		public:
-		explicit URI(uri_t puri) {
-			UriUriA uri;
-
-			#ifdef HAVE_URIPARSESINGLE
-			const char *errpos;
-			if (uriParseSingleUriA(&uri, puri, &errpos) != URI_SUCCESS) {
-			#else
-			UriParserStateA uris;
-			uris.uri = &uri;
-			if (uriParseUriA(&uris, puri) != URI_SUCCESS) {
-			#endif
-				m_valid = false;
-				m_host = "none";
-				m_port = -1;
-				m_proto = SCHEME_NONE;
-				m_path = "";
-			} else {
-				m_host = std::string(uri.hostText.first, uri.hostText.afterLast - uri.hostText.first);
-				
-				std::string prototext = std::string(uri.scheme.first, uri.scheme.afterLast - uri.scheme.first);
-				if (prototext == "tcp") m_proto = SCHEME_TCP;
-				else if (prototext == "udp") m_proto = SCHEME_UDP;
-				else if (prototext == "ftl") m_proto = SCHEME_FTL;
-				else if (prototext == "http") m_proto = SCHEME_HTTP;
-				else if (prototext == "ws") m_proto = SCHEME_WS;
-				else if (prototext == "ipc") m_proto = SCHEME_IPC;
-				else m_proto = SCHEME_OTHER;
-
-				std::string porttext = std::string(uri.portText.first, uri.portText.afterLast - uri.portText.first);
-				m_port = atoi(porttext.c_str());
-
-				for (auto h=uri.pathHead; h!=NULL; h=h->next) {
-					auto pstr = std::string(
-							h->text.first, h->text.afterLast - h->text.first);
-
-					m_path += "/";
-					m_path += pstr;
-					m_pathseg.push_back(pstr);
-				}
-
-				m_query = std::string(uri.query.first, uri.query.afterLast - uri.query.first);
-
-				uriFreeUriMembersA(&uri);
-
-				m_valid = m_proto != SCHEME_NONE && m_host.size() > 0;
-
-				if (m_valid) {
-					if (m_query.size() > 0) m_base = std::string(uri.scheme.first, uri.query.first - uri.scheme.first - 1);
-					else m_base = std::string(uri.scheme.first);
-				}
-			}
-		}
+		explicit URI(uri_t puri);
+		explicit URI(const std::string &puri);
+		explicit URI(const URI &c);
 
 		~URI() {};
 
@@ -84,10 +39,23 @@ namespace ftl {
 		scheme_t getProtocol() const { return m_proto; };
 		scheme_t getScheme() const { return m_proto; };
 		const std::string &getPath() const { return m_path; };
-		const std::string &getQuery() const { return m_query; };
+		std::string getQuery() const;
 		const std::string &getBaseURI() const { return m_base; };
 		const std::string &getPathSegment(int n) const { return m_pathseg[n]; };
 
+		void setAttribute(const std::string &key, const std::string &value);
+		void setAttribute(const std::string &key, int value);
+
+		template <typename T>
+		T getAttribute(const std::string &key) {
+			return T(m_qmap[key]);
+		}
+
+		std::string to_string() const;
+
+		private:
+		void _parse(uri_t puri);
+
 		private:
 		bool m_valid;
 		std::string m_host;
@@ -96,8 +64,19 @@ namespace ftl {
 		std::vector<std::string> m_pathseg;
 		int m_port;
 		scheme_t m_proto;
-		std::string m_query;
+		// std::string m_query;
+		std::map<std::string, std::string> m_qmap;
 	};
+
+	template <>
+	inline int URI::getAttribute<int>(const std::string &key) {
+		return std::stoi(m_qmap[key]);
+	}
+
+	template <>
+	inline std::string URI::getAttribute<std::string>(const std::string &key) {
+		return m_qmap[key];
+	}
 }
 
 #endif // _FTL_URI_HPP_
diff --git a/net/cpp/src/universe.cpp b/net/cpp/src/universe.cpp
index e8d075c50..6fe2891c1 100644
--- a/net/cpp/src/universe.cpp
+++ b/net/cpp/src/universe.cpp
@@ -120,7 +120,7 @@ void Universe::_installBindings(Peer *p) {
 void Universe::_installBindings() {
 	bind("__subscribe__", [this](const UUID &id, const string &uri) -> bool {
 		LOG(INFO) << "Subscription to " << uri << " by " << id.to_string();
-		subscribers_[uri].push_back(id);
+		subscribers_[ftl::URI(uri).to_string()].push_back(id);
 		return true;
 	});
 	
@@ -148,6 +148,7 @@ bool Universe::createResource(const std::string &uri) {
 	return true;
 }
 
+// TODO (nick) Add URI version and correctly parse URI query parameters
 int Universe::numberOfSubscribers(const std::string &res) const {
 	auto s = subscribers_.find(res);
 	if (s != subscribers_.end()) {
@@ -157,6 +158,15 @@ int Universe::numberOfSubscribers(const std::string &res) const {
 	}
 }
 
+bool Universe::hasSubscribers(const std::string &res) const {
+	// FIXME (nick) Need to parse URI and correct query order
+	return numberOfSubscribers(res) > 0;
+}
+
+bool Universe::hasSubscribers(const ftl::URI &res) const {
+	return numberOfSubscribers(res.to_string()) > 0;
+}
+
 bool Universe::_subscribe(const std::string &res) {
 	// Need to find who owns the resource
 	optional<UUID> pid = findOwner(res);
diff --git a/net/cpp/src/uri.cpp b/net/cpp/src/uri.cpp
new file mode 100644
index 000000000..7add854f6
--- /dev/null
+++ b/net/cpp/src/uri.cpp
@@ -0,0 +1,113 @@
+#include <ftl/uri.hpp>
+
+using ftl::URI;
+using std::string;
+
+URI::URI(uri_t puri) {
+    _parse(puri);
+}
+
+URI::URI(const std::string &puri) {
+    _parse(puri.c_str());
+}
+
+URI::URI(const URI &c) {
+    m_valid = c.m_valid;
+    m_host = c.m_host;
+    m_port = c.m_port;
+    m_proto = c.m_proto;
+    m_path = c.m_path;
+    m_pathseg = c.m_pathseg;
+    m_qmap = c.m_qmap;
+    m_base = c.m_base;
+}
+
+void URI::_parse(uri_t puri) {
+    UriUriA uri;
+
+#ifdef HAVE_URIPARSESINGLE
+    const char *errpos;
+    if (uriParseSingleUriA(&uri, puri, &errpos) != URI_SUCCESS) {
+#else
+    UriParserStateA uris;
+    uris.uri = &uri;
+    if (uriParseUriA(&uris, puri) != URI_SUCCESS) {
+#endif
+        m_valid = false;
+        m_host = "none";
+        m_port = -1;
+        m_proto = SCHEME_NONE;
+        m_path = "";
+    } else {
+        m_host = std::string(uri.hostText.first, uri.hostText.afterLast - uri.hostText.first);
+        
+        std::string prototext = std::string(uri.scheme.first, uri.scheme.afterLast - uri.scheme.first);
+        if (prototext == "tcp") m_proto = SCHEME_TCP;
+        else if (prototext == "udp") m_proto = SCHEME_UDP;
+        else if (prototext == "ftl") m_proto = SCHEME_FTL;
+        else if (prototext == "http") m_proto = SCHEME_HTTP;
+        else if (prototext == "ws") m_proto = SCHEME_WS;
+        else if (prototext == "ipc") m_proto = SCHEME_IPC;
+        else m_proto = SCHEME_OTHER;
+
+        std::string porttext = std::string(uri.portText.first, uri.portText.afterLast - uri.portText.first);
+        m_port = atoi(porttext.c_str());
+
+        for (auto h=uri.pathHead; h!=NULL; h=h->next) {
+            auto pstr = std::string(
+                    h->text.first, h->text.afterLast - h->text.first);
+
+            m_path += "/";
+            m_path += pstr;
+            m_pathseg.push_back(pstr);
+        }
+
+        //string query = std::string(uri.query.first, uri.query.afterLast - uri.query.first);
+        if (uri.query.afterLast - uri.query.first > 0) {
+            UriQueryListA *queryList;
+            int itemCount;
+            if (uriDissectQueryMallocA(&queryList, &itemCount, uri.query.first,
+                    uri.query.afterLast) != URI_SUCCESS) {
+                // Failure
+            }
+            
+            UriQueryListA *item = queryList;
+            while (item) {
+                m_qmap[item->key] = item->value;
+                item = item->next;
+            }
+
+            uriFreeQueryListA(queryList);
+        }
+
+        uriFreeUriMembersA(&uri);
+
+        m_valid = m_proto != SCHEME_NONE && m_host.size() > 0;
+
+        if (m_valid) {
+            if (m_qmap.size() > 0) m_base = std::string(uri.scheme.first, uri.query.first - uri.scheme.first - 1);
+            else m_base = std::string(uri.scheme.first);
+        }
+    }
+}
+
+string URI::to_string() const {
+    return (m_qmap.size() > 0) ? m_base + "?" + getQuery() : m_base;
+}
+
+string URI::getQuery() const {
+    string q;
+    for (auto x : m_qmap) {
+        if (q.length() > 0) q += "&";
+        q += x.first + "=" + x.second;
+    }
+    return q;
+};
+
+void URI::setAttribute(const string &key, const string &value) {
+    m_qmap[key] = value;
+}
+
+void URI::setAttribute(const string &key, int value) {
+    m_qmap[key] = std::to_string(value);
+}
diff --git a/net/cpp/test/CMakeLists.txt b/net/cpp/test/CMakeLists.txt
index c50a4d6df..b641391a3 100644
--- a/net/cpp/test/CMakeLists.txt
+++ b/net/cpp/test/CMakeLists.txt
@@ -3,6 +3,7 @@ add_executable(peer_unit
 	./tests.cpp
 	../src/ws_internal.cpp
 	../src/dispatcher.cpp
+	../src/uri.cpp
 	./peer_unit.cpp
 	../../../common/cpp/src/config.cpp
 )
@@ -15,6 +16,7 @@ target_link_libraries(peer_unit
 ### URI ########################################################################
 add_executable(uri_unit
 	./tests.cpp
+	../src/uri.cpp
 	./uri_unit.cpp)
 target_link_libraries(uri_unit
 	${URIPARSER_LIBRARIES})
diff --git a/net/cpp/test/uri_unit.cpp b/net/cpp/test/uri_unit.cpp
index 7aa68f79a..e11b320f4 100644
--- a/net/cpp/test/uri_unit.cpp
+++ b/net/cpp/test/uri_unit.cpp
@@ -2,6 +2,7 @@
 #include <ftl/uri.hpp>
 
 using ftl::URI;
+using std::string;
 
 SCENARIO( "URI() can parse valid URIs", "[utility]" ) {
 	GIVEN( "a valid scheme, no port or path" ) {
@@ -59,3 +60,32 @@ SCENARIO( "URI() fails gracefully with invalid URIs", "[utility]" ) {
 	}
 }
 
+SCENARIO( "URI::to_string() from a valid URI" ) {
+	GIVEN( "no query component" ) {
+		URI uri("http://localhost:1000/hello");
+		REQUIRE( uri.to_string() == "http://localhost:1000/hello" );
+	}
+
+	GIVEN( "A single query component" ) {
+		URI uri("http://localhost:1000/hello?x=5");
+		REQUIRE( uri.to_string() == "http://localhost:1000/hello?x=5" );
+	}
+
+	GIVEN( "an unsorted set of query components" ) {
+		URI uri("http://localhost:1000/hello?z=5&y=4&x=2");
+		REQUIRE( uri.to_string() == "http://localhost:1000/hello?x=2&y=4&z=5" );
+	}
+}
+
+SCENARIO( "URI::getAttribute() from query" ) {
+	GIVEN( "a string value" ) {
+		URI uri("http://localhost:1000/hello?x=world");
+		REQUIRE( uri.getAttribute<string>("x") == "world" );
+	}
+
+	GIVEN( "an integer value" ) {
+		URI uri("http://localhost:1000/hello?x=56");
+		REQUIRE( uri.getAttribute<int>("x") == 56 );
+	}
+}
+
-- 
GitLab