diff --git a/net/include/ftl/net/dispatcher.hpp b/net/include/ftl/net/dispatcher.hpp new file mode 100644 index 0000000000000000000000000000000000000000..0173858de485fb4cbf43c1042fd3a77eeea873ce --- /dev/null +++ b/net/include/ftl/net/dispatcher.hpp @@ -0,0 +1,136 @@ +#ifndef _FTL_NET_DISPATCHER_HPP_ +#define _FTL_NET_DISPATCHER_HPP_ + +#include <ftl/net/func_traits.hpp> +#include <msgpack.hpp> +#include <memory> +#include <tuple> +#include <functional> + +namespace ftl { + +namespace internal { + //! \brief Calls a functor with argument provided directly + template <typename Functor, typename Arg> + auto call(Functor f, Arg &&arg) + -> decltype(f(std::forward<Arg>(arg))) + { + return f(std::forward<Arg>(arg)); + } + + template <typename Functor, typename... Args, std::size_t... I> + decltype(auto) call_helper(Functor func, std::tuple<Args...> &¶ms, + std::index_sequence<I...>) { + return func(std::get<I>(params)...); + } + + //! \brief Calls a functor with arguments provided as a tuple + template <typename Functor, typename... Args> + decltype(auto) call(Functor f, std::tuple<Args...> &args) { + return call_helper(f, std::forward<std::tuple<Args...>>(args), + std::index_sequence_for<Args...>{}); + } +} + +namespace net { + +class Dispatcher { + public: + + void dispatch(const std::string &msg); + + template <typename F> + void bind(std::string const &name, F func, + ftl::internal::tags::void_result const &, + ftl::internal::tags::zero_arg const &) { + enforce_unique_name(name); + funcs_.insert( + std::make_pair(name, [func, name](msgpack::object const &args) { + enforce_arg_count(name, 0, args.via.array.size); + func(); + return std::make_unique<msgpack::object_handle>(); + })); + } + + template <typename F> + void bind(std::string const &name, F func, + ftl::internal::tags::void_result const &, + ftl::internal::tags::nonzero_arg const &) { + using ftl::internal::func_traits; + using args_type = typename func_traits<F>::args_type; + + enforce_unique_name(name); + funcs_.insert( + std::make_pair(name, [func, name](msgpack::object const &args) { + constexpr int args_count = std::tuple_size<args_type>::value; + enforce_arg_count(name, args_count, args.via.array.size); + args_type args_real; + args.convert(args_real); + ftl::internal::call(func, args_real); + return std::make_unique<msgpack::object_handle>(); + })); + } + + template <typename F> + void bind(std::string const &name, F func, + ftl::internal::tags::nonvoid_result const &, + ftl::internal::tags::zero_arg const &) { + using ftl::internal::func_traits; + + enforce_unique_name(name); + funcs_.insert(std::make_pair(name, [func, + name](msgpack::object const &args) { + enforce_arg_count(name, 0, args.via.array.size); + auto z = std::make_unique<msgpack::zone>(); + auto result = msgpack::object(func(), *z); + return std::make_unique<msgpack::object_handle>(result, std::move(z)); + })); + } + + template <typename F> + void bind(std::string const &name, F func, + ftl::internal::tags::nonvoid_result const &, + ftl::internal::tags::nonzero_arg const &) { + using ftl::internal::func_traits; + using args_type = typename func_traits<F>::args_type; + + enforce_unique_name(name); + funcs_.insert(std::make_pair(name, [func, + name](msgpack::object const &args) { + constexpr int args_count = std::tuple_size<args_type>::value; + enforce_arg_count(name, args_count, args.via.array.size); + args_type args_real; + args.convert(args_real); + auto z = std::make_unique<msgpack::zone>(); + auto result = msgpack::object(ftl::internal::call(func, args_real), *z); + return std::make_unique<msgpack::object_handle>(result, std::move(z)); + })); + } + + using adaptor_type = std::function<std::unique_ptr<msgpack::object_handle>( + msgpack::object const &)>; + + //! \brief This is the type of messages as per the msgpack-rpc spec. + using call_t = std::tuple<int8_t, uint32_t, std::string, msgpack::object>; + + //! \brief This is the type of notification messages. + using notification_t = std::tuple<int8_t, std::string, msgpack::object>; + + private: + std::unordered_map<std::string, adaptor_type> funcs_; + + static void enforce_arg_count(std::string const &func, std::size_t found, + std::size_t expected); + + void enforce_unique_name(std::string const &func); + + void dispatch(const msgpack::object &msg); + void dispatch_call(const msgpack::object &msg); + void dispatch_notification(msgpack::object const &msg); +}; + +} +} + +#endif // _FTL_NET_DISPATCHER_HPP_ + diff --git a/net/include/ftl/net/func_traits.hpp b/net/include/ftl/net/func_traits.hpp new file mode 100644 index 0000000000000000000000000000000000000000..3a4f83be288db5cb307378068d24a1108a5ff82a --- /dev/null +++ b/net/include/ftl/net/func_traits.hpp @@ -0,0 +1,94 @@ +/* Taken from rpclib */ + +#pragma once + +#ifndef FUNC_TRAITS_H_HWIWA6G0 +#define FUNC_TRAITS_H_HWIWA6G0 + +#include <type_traits> +#include <tuple> + +namespace ftl { +namespace internal { + +template<typename T> +using invoke = typename T::type; + +template<typename T, T I> +struct constant : std::integral_constant<T, I> {}; + +template<bool B> +using bool_ = constant<bool, B>; + +using true_ = bool_<true>; + +using false_ = bool_<false>; + +template <int N> +using is_zero = invoke<std::conditional<(N == 0), true_, false_>>; + +template <int N, typename... Ts> +using nth_type = invoke<std::tuple_element<N, std::tuple<Ts...>>>; + +namespace tags { + +// tags for the function traits, used for tag dispatching +struct zero_arg {}; +struct nonzero_arg {}; +struct void_result {}; +struct nonvoid_result {}; + +template <int N> struct arg_count_trait { typedef nonzero_arg type; }; + +template <> struct arg_count_trait<0> { typedef zero_arg type; }; + +template <typename T> struct result_trait { typedef nonvoid_result type; }; + +template <> struct result_trait<void> { typedef void_result type; }; +} + +//! \brief Provides a small function traits implementation that +//! works with a reasonably large set of functors. +template <typename T> +struct func_traits : func_traits<decltype(&T::operator())> {}; + +template <typename C, typename R, typename... Args> +struct func_traits<R (C::*)(Args...)> : func_traits<R (*)(Args...)> {}; + +template <typename C, typename R, typename... Args> +struct func_traits<R (C::*)(Args...) const> : func_traits<R (*)(Args...)> {}; + +template <typename R, typename... Args> struct func_traits<R (*)(Args...)> { + using result_type = R; + using arg_count = std::integral_constant<std::size_t, sizeof...(Args)>; + using args_type = std::tuple<typename std::decay<Args>::type...>; +}; + +template <typename T> +struct func_kind_info : func_kind_info<decltype(&T::operator())> {}; + +template <typename C, typename R, typename... Args> +struct func_kind_info<R (C::*)(Args...)> : func_kind_info<R (*)(Args...)> {}; + +template <typename C, typename R, typename... Args> +struct func_kind_info<R (C::*)(Args...) const> + : func_kind_info<R (*)(Args...)> {}; + +template <typename R, typename... Args> struct func_kind_info<R (*)(Args...)> { + typedef typename tags::arg_count_trait<sizeof...(Args)>::type args_kind; + typedef typename tags::result_trait<R>::type result_kind; +}; + +template <typename F> using is_zero_arg = is_zero<func_traits<F>::arg_count>; + +template <typename F> +using is_single_arg = + invoke<std::conditional<func_traits<F>::arg_count == 1, true_, false_>>; + +template <typename F> +using is_void_result = std::is_void<typename func_traits<F>::result_type>; +} +} + +#endif /* end of include guard: FUNC_TRAITS_H_HWIWA6G0 */ + diff --git a/net/include/ftl/net/handlers.hpp b/net/include/ftl/net/handlers.hpp index 9a515db88a6d776075bca2858380c6e73a04a0e9..3ddf3def2e381eaf9de16778efea2db17547733b 100644 --- a/net/include/ftl/net/handlers.hpp +++ b/net/include/ftl/net/handlers.hpp @@ -2,6 +2,7 @@ #define _FTL_NET_HANDLERS_HPP_ #include <functional> +#include <memory> namespace ftl { namespace net { @@ -13,10 +14,10 @@ typedef std::function<void(int)> sockerrorhandler_t; typedef std::function<void()> sockconnecthandler_t; typedef std::function<void(int)> sockdisconnecthandler_t; -typedef std::function<void(Socket&, int, std::string&)> datahandler_t; -typedef std::function<void(Socket&, int)> errorhandler_t; -typedef std::function<void(Socket&)> connecthandler_t; -typedef std::function<void(Socket&)> disconnecthandler_t; +typedef std::function<void(std::shared_ptr<Socket>, int, std::string&)> datahandler_t; +typedef std::function<void(std::shared_ptr<Socket>, int)> errorhandler_t; +typedef std::function<void(std::shared_ptr<Socket>)> connecthandler_t; +typedef std::function<void(std::shared_ptr<Socket>)> disconnecthandler_t; }; }; diff --git a/net/include/ftl/net/listener.hpp b/net/include/ftl/net/listener.hpp index 97171b5f1bf3ff9ff7ab4954ec2483f9e495ee05..55e49b3e10f6a10cf91740dbd920b99f0be2b1e9 100644 --- a/net/include/ftl/net/listener.hpp +++ b/net/include/ftl/net/listener.hpp @@ -27,7 +27,7 @@ class Listener { void close(); int _socket() { return descriptor_; } - void connection(Socket &s); + void connection(std::shared_ptr<Socket> s); void onConnection(connecthandler_t h) { handler_connect_.push_back(h); }; private: diff --git a/net/include/ftl/net/protocol.hpp b/net/include/ftl/net/protocol.hpp index 59de85de039fc56e47a459ffb527b82bf619870b..3e85f9c1f15ca7f867b66859648f6be4044f8589 100644 --- a/net/include/ftl/net/protocol.hpp +++ b/net/include/ftl/net/protocol.hpp @@ -1,6 +1,7 @@ #ifndef _FTL_NET_PROTOCOL_HPP_ #define _FTL_NET_PROTOCOL_HPP_ +#define FTL_PROTOCOL_RPC 0x0100 #define FTL_PROTOCOL_P2P 0x1000 #endif // _FTL_NET_PROTOCOL_HPP_ diff --git a/net/include/ftl/net/socket.hpp b/net/include/ftl/net/socket.hpp index dd5dc8a8a1a67c0bd9f5ecad0d3fecba3e2553ab..bebcade26f37fe487675ebd5ca600ef1411fb083 100644 --- a/net/include/ftl/net/socket.hpp +++ b/net/include/ftl/net/socket.hpp @@ -3,6 +3,8 @@ #include <ftl/net.hpp> #include <ftl/net/handlers.hpp> +#include <ftl/net/dispatcher.hpp> +#include <ftl/net/protocol.hpp> #ifndef WIN32 #define INVALID_SOCKET -1 @@ -14,6 +16,8 @@ #include <winsock.h> #endif +#include <sstream> + namespace ftl { namespace net { @@ -25,9 +29,11 @@ class Socket { int close(); - int send(uint32_t service, std::string &data); - int send(uint32_t service, std::ostringstream &data); + int send(uint32_t service, const std::string &data); + int send(uint32_t service, std::stringstream &data) { return send(service, data.str()); }; int send(uint32_t service, void *data, int length); + + int send2(uint32_t service, const std::string &data1, const std::string &data2); //friend bool ftl::net::run(bool); @@ -35,6 +41,52 @@ class Socket { bool isConnected() { return m_sock != INVALID_SOCKET; }; bool isValid() { return m_valid; }; + + template <typename F> + void bind(const std::string &name, F func) { + //disp_.enforce_unique_name(name); + disp_.bind(name, func, typename ftl::internal::func_kind_info<F>::result_kind(), + typename ftl::internal::func_kind_info<F>::args_kind()); + } + + template <typename... ARGS> + msgpack::object_handle call(const std::string &name, ARGS... args) { + bool hasreturned = false; + msgpack::object_handle result; + async_call(name, [result,hasreturned](msgpack::object_handle r) { + hasreturned = true; + result = r; + }, std::forward<ARGS>(args)...); + + // Loop the network + int limit = 10; + while (limit > 0 && !hasreturned) { + limit--; + ftl::net::wait(); + } + + return result; + } + + template <typename... ARGS> + void async_call( + const std::string &name, + std::function<void(msgpack::object_handle)> cb, + ARGS... args) { + auto args_obj = std::make_tuple(args...); + auto rpcid = rpcid__++; + auto call_obj = std::make_tuple(0,rpcid,name,args_obj); + + std::stringstream buf; + msgpack::pack(buf, call_obj); + + // Register the CB + callbacks_[rpcid] = cb; + + send(FTL_PROTOCOL_RPC, buf.str()); + } + + void dispatch(const std::string &b) { disp_.dispatch(b); } void onMessage(sockdatahandler_t handler) { m_handler = handler; } void onError(sockerrorhandler_t handler) {} @@ -55,6 +107,10 @@ class Socket { char *m_buffer; sockdatahandler_t m_handler; bool m_valid; + std::map<int, std::function<void(msgpack::object_handle)>> callbacks_; + ftl::net::Dispatcher disp_; + + static int rpcid__; static const int MAX_MESSAGE = 10*1024*1024; // 10Mb currently static const int BUFFER_SIZE = MAX_MESSAGE + 16; diff --git a/net/include/ftl/uri.hpp b/net/include/ftl/uri.hpp index 20cb83fce8fd23917e5156e070427bd7f78f4783..6da23f07dea076757df716773f579afb3fd9b02f 100644 --- a/net/include/ftl/uri.hpp +++ b/net/include/ftl/uri.hpp @@ -78,15 +78,15 @@ namespace ftl { SCHEME_OTHER }; - bool isValid() { return m_valid; }; - std::string &getHost() { return m_host; }; - int getPort() { return m_port; }; - scheme_t getProtocol() { return m_proto; }; - scheme_t getScheme() { return m_proto; }; - std::string &getPath() { return m_path; }; - std::string &getQuery() { return m_query; }; - std::string &getBaseURI() { return m_base; }; - std::string &getPathSegment(int n) { return m_pathseg[n]; }; + bool isValid() const { return m_valid; }; + const std::string &getHost() const { return m_host; }; + int getPort() const { return m_port; }; + scheme_t getProtocol() const { return m_proto; }; + scheme_t getScheme() const { return m_proto; }; + const std::string &getPath() const { return m_path; }; + const std::string &getQuery() const { return m_query; }; + const std::string &getBaseURI() const { return m_base; }; + const std::string &getPathSegment(int n) const { return m_pathseg[n]; }; private: bool m_valid; diff --git a/net/src/dispatcher.cpp b/net/src/dispatcher.cpp new file mode 100644 index 0000000000000000000000000000000000000000..294d1398b4b9a5fceae3dccdee60b6c62745972d --- /dev/null +++ b/net/src/dispatcher.cpp @@ -0,0 +1,78 @@ +#include <ftl/net/dispatcher.hpp> + +void ftl::net::Dispatcher::dispatch(const std::string &msg) { + auto unpacked = msgpack::unpack(msg.data(), msg.size()); + dispatch(unpacked.get()); +} + +void ftl::net::Dispatcher::dispatch(const msgpack::object &msg) { + switch (msg.via.array.size) { + case 3: + dispatch_notification(msg); + case 4: + dispatch_call(msg); + default: + return; + } +} + +void ftl::net::Dispatcher::dispatch_call(const msgpack::object &msg) { + call_t the_call; + msg.convert(the_call); + + // TODO: proper validation of protocol (and responding to it) + // auto &&type = std::get<0>(the_call); + // assert(type == 0); + + // auto &&id = std::get<1>(the_call); + auto &&name = std::get<2>(the_call); + auto &&args = std::get<3>(the_call); + + auto it_func = funcs_.find(name); + + if (it_func != end(funcs_)) { + try { + auto result = (it_func->second)(args); + // TODO SEND RESULTS + } catch (...) { + throw; + } + } +} + +void ftl::net::Dispatcher::dispatch_notification(msgpack::object const &msg) { + notification_t the_call; + msg.convert(the_call); + + // TODO: proper validation of protocol (and responding to it) + // auto &&type = std::get<0>(the_call); + // assert(type == static_cast<uint8_t>(request_type::notification)); + + auto &&name = std::get<1>(the_call); + auto &&args = std::get<2>(the_call); + + auto it_func = funcs_.find(name); + + if (it_func != end(funcs_)) { + try { + auto result = (it_func->second)(args); + } catch (...) { + throw; + } + } +} + +void ftl::net::Dispatcher::enforce_arg_count(std::string const &func, std::size_t found, + std::size_t expected) { + if (found != expected) { + throw; + } +} + +void ftl::net::Dispatcher::enforce_unique_name(std::string const &func) { + auto pos = funcs_.find(func); + if (pos != end(funcs_)) { + throw; + } +} + diff --git a/net/src/ice.cpp b/net/src/ice.cpp index 4b8aa88d609d056eaa330aa2b228cd9fda4dd0da..0929eaba638a241a6870bb797e8461a227d113cb 100644 --- a/net/src/ice.cpp +++ b/net/src/ice.cpp @@ -25,7 +25,7 @@ struct Candidate { uint16_t port; }; -int stun_internal(std::string &c, bool tcp, uint16_t lport, std::string &host, uint16_t port) { +int stun_internal(std::string &c, bool tcp, uint16_t lport, const std::string &host, uint16_t port) { int sockfd; sockaddr_in servaddr; sockaddr_in localaddr; diff --git a/net/src/listener.cpp b/net/src/listener.cpp index 4b9e8d1ef8d31875ef42da4b8426d35b5ae09c51..4b38ff4b37904c789dba50d3f91e769c74c339fe 100644 --- a/net/src/listener.cpp +++ b/net/src/listener.cpp @@ -22,6 +22,7 @@ typedef int socklen_t; using namespace ftl; using ftl::net::Listener; +using std::shared_ptr; int tcpListen(URI &uri) { int ssock; @@ -97,7 +98,7 @@ Listener::~Listener() { close(); } -void Listener::connection(Socket &s) { +void Listener::connection(shared_ptr<Socket> s) { for (auto h : handler_connect_) h(s); } diff --git a/net/src/net.cpp b/net/src/net.cpp index ad56f07b8a9e044972f3095574c7908ec199f827..3826df6bbce9be8251f71485d8edaad96235fbf3 100644 --- a/net/src/net.cpp +++ b/net/src/net.cpp @@ -145,7 +145,7 @@ bool _run(bool blocking, bool nodelay) { sockets.push_back(sock); // Call connection handlers - l->connection(*sock); + l->connection(sock); // TODO Save the ip address // deal with both IPv4 and IPv6: diff --git a/net/test/CMakeLists.txt b/net/test/CMakeLists.txt index 7d9a3198635257280287ad5408a87a1d3c2ea868..e565783b9e17719a3e91dec1ca8141f86a3f6191 100644 --- a/net/test/CMakeLists.txt +++ b/net/test/CMakeLists.txt @@ -7,6 +7,8 @@ add_executable(tests EXCLUDE_FROM_ALL ./ice.cpp ../src/ice.cpp ./uri.cpp + ./rpc.cpp + ../src/dispatcher.cpp ) target_include_directories(tests PUBLIC ${PROJECT_SOURCE_DIR}/include) target_link_libraries(tests uriparser) diff --git a/net/test/net_raw.cpp b/net/test/net_raw.cpp index 59a849064e3ae3a200c090bc5c65703701bb216a..c4a6fad3ebf2ec2a48acab3155795ad07bc1f38c 100644 --- a/net/test/net_raw.cpp +++ b/net/test/net_raw.cpp @@ -260,8 +260,8 @@ TEST_CASE("net::listen()", "[net]") { bool connected = false; - l->onConnection([&](Socket &s) { - REQUIRE( s.isConnected() ); + l->onConnection([&](shared_ptr<Socket> s) { + REQUIRE( s->isConnected() ); connected = true; }); diff --git a/net/test/rpc.cpp b/net/test/rpc.cpp new file mode 100644 index 0000000000000000000000000000000000000000..9d48684fcbe50d0a2ac2a515ad89c314ecceaeee --- /dev/null +++ b/net/test/rpc.cpp @@ -0,0 +1,60 @@ +#include "catch.hpp" +#include <ftl/net/socket.hpp> +#include <iostream> + +TEST_CASE("Socket::bind()", "[rpc]") { + SECTION("no argument bind") { + auto s = new ftl::net::Socket(0); + bool called = false; + + s->bind("test1", [&]() { + called = true; + }); + + auto args_obj = std::make_tuple(); + auto call_obj = std::make_tuple(0,0,"test1",args_obj); + std::stringstream buf; + msgpack::pack(buf, call_obj); + + s->dispatch(buf.str()); + REQUIRE( called ); + } + + SECTION("one argument bind") { + auto s = new ftl::net::Socket(0); + bool called = false; + + s->bind("test1", [&](int a) { + called = true; + REQUIRE( a == 5 ); + }); + + auto args_obj = std::make_tuple(5); + auto call_obj = std::make_tuple(0,0,"test1",args_obj); + std::stringstream buf; + msgpack::pack(buf, call_obj); + + s->dispatch(buf.str()); + REQUIRE( called ); + } + + SECTION("two argument bind") { + auto s = new ftl::net::Socket(0); + bool called = false; + + s->bind("test1", [&](int a, float b) { + called = true; + REQUIRE( a == 5 ); + REQUIRE( b == 5.4f ); // Danger + }); + + auto args_obj = std::make_tuple(5, 5.4f); + auto call_obj = std::make_tuple(0,0,"test1",args_obj); + std::stringstream buf; + msgpack::pack(buf, call_obj); + + s->dispatch(buf.str()); + REQUIRE( called ); + } +} +