Skip to content
Snippets Groups Projects
Commit ec560ae9 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Update URI class and pub sub mechanism

parent c1cec0ff
No related branches found
No related tags found
No related merge requests found
Pipeline #10309 passed
...@@ -4,6 +4,7 @@ include_directories(${PROJECT_SOURCE_DIR}/net/cpp/include) ...@@ -4,6 +4,7 @@ include_directories(${PROJECT_SOURCE_DIR}/net/cpp/include)
add_library(ftlnet add_library(ftlnet
src/uri.cpp
src/listener.cpp src/listener.cpp
src/peer.cpp src/peer.cpp
src/dispatcher.cpp src/dispatcher.cpp
......
...@@ -58,6 +58,9 @@ class Universe { ...@@ -58,6 +58,9 @@ class Universe {
Peer *getPeer(const ftl::UUID &pid) const; Peer *getPeer(const ftl::UUID &pid) const;
int numberOfSubscribers(const std::string &res) const; int numberOfSubscribers(const std::string &res) const;
bool hasSubscribers(const std::string &res) const;
bool hasSubscribers(const ftl::URI &res) const;
/** /**
* Bind a function to an RPC or service call name. This will implicitely * Bind a function to an RPC or service call name. This will implicitely
...@@ -73,6 +76,14 @@ class Universe { ...@@ -73,6 +76,14 @@ class Universe {
*/ */
template <typename F> template <typename F>
bool subscribe(const std::string &res, F func); bool subscribe(const std::string &res, F func);
/**
* Subscribe a function to a resource. The subscribed function is
* triggered whenever that resource is published to. It is akin to
* RPC broadcast (no return value) to a subgroup of peers.
*/
template <typename F>
bool subscribe(const ftl::URI &res, F func);
/** /**
* Send a non-blocking RPC call with no return value to all connected * Send a non-blocking RPC call with no return value to all connected
...@@ -89,10 +100,19 @@ class Universe { ...@@ -89,10 +100,19 @@ class Universe {
/** /**
* Send a non-blocking RPC call with no return value to all subscribers * Send a non-blocking RPC call with no return value to all subscribers
* of a resource. There may be no subscribers. * of a resource. There may be no subscribers. Note that query parameter
* order in the URI string is not important.
*/ */
template <typename... ARGS> template <typename... ARGS>
void publish(const std::string &res, ARGS... args); void publish(const std::string &res, ARGS... args);
/**
* Send a non-blocking RPC call with no return value to all subscribers
* of a resource. There may be no subscribers. This overload accepts a
* URI object directly to enable more efficient modification of parameters.
*/
template <typename... ARGS>
void publish(const ftl::URI &res, ARGS... args);
/** /**
* Register your ownership of a new resource. This must be called before * Register your ownership of a new resource. This must be called before
...@@ -139,8 +159,13 @@ void Universe::bind(const std::string &name, F func) { ...@@ -139,8 +159,13 @@ void Universe::bind(const std::string &name, F func) {
template <typename F> template <typename F>
bool Universe::subscribe(const std::string &res, F func) { bool Universe::subscribe(const std::string &res, F func) {
bind(res, func); return subscribe(ftl::URI(res), func);
return _subscribe(res); }
template <typename F>
bool Universe::subscribe(const ftl::URI &res, F func) {
bind(res.to_string(), func);
return _subscribe(res.to_string());
} }
template <typename... ARGS> template <typename... ARGS>
...@@ -200,11 +225,17 @@ R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) { ...@@ -200,11 +225,17 @@ R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) {
template <typename... ARGS> template <typename... ARGS>
void Universe::publish(const std::string &res, ARGS... args) { void Universe::publish(const std::string &res, ARGS... args) {
auto subs = subscribers_[res]; ftl::URI uri(res);
publish(uri, args...);
}
template <typename... ARGS>
void Universe::publish(const ftl::URI &res, ARGS... args) {
auto subs = subscribers_[res.getBaseURI()];
for (auto p : subs) { for (auto p : subs) {
auto peer = getPeer(p); auto peer = getPeer(p);
if (peer) { if (peer) {
peer->send(res, args...); peer->send(res.getBaseURI(), args...);
} }
} }
} }
......
...@@ -4,65 +4,20 @@ ...@@ -4,65 +4,20 @@
#include <uriparser/Uri.h> #include <uriparser/Uri.h>
#include <string> #include <string>
#include <vector> #include <vector>
#include <map>
namespace ftl { namespace ftl {
typedef const char * uri_t; typedef const char * uri_t;
/**
* Universal Resource Identifier. Parse, modify, represent and generate URIs.
*/
class URI { class URI {
public: public:
explicit URI(uri_t puri) { explicit URI(uri_t puri);
UriUriA uri; explicit URI(const std::string &puri);
explicit URI(const URI &c);
#ifdef HAVE_URIPARSESINGLE
const char *errpos;
if (uriParseSingleUriA(&uri, puri, &errpos) != URI_SUCCESS) {
#else
UriParserStateA uris;
uris.uri = &uri;
if (uriParseUriA(&uris, puri) != URI_SUCCESS) {
#endif
m_valid = false;
m_host = "none";
m_port = -1;
m_proto = SCHEME_NONE;
m_path = "";
} else {
m_host = std::string(uri.hostText.first, uri.hostText.afterLast - uri.hostText.first);
std::string prototext = std::string(uri.scheme.first, uri.scheme.afterLast - uri.scheme.first);
if (prototext == "tcp") m_proto = SCHEME_TCP;
else if (prototext == "udp") m_proto = SCHEME_UDP;
else if (prototext == "ftl") m_proto = SCHEME_FTL;
else if (prototext == "http") m_proto = SCHEME_HTTP;
else if (prototext == "ws") m_proto = SCHEME_WS;
else if (prototext == "ipc") m_proto = SCHEME_IPC;
else m_proto = SCHEME_OTHER;
std::string porttext = std::string(uri.portText.first, uri.portText.afterLast - uri.portText.first);
m_port = atoi(porttext.c_str());
for (auto h=uri.pathHead; h!=NULL; h=h->next) {
auto pstr = std::string(
h->text.first, h->text.afterLast - h->text.first);
m_path += "/";
m_path += pstr;
m_pathseg.push_back(pstr);
}
m_query = std::string(uri.query.first, uri.query.afterLast - uri.query.first);
uriFreeUriMembersA(&uri);
m_valid = m_proto != SCHEME_NONE && m_host.size() > 0;
if (m_valid) {
if (m_query.size() > 0) m_base = std::string(uri.scheme.first, uri.query.first - uri.scheme.first - 1);
else m_base = std::string(uri.scheme.first);
}
}
}
~URI() {}; ~URI() {};
...@@ -84,10 +39,23 @@ namespace ftl { ...@@ -84,10 +39,23 @@ namespace ftl {
scheme_t getProtocol() const { return m_proto; }; scheme_t getProtocol() const { return m_proto; };
scheme_t getScheme() const { return m_proto; }; scheme_t getScheme() const { return m_proto; };
const std::string &getPath() const { return m_path; }; const std::string &getPath() const { return m_path; };
const std::string &getQuery() const { return m_query; }; std::string getQuery() const;
const std::string &getBaseURI() const { return m_base; }; const std::string &getBaseURI() const { return m_base; };
const std::string &getPathSegment(int n) const { return m_pathseg[n]; }; const std::string &getPathSegment(int n) const { return m_pathseg[n]; };
void setAttribute(const std::string &key, const std::string &value);
void setAttribute(const std::string &key, int value);
template <typename T>
T getAttribute(const std::string &key) {
return T(m_qmap[key]);
}
std::string to_string() const;
private:
void _parse(uri_t puri);
private: private:
bool m_valid; bool m_valid;
std::string m_host; std::string m_host;
...@@ -96,8 +64,19 @@ namespace ftl { ...@@ -96,8 +64,19 @@ namespace ftl {
std::vector<std::string> m_pathseg; std::vector<std::string> m_pathseg;
int m_port; int m_port;
scheme_t m_proto; scheme_t m_proto;
std::string m_query; // std::string m_query;
std::map<std::string, std::string> m_qmap;
}; };
template <>
inline int URI::getAttribute<int>(const std::string &key) {
return std::stoi(m_qmap[key]);
}
template <>
inline std::string URI::getAttribute<std::string>(const std::string &key) {
return m_qmap[key];
}
} }
#endif // _FTL_URI_HPP_ #endif // _FTL_URI_HPP_
...@@ -120,7 +120,7 @@ void Universe::_installBindings(Peer *p) { ...@@ -120,7 +120,7 @@ void Universe::_installBindings(Peer *p) {
void Universe::_installBindings() { void Universe::_installBindings() {
bind("__subscribe__", [this](const UUID &id, const string &uri) -> bool { bind("__subscribe__", [this](const UUID &id, const string &uri) -> bool {
LOG(INFO) << "Subscription to " << uri << " by " << id.to_string(); LOG(INFO) << "Subscription to " << uri << " by " << id.to_string();
subscribers_[uri].push_back(id); subscribers_[ftl::URI(uri).to_string()].push_back(id);
return true; return true;
}); });
...@@ -148,6 +148,7 @@ bool Universe::createResource(const std::string &uri) { ...@@ -148,6 +148,7 @@ bool Universe::createResource(const std::string &uri) {
return true; return true;
} }
// TODO (nick) Add URI version and correctly parse URI query parameters
int Universe::numberOfSubscribers(const std::string &res) const { int Universe::numberOfSubscribers(const std::string &res) const {
auto s = subscribers_.find(res); auto s = subscribers_.find(res);
if (s != subscribers_.end()) { if (s != subscribers_.end()) {
...@@ -157,6 +158,15 @@ int Universe::numberOfSubscribers(const std::string &res) const { ...@@ -157,6 +158,15 @@ int Universe::numberOfSubscribers(const std::string &res) const {
} }
} }
bool Universe::hasSubscribers(const std::string &res) const {
// FIXME (nick) Need to parse URI and correct query order
return numberOfSubscribers(res) > 0;
}
bool Universe::hasSubscribers(const ftl::URI &res) const {
return numberOfSubscribers(res.to_string()) > 0;
}
bool Universe::_subscribe(const std::string &res) { bool Universe::_subscribe(const std::string &res) {
// Need to find who owns the resource // Need to find who owns the resource
optional<UUID> pid = findOwner(res); optional<UUID> pid = findOwner(res);
......
#include <ftl/uri.hpp>
using ftl::URI;
using std::string;
URI::URI(uri_t puri) {
_parse(puri);
}
URI::URI(const std::string &puri) {
_parse(puri.c_str());
}
URI::URI(const URI &c) {
m_valid = c.m_valid;
m_host = c.m_host;
m_port = c.m_port;
m_proto = c.m_proto;
m_path = c.m_path;
m_pathseg = c.m_pathseg;
m_qmap = c.m_qmap;
m_base = c.m_base;
}
void URI::_parse(uri_t puri) {
UriUriA uri;
#ifdef HAVE_URIPARSESINGLE
const char *errpos;
if (uriParseSingleUriA(&uri, puri, &errpos) != URI_SUCCESS) {
#else
UriParserStateA uris;
uris.uri = &uri;
if (uriParseUriA(&uris, puri) != URI_SUCCESS) {
#endif
m_valid = false;
m_host = "none";
m_port = -1;
m_proto = SCHEME_NONE;
m_path = "";
} else {
m_host = std::string(uri.hostText.first, uri.hostText.afterLast - uri.hostText.first);
std::string prototext = std::string(uri.scheme.first, uri.scheme.afterLast - uri.scheme.first);
if (prototext == "tcp") m_proto = SCHEME_TCP;
else if (prototext == "udp") m_proto = SCHEME_UDP;
else if (prototext == "ftl") m_proto = SCHEME_FTL;
else if (prototext == "http") m_proto = SCHEME_HTTP;
else if (prototext == "ws") m_proto = SCHEME_WS;
else if (prototext == "ipc") m_proto = SCHEME_IPC;
else m_proto = SCHEME_OTHER;
std::string porttext = std::string(uri.portText.first, uri.portText.afterLast - uri.portText.first);
m_port = atoi(porttext.c_str());
for (auto h=uri.pathHead; h!=NULL; h=h->next) {
auto pstr = std::string(
h->text.first, h->text.afterLast - h->text.first);
m_path += "/";
m_path += pstr;
m_pathseg.push_back(pstr);
}
//string query = std::string(uri.query.first, uri.query.afterLast - uri.query.first);
if (uri.query.afterLast - uri.query.first > 0) {
UriQueryListA *queryList;
int itemCount;
if (uriDissectQueryMallocA(&queryList, &itemCount, uri.query.first,
uri.query.afterLast) != URI_SUCCESS) {
// Failure
}
UriQueryListA *item = queryList;
while (item) {
m_qmap[item->key] = item->value;
item = item->next;
}
uriFreeQueryListA(queryList);
}
uriFreeUriMembersA(&uri);
m_valid = m_proto != SCHEME_NONE && m_host.size() > 0;
if (m_valid) {
if (m_qmap.size() > 0) m_base = std::string(uri.scheme.first, uri.query.first - uri.scheme.first - 1);
else m_base = std::string(uri.scheme.first);
}
}
}
string URI::to_string() const {
return (m_qmap.size() > 0) ? m_base + "?" + getQuery() : m_base;
}
string URI::getQuery() const {
string q;
for (auto x : m_qmap) {
if (q.length() > 0) q += "&";
q += x.first + "=" + x.second;
}
return q;
};
void URI::setAttribute(const string &key, const string &value) {
m_qmap[key] = value;
}
void URI::setAttribute(const string &key, int value) {
m_qmap[key] = std::to_string(value);
}
...@@ -3,6 +3,7 @@ add_executable(peer_unit ...@@ -3,6 +3,7 @@ add_executable(peer_unit
./tests.cpp ./tests.cpp
../src/ws_internal.cpp ../src/ws_internal.cpp
../src/dispatcher.cpp ../src/dispatcher.cpp
../src/uri.cpp
./peer_unit.cpp ./peer_unit.cpp
../../../common/cpp/src/config.cpp ../../../common/cpp/src/config.cpp
) )
...@@ -15,6 +16,7 @@ target_link_libraries(peer_unit ...@@ -15,6 +16,7 @@ target_link_libraries(peer_unit
### URI ######################################################################## ### URI ########################################################################
add_executable(uri_unit add_executable(uri_unit
./tests.cpp ./tests.cpp
../src/uri.cpp
./uri_unit.cpp) ./uri_unit.cpp)
target_link_libraries(uri_unit target_link_libraries(uri_unit
${URIPARSER_LIBRARIES}) ${URIPARSER_LIBRARIES})
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#include <ftl/uri.hpp> #include <ftl/uri.hpp>
using ftl::URI; using ftl::URI;
using std::string;
SCENARIO( "URI() can parse valid URIs", "[utility]" ) { SCENARIO( "URI() can parse valid URIs", "[utility]" ) {
GIVEN( "a valid scheme, no port or path" ) { GIVEN( "a valid scheme, no port or path" ) {
...@@ -59,3 +60,32 @@ SCENARIO( "URI() fails gracefully with invalid URIs", "[utility]" ) { ...@@ -59,3 +60,32 @@ SCENARIO( "URI() fails gracefully with invalid URIs", "[utility]" ) {
} }
} }
SCENARIO( "URI::to_string() from a valid URI" ) {
GIVEN( "no query component" ) {
URI uri("http://localhost:1000/hello");
REQUIRE( uri.to_string() == "http://localhost:1000/hello" );
}
GIVEN( "A single query component" ) {
URI uri("http://localhost:1000/hello?x=5");
REQUIRE( uri.to_string() == "http://localhost:1000/hello?x=5" );
}
GIVEN( "an unsorted set of query components" ) {
URI uri("http://localhost:1000/hello?z=5&y=4&x=2");
REQUIRE( uri.to_string() == "http://localhost:1000/hello?x=2&y=4&z=5" );
}
}
SCENARIO( "URI::getAttribute() from query" ) {
GIVEN( "a string value" ) {
URI uri("http://localhost:1000/hello?x=world");
REQUIRE( uri.getAttribute<string>("x") == "world" );
}
GIVEN( "an integer value" ) {
URI uri("http://localhost:1000/hello?x=56");
REQUIRE( uri.getAttribute<int>("x") == 56 );
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment