diff --git a/net/cpp/include/ftl/net/dispatcher.hpp b/net/cpp/include/ftl/net/dispatcher.hpp index 67840a78b205c072a1de44e6e7153fcba93e1041..46505835223ad64419d6d1db95676e4888150206 100644 --- a/net/cpp/include/ftl/net/dispatcher.hpp +++ b/net/cpp/include/ftl/net/dispatcher.hpp @@ -12,6 +12,9 @@ #include <tuple> #include <functional> #include <iostream> +#include <vector> +#include <string> +#include <unordered_map> namespace ftl { @@ -116,6 +119,8 @@ class Dispatcher { })); } + std::vector<std::string> getBindings() const; + using adaptor_type = std::function<std::unique_ptr<msgpack::object_handle>( msgpack::object const &)>; diff --git a/net/cpp/include/ftl/net/peer.hpp b/net/cpp/include/ftl/net/peer.hpp index ec5562be34bd811af80117784604c1ac671e88b5..0de0b4ea2bce5d9fc5eab5706347314ce0accd9b 100644 --- a/net/cpp/include/ftl/net/peer.hpp +++ b/net/cpp/include/ftl/net/peer.hpp @@ -4,6 +4,7 @@ #define GLOG_NO_ABBREVIATED_SEVERITIES #include <glog/logging.h> #include <ftl/net/protocol.hpp> +#include <ftl/net/dispatcher.hpp> #include <ftl/uri.hpp> #include <ftl/uuid.hpp> @@ -42,7 +43,7 @@ struct caller : virtual_caller { std::function<void(const T&)> f_; }; -typedef std::tuple<const char*,size_t> array; +//typedef std::tuple<const char*,size_t> array; /*struct compress{}; struct encrypt{}; struct decompress{}; @@ -61,8 +62,8 @@ class Peer { }; public: - explicit Peer(const char *uri); - explicit Peer(int s); + explicit Peer(const char *uri, ftl::net::Dispatcher *d=nullptr); + explicit Peer(int s, ftl::net::Dispatcher *d=nullptr); ~Peer(); /** @@ -123,7 +124,9 @@ class Peer { int send(const std::string &name, ARGS... args); /** - * Bind a function to an RPC call name. + * Bind a function to an RPC call name. Note: if an overriding dispatcher + * is used then these bindings will propagate to all peers sharing that + * dispatcher. */ template <typename F> void bind(const std::string &name, F func); @@ -162,7 +165,7 @@ class Peer { int _send(); - template <typename... ARGS> + /*template <typename... ARGS> int _send(const std::string &t, ARGS... args); template <typename... ARGS> @@ -176,13 +179,14 @@ class Peer { template <typename T, typename... ARGS, ENABLE_IF(std::is_trivial<T>::value && !std::is_pointer<T>::value)> - int _send(const T &t, ARGS... args); + int _send(const T &t, ARGS... args);*/ private: // Data Status status_; int sock_; ftl::URI::scheme_t scheme_; uint32_t version_; + bool destroy_disp_; // Receive buffers msgpack::unpacker recv_buf_; @@ -193,7 +197,7 @@ class Peer { std::string uri_; ftl::UUID peerid_; - ftl::net::Dispatcher disp_; + ftl::net::Dispatcher *disp_; std::vector<std::function<void()>> open_handlers_; //std::vector<std::function<void(const ftl::net::Error &)>> error_handlers_ std::vector<std::function<void()>> close_handlers_; @@ -208,7 +212,6 @@ template <typename... ARGS> int Peer::send(const std::string &s, ARGS... args) { // Leave a blank entry for websocket header if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0); - //msgpack::pack(send_buf_, std::make_tuple(s, std::make_tuple(args...))); auto args_obj = std::make_tuple(args...); auto call_obj = std::make_tuple(0,s,args_obj); msgpack::pack(send_buf_, call_obj); @@ -217,79 +220,11 @@ int Peer::send(const std::string &s, ARGS... args) { template <typename F> void Peer::bind(const std::string &name, F func) { - disp_.bind(name, func, + disp_->bind(name, func, typename ftl::internal::func_kind_info<F>::result_kind(), typename ftl::internal::func_kind_info<F>::args_kind()); } -/*template <typename T> -int Socket::read(T *b, size_t count) { - static_assert(std::is_trivial<T>::value, "Can only read trivial types"); - return read((char*)b, sizeof(T)*count); -} - -template <typename T> -int Socket::read(std::vector<T> &b, size_t count) { - count = (count == 0) ? size()/sizeof(T) : count; // TODO Round this! - if (b.size() != count) b.resize(count); - return read((char*)b.data(), sizeof(T)*count); -} - -template <typename T> -int Socket::read(T &b) { - if (std::is_array<T>::value) return read(&b,std::extent<T>::value); - else return read(&b); -} - -template <typename T> -Socket &Socket::operator>>(T &t) { - if (std::is_array<T>::value) read(&t,std::extent<T>::value); - else read(&t); - return *this; -}*/ - -/*template <typename... ARGS> -int Peer::_send(const std::string &t, ARGS... args) { - //send_vec_.push_back({const_cast<char*>(t.data()),t.size()}); - //header_w_->size += t.size(); - msgpack::pack(send_buf_, t); - return _send(args...)+t.size(); -} - -template <typename... ARGS> -int Peer::_send(const ftl::net::array &b, ARGS... args) { - //send_vec_.push_back({const_cast<char*>(std::get<0>(b)),std::get<1>(b)}); - //header_w_->size += std::get<1>(b); - msgpack::pack(send_buf_, msgpack::type::raw_ref(std::get<0>(b), std::get<1>(b))); - return std::get<1>(b)+_send(args...); -} - -template <typename T, typename... ARGS> -int Peer::_send(const std::vector<T> &t, ARGS... args) { - //send_vec_.push_back({const_cast<char*>(t.data()),t.size()}); - //header_w_->size += t.size(); - msgpack::pack(send_buf_, t); - return _send(args...)+t.size(); -} - -template <typename... Types, typename... ARGS> -int Peer::_send(const std::tuple<Types...> &t, ARGS... args) { - //send_vec_.push_back({const_cast<char*>((char*)&t),sizeof(t)}); - //header_w_->size += sizeof(t); - msgpack::pack(send_buf_, t); - return sizeof(t)+_send(args...); -} - -template <typename T, typename... ARGS, - ENABLE_IF(std::is_trivial<T>::value && !std::is_pointer<T>::value)> -int Peer::_send(const T &t, ARGS... args) { - //send_vec_.push_back({const_cast<T*>(&t),sizeof(T)}); - //header_w_->size += sizeof(T); - msgpack::pack(send_buf_, t); - return sizeof(T)+_send(args...); -}*/ - -//template <typename T, typename... ARGS> template <typename R, typename... ARGS> R Peer::call(const std::string &name, ARGS... args) { bool hasreturned = false; diff --git a/net/cpp/include/ftl/net/protocol.hpp b/net/cpp/include/ftl/net/protocol.hpp index 0fc0f7083f0c4b5957e55c9c791a38f2db493e91..b9effae8226571591051cc940bed56e8430ce248 100644 --- a/net/cpp/include/ftl/net/protocol.hpp +++ b/net/cpp/include/ftl/net/protocol.hpp @@ -2,13 +2,9 @@ #define _FTL_NET_PROTOCOL_HPP_ #include <ftl/uuid.hpp> -#include <ftl/net/func_traits.hpp> -#include <ftl/net/dispatcher.hpp> #include <ftl/config.h> #include <tuple> - - namespace ftl { namespace net { diff --git a/net/cpp/include/ftl/net/universe.hpp b/net/cpp/include/ftl/net/universe.hpp index aae8a6f0723015f01e6f6c28cac56eae8094b861..a60785654c4f1caa8a7a734cb769992e91db1158 100644 --- a/net/cpp/include/ftl/net/universe.hpp +++ b/net/cpp/include/ftl/net/universe.hpp @@ -3,6 +3,8 @@ #include <ftl/net/peer.hpp> #include <ftl/net/listener.hpp> +#include <ftl/net/dispatcher.hpp> +#include <ftl/uuid.hpp> #include <vector> #include <string> #include <thread> @@ -10,14 +12,59 @@ namespace ftl { namespace net { +/** + * Represents a group of network peers and their resources, managing the + * searching of and sharing of resources across peers. Each universe can + * listen on multiple ports/interfaces for connecting peers, and can connect + * to any number of peers. The creation of a Universe object also creates a + * new thread to manage the networking, therefore it is threadsafe but + * callbacks will execute in a different thread so must also be threadsafe in + * their actions. + */ class Universe { public: + /** + * Constructor with a URI base. The base uri is used as a base to validate + * resource identifiers. (it may be removed). This creates a new thread + * to monitor network sockets. + */ explicit Universe(const std::string &base); + + /** + * The destructor will terminate the network thread before completing. + */ ~Universe(); + /** + * Open a new listening port on a given interfaces. + * eg. "tcp://localhost:9000" + * @param addr URI giving protocol, interface and port + */ bool listen(const std::string &addr); + + /** + * Create a new peer connection. + * eg. "tcp://10.0.0.2:9000" + * Supported protocols include tcp and ws. + * + * @param addr URI giving protocol, interface and port + */ bool connect(const std::string &addr); + /** + * Bind a function to an RPC or service call name. This will implicitely + * be called by any peer making the request. + */ + template <typename F> + void bind(const std::string &name, F func); + + /** + * Send a non-blocking RPC call with no return value to all connected + * peers. + */ + template <typename... ARGS> + void broadcast(const std::string &name, ARGS... args); + private: void _run(); int _setDescriptors(); @@ -33,9 +80,12 @@ class Universe { fd_set sfdread_; std::vector<ftl::net::Listener*> listeners_; std::vector<ftl::net::Peer*> peers_; + ftl::UUID id_; + ftl::net::Dispatcher disp_; }; }; // namespace net }; // namespace ftl #endif // _FTL_NET_UNIVERSE_HPP_ + diff --git a/net/cpp/src/dispatcher.cpp b/net/cpp/src/dispatcher.cpp index adc4758d0de3b918ba8e8a8c4ca770e08ff48308..394307e1949297f6b5fd549b80307d64d975bf46 100644 --- a/net/cpp/src/dispatcher.cpp +++ b/net/cpp/src/dispatcher.cpp @@ -5,6 +5,9 @@ #include <iostream> using ftl::net::Peer; +using ftl::net::Dispatcher; +using std::vector; +using std::string; /*static std::string hexStr(const std::string &s) { @@ -23,6 +26,14 @@ using ftl::net::Peer; // dispatch(s, unpacked.get()); //} +vector<string> Dispatcher::getBindings() const { + vector<string> res; + for (auto x : funcs_) { + res.push_back(x.first); + } + return res; +} + void ftl::net::Dispatcher::dispatch(Peer &s, const msgpack::object &msg) { switch (msg.via.array.size) { case 3: diff --git a/net/cpp/src/peer.cpp b/net/cpp/src/peer.cpp index a6efd0692bbc8e7fb2c984245f4519fca1894150..b01ac81d934823813e446829da447282c48bb38e 100644 --- a/net/cpp/src/peer.cpp +++ b/net/cpp/src/peer.cpp @@ -36,6 +36,7 @@ using std::get; using ftl::net::Peer; using ftl::URI; using ftl::net::ws_connect; +using ftl::net::Dispatcher; /*static std::string hexStr(const std::string &s) { @@ -123,10 +124,18 @@ static int tcpConnect(URI &uri) { return csocket; } -Peer::Peer(int s) : sock_(s) { +Peer::Peer(int s, Dispatcher *d) : sock_(s) { status_ = (s == INVALID_SOCKET) ? kInvalid : kConnecting; _updateURI(); + if (d != nullptr) { + disp_ = d; + destroy_disp_ = false; + } else { + disp_ = new Dispatcher(); + destroy_disp_ = true; + } + // Send the initiating handshake if valid if (status_ == kConnecting) { // Install return handshake handler. @@ -146,11 +155,19 @@ Peer::Peer(int s) : sock_(s) { } } -Peer::Peer(const char *pUri) : uri_(pUri) { +Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { URI uri(pUri); status_ = kInvalid; sock_ = INVALID_SOCKET; + + if (d != nullptr) { + disp_ = d; + destroy_disp_ = false; + } else { + disp_ = new Dispatcher(); + destroy_disp_ = true; + } scheme_ = uri.getProtocol(); if (uri.getProtocol() == URI::SCHEME_TCP) { @@ -301,7 +318,7 @@ bool Peer::data() { return false; } } - disp_.dispatch(*this, obj); + disp_->dispatch(*this, obj); } return false; } @@ -498,5 +515,9 @@ int Peer::_send() { Peer::~Peer() { close(); + + if (destroy_disp_) { + delete disp_; + } } diff --git a/net/cpp/src/universe.cpp b/net/cpp/src/universe.cpp index c7e52d6526566cb62c208d7924b146b10d106ac2..2e4659a7e0dd153ca0a38228b861a5cee2a192a1 100644 --- a/net/cpp/src/universe.cpp +++ b/net/cpp/src/universe.cpp @@ -36,7 +36,7 @@ bool Universe::listen(const string &addr) { } bool Universe::connect(const string &addr) { - auto p = new Peer(addr.c_str()); + auto p = new Peer(addr.c_str(), &disp_); if (!p) return false; if (p->status() != Peer::kInvalid) { @@ -81,7 +81,11 @@ int Universe::_setDescriptors() { } void Universe::_installBindings(Peer *p) { - + p->bind("__subscribe__", [this](const string &uri) { + // Add this peer to subscription list for uri resource + }); + + } void Universe::__start(Universe * u) { @@ -127,7 +131,7 @@ void Universe::_run() { int csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize); if (csock != INVALID_SOCKET) { - auto p = new Peer(csock); + auto p = new Peer(csock, &disp_); peers_.push_back(p); _installBindings(p);