diff --git a/net/CMakeLists.txt b/net/CMakeLists.txt index a372bcfd036710e528a72dd3c6cf1cb124c44624..386832cf37e11d2584c4de1ad3a07beaafcd2ee7 100644 --- a/net/CMakeLists.txt +++ b/net/CMakeLists.txt @@ -15,7 +15,7 @@ set(ftl_VERSION_MAJOR "1") set(ftl_VERSION_MINOR "0") set(ftl_VERSION_PATCH "0") -set(CMAKE_CXX_FLAGS "-pthread -fopenmp -std=c++14 -Wall -Wno-deprecated -Werror -Wno-psabi") +set(CMAKE_CXX_FLAGS "-pthread -fopenmp -std=c++17 -Wall -Wno-deprecated -Werror -Wno-psabi") set(CMAKE_CXX_FLAGS_DEBUG "-D_DEBUG -pg -Wall -Werror") set(CMAKE_CXX_FLAGS_RELEASE "-O3") diff --git a/net/include/ftl/net/func_traits.hpp b/net/include/ftl/net/func_traits.hpp index ae12b335a76d4231c7e497dc0c5440dc7301f5a6..6ea7cf2f317079fe55cf5a2eef5c4391b88ffcbb 100644 --- a/net/include/ftl/net/func_traits.hpp +++ b/net/include/ftl/net/func_traits.hpp @@ -14,6 +14,9 @@ namespace internal { template<typename T> using invoke = typename T::type; +template <typename T, typename... ARGS> +struct first_type { typedef T type; }; + template<typename T, T I> struct constant : std::integral_constant<T, I> {}; diff --git a/net/include/ftl/net/socket.hpp b/net/include/ftl/net/socket.hpp index 08f8c25c5920e83bc3cf5e5b98cb29a73e1ae87e..ce515bce796fa0ff4e92ca8e47598f9757cfa4d0 100644 --- a/net/include/ftl/net/socket.hpp +++ b/net/include/ftl/net/socket.hpp @@ -18,10 +18,26 @@ #endif #include <sstream> +#include <type_traits> namespace ftl { namespace net { +struct virtual_caller { + virtual void operator()(msgpack::object &o)=0; +}; + +template <typename T> +struct caller : virtual_caller { + caller(std::function<void(const T&)> f) : f_(f) {}; + void operator()(msgpack::object &o) { T r = o.as<T>(); f_(r); }; + std::function<void(const T&)> f_; +}; + +/** + * A single socket connection object, to be constructed using the connect() + * function and not to be created directly. + */ class Socket { public: Socket(const char *uri); @@ -30,82 +46,62 @@ class Socket { int close(); - int send(uint32_t service, const std::string &data); - int send(uint32_t service, std::stringstream &data) { - return send(service, data.str()); }; - int send(uint32_t service, void *data, int length); - - int send2(uint32_t service, const std::string &data1, - const std::string &data2); - - //friend bool ftl::net::run(bool); - + /** + * Get the internal OS dependent socket. + */ int _socket() const { return sock_; }; bool isConnected() const { return sock_ != INVALID_SOCKET && connected_; }; bool isValid() const { return valid_ && sock_ != INVALID_SOCKET; }; + + /** + * Get the sockets protocol, address and port as a url string. This will be + * the same as the initial connection string on the client. + */ std::string getURI() const { return uri_; }; /** - * Bind a function to a RPC call name. + * Bind a function to an RPC call name. */ template <typename F> - void bind(const std::string &name, F func) { - disp_.bind(name, func, - typename ftl::internal::func_kind_info<F>::result_kind(), - typename ftl::internal::func_kind_info<F>::args_kind()); - } + void bind(const std::string &name, F func); /** * Bind a function to a raw message type. */ void bind(uint32_t service, std::function<void(Socket&, const std::string&)> func); - + /** - * Remote Procedure Call. + * Non-blocking Remote Procedure Call using a callback function. */ template <typename T, typename... ARGS> - T call(const std::string &name, ARGS... args) { - bool hasreturned = false; - T result; - async_call(name, [&result,&hasreturned](msgpack::object &r) { - hasreturned = true; - result = r.as<T>(); - }, std::forward<ARGS>(args)...); - - // Loop the network - int limit = 10; - while (limit > 0 && !hasreturned) { - limit--; - ftl::net::wait(); - } - - return result; - } + void asyncCall(const std::string &name, + std::function<void(const T&)> cb, + ARGS... args); + // TODO use "call" instead of "acall" causes compiler to loop. - template <typename... ARGS> - void async_call( - const std::string &name, - std::function<void(msgpack::object&)> cb, - ARGS... args) { - auto args_obj = std::make_tuple(args...); - auto rpcid = rpcid__++; - auto call_obj = std::make_tuple(0,rpcid,name,args_obj); - - LOG(INFO) << "RPC " << name << "() -> " << uri_; - - std::stringstream buf; - msgpack::pack(buf, call_obj); - - // Register the CB - callbacks_[rpcid] = cb; - - send(FTL_PROTOCOL_RPC, buf.str()); - } + /** + * Blocking Remote Procedure Call. + */ + template <typename R, typename... ARGS> + R call(const std::string &name, ARGS... args); + + /** + * Send data to given service number. + */ + int send(uint32_t service, const std::string &data); /** - * Internal handlers for specific event types. + * Send with two distinct data source. Avoids the need to memcpy them to a + * single buffer. + */ + int send2(uint32_t service, const std::string &data1, + const std::string &data2); + + /** + * Internal handlers for specific event types. This should be private but + * is current here for testing purposes. * @{ */ void dispatchRPC(const std::string &d) { disp_.dispatch(d); } @@ -120,23 +116,27 @@ class Socket { bool data(); void error(); + + private: // Functions + void _connected(); + void _updateURI(); - private: - std::string uri_; - int sock_; - size_t pos_; - char *buffer_; - std::map<uint32_t,std::function<void(Socket&,const std::string&)>> handlers_; - std::vector<std::function<void(Socket&)>> connect_handlers_; + private: // Data bool valid_; bool connected_; - std::map<int, std::function<void(msgpack::object&)>> callbacks_; - ftl::net::Dispatcher disp_; uint32_t version_; + int sock_; + size_t pos_; + char *buffer_; + + std::string uri_; std::string peerid_; - void _connected(); - void _updateURI(); + ftl::net::Dispatcher disp_; + + std::map<uint32_t,std::function<void(Socket&,const std::string&)>> handlers_; + std::vector<std::function<void(Socket&)>> connect_handlers_; + std::map<int, std::unique_ptr<virtual_caller>> callbacks_; static int rpcid__; @@ -144,6 +144,55 @@ class Socket { static const int BUFFER_SIZE = MAX_MESSAGE + 16; }; +// --- Inline Template Implementations ----------------------------------------- + +template <typename F> +void Socket::bind(const std::string &name, F func) { + disp_.bind(name, func, + typename ftl::internal::func_kind_info<F>::result_kind(), + typename ftl::internal::func_kind_info<F>::args_kind()); +} + +//template <typename T, typename... ARGS> +template <typename R, typename... ARGS> +R Socket::call(const std::string &name, ARGS... args) { + bool hasreturned = false; + R result; + asyncCall<R>(name, [&result,&hasreturned](const R &r) { + hasreturned = true; + result = r; + }, std::forward<ARGS>(args)...); + + // Loop the network + int limit = 10; + while (limit > 0 && !hasreturned) { + limit--; + ftl::net::wait(); + } + + return result; +} + +template <typename T, typename... ARGS> +void Socket::asyncCall( + const std::string &name, + std::function<void(const T&)> cb, + ARGS... args) { + auto args_obj = std::make_tuple(args...); + auto rpcid = rpcid__++; + auto call_obj = std::make_tuple(0,rpcid,name,args_obj); + + LOG(INFO) << "RPC " << name << "() -> " << uri_; + + std::stringstream buf; + msgpack::pack(buf, call_obj); + + // Register the CB + callbacks_[rpcid] = std::make_unique<caller<T>>(cb); + + send(FTL_PROTOCOL_RPC, buf.str()); +} + }; }; diff --git a/net/src/socket.cpp b/net/src/socket.cpp index 9457631362b68341b7f44833416ce6aef5dd6c7a..c1f6e2816cf8de327232250ea437668064631f7c 100644 --- a/net/src/socket.cpp +++ b/net/src/socket.cpp @@ -128,7 +128,7 @@ Socket::Socket(int s) : sock_(s), pos_(0), disp_(this) { _updateURI(); } -Socket::Socket(const char *pUri) : uri_(pUri), pos_(0), disp_(this) { +Socket::Socket(const char *pUri) : pos_(0), uri_(pUri), disp_(this) { // Allocate buffer buffer_ = new char[BUFFER_SIZE]; @@ -340,7 +340,7 @@ void Socket::dispatchReturn(const std::string &d) { if (callbacks_.count(id) > 0) { LOG(INFO) << "Received return RPC value"; - callbacks_[id](res); + (*callbacks_[id])(res); callbacks_.erase(id); } else { LOG(ERROR) << "Missing RPC callback for result";