diff --git a/net/include/ftl/net/socket.hpp b/net/include/ftl/net/socket.hpp index 056fcc6ea445bdfa5f77a917b69c5c3469f58eb1..264f5fc3b003119d292f5b541054bfdbe0ec9d0d 100644 --- a/net/include/ftl/net/socket.hpp +++ b/net/include/ftl/net/socket.hpp @@ -16,8 +16,12 @@ #endif #include <sstream> +#include <tuple> #include <type_traits> +# define ENABLE_IF(...) \ + typename std::enable_if<(__VA_ARGS__), bool>::type = true + extern bool _run(bool blocking, bool nodelay); namespace ftl { @@ -72,42 +76,49 @@ class Socket { void asyncCall(const std::string &name, std::function<void(const T&)> cb, ARGS... args); - // TODO use "call" instead of "acall" causes compiler to loop. /** * Blocking Remote Procedure Call. */ template <typename R, typename... ARGS> R call(const std::string &name, ARGS... args); - - /** - * Send data to given service number. - */ - int send(uint32_t service, const std::string &data); + + template <typename... ARGS> + int send(uint32_t s, ARGS... args); - /** - * Send with two distinct data source. Avoids the need to memcpy them to a - * single buffer. - */ - int send2(uint32_t service, const std::string &data1, - const std::string &data2); + void begin(uint32_t s); template <typename T> - int read(T *b, size_t count=1) { - static_assert(std::is_trivial<T>::value); - return read((char*)b, sizeof(T)*count); - } + Socket &operator<<(T &t); + + void end(); - //template <> + template <typename T> + int read(T *b, size_t count=1); + int read(char *b, size_t count); int read(std::string &s, size_t count=0); template <typename T> - int read(T &b) { - return read(&b); - } + int read(std::vector<T> &b, size_t count=0); + + template <typename T> + int read(T &b); + + template <typename T> + Socket &operator>>(T &t); + + //SocketStream stream(uint32_t service); size_t size() const { return header_->size-4; } + + void onError(std::function<void(Socket&, int err, const char *msg)> f) {} + void onConnect(std::function<void(Socket&)> f); + void onDisconnect(std::function<void(Socket&)> f) {} + + protected: + bool data(); // Process one message from socket + void error(); // Process one error from socket /** * Internal handlers for specific event types. This should be private but @@ -117,30 +128,47 @@ class Socket { void handshake1(const std::string &d); void handshake2(const std::string &d); /** @} */ - - //void onError(sockerrorhandler_t handler) {} - void onConnect(std::function<void(Socket&)> f); - //void onDisconnect(sockdisconnecthandler_t handler) {} - - protected: - bool data(); // Process one message from socket - void error(); // Process one error from socket private: // Functions void _connected(); void _updateURI(); void _dispatchReturn(const std::string &d); + + int _send(); + + template <typename... ARGS> + int _send(const std::string &t, ARGS... args); + + template <typename T, typename... ARGS> + int _send(const T *t, int s, ARGS... args); + + template <typename T, typename... ARGS> + int _send(const std::vector<T> &t, ARGS... args); + + template <typename... Types, typename... ARGS> + int _send(const std::tuple<Types...> &t, ARGS... args); + + template <typename T, typename... ARGS, + ENABLE_IF(std::is_trivial<T>::value && !std::is_pointer<T>::value)> + int _send(const T &t, ARGS... args); private: // Data bool valid_; bool connected_; int sock_; + + // Receive buffers size_t pos_; size_t gpos_; char *buffer_; ftl::net::Header *header_; char *data_; + // Send buffers + char *buffer_w_; + std::vector<iovec> send_vec_; + ftl::net::Header *header_w_; + std::string uri_; std::string peerid_; @@ -157,6 +185,76 @@ class Socket { // --- Inline Template Implementations ----------------------------------------- +template <typename... ARGS> +int Socket::send(uint32_t s, ARGS... args) { + header_w_->service = s; + header_w_->size = 4; + send_vec_.push_back({header_w_,sizeof(ftl::net::Header)}); + return _send(args...); +} + +template <typename T> +int Socket::read(T *b, size_t count) { + static_assert(std::is_trivial<T>::value, "Can only read trivial types"); + return read((char*)b, sizeof(T)*count); +} + +template <typename T> +int Socket::read(std::vector<T> &b, size_t count) { + count = (count == 0) ? size()/sizeof(T) : count; // TODO Round this! + if (b.size() != count) b.resize(count); + return read((char*)b.data(), sizeof(T)*count); +} + +template <typename T> +int Socket::read(T &b) { + if (std::is_array<T>::value) return read(&b,std::extent<T>::value); + else return read(&b); +} + +template <typename T> +Socket &Socket::operator>>(T &t) { + if (std::is_array<T>::value) read(&t,std::extent<T>::value); + else read(&t); + return *this; +} + +template <typename... ARGS> +int Socket::_send(const std::string &t, ARGS... args) { + send_vec_.push_back({const_cast<char*>(t.data()),t.size()}); + header_w_->size += t.size(); + return t.size()+_send(args...); +} + +template <typename T, typename... ARGS> +int Socket::_send(const T *t, int s, ARGS... args) { + send_vec_.push_back({const_cast<char*>(t),(size_t)s}); + header_w_->size += s; + return s+_send(args...); +} + +template <typename T, typename... ARGS> +int Socket::_send(const std::vector<T> &t, ARGS... args) { + send_vec_.push_back({const_cast<char*>(t.data()),t.size()}); + header_w_->size += t.size(); + return t.size()+_send(args...); +} + +template <typename... Types, typename... ARGS> +int Socket::_send(const std::tuple<Types...> &t, ARGS... args) { + send_vec_.push_back({const_cast<char*>((char*)&t),sizeof(t)}); + header_w_->size += sizeof(t); + return sizeof(t)+_send(args...); +} + +template <typename T, typename... ARGS, + ENABLE_IF(std::is_trivial<T>::value && !std::is_pointer<T>::value)> +int Socket::_send(const T &t, ARGS... args) { + send_vec_.push_back({const_cast<T*>(&t),sizeof(T)}); + header_w_->size += sizeof(T); + return sizeof(T)+_send(args...); +} + //template <typename T, typename... ARGS> template <typename R, typename... ARGS> R Socket::call(const std::string &name, ARGS... args) { diff --git a/net/src/socket.cpp b/net/src/socket.cpp index 506847b3690959e9d8692f41114e0e1491daf820..6826b63de202474b457723b972bd8b054d6a6450 100644 --- a/net/src/socket.cpp +++ b/net/src/socket.cpp @@ -122,9 +122,13 @@ static int wsConnect(URI &uri) { Socket::Socket(int s) : sock_(s), pos_(0), proto_(nullptr) { valid_ = true; + buffer_ = new char[BUFFER_SIZE]; header_ = (Header*)buffer_; data_ = buffer_+sizeof(Header); + buffer_w_ = new char[BUFFER_SIZE]; + header_w_ = (Header*)buffer_w_; + connected_ = false; _updateURI(); @@ -135,6 +139,8 @@ Socket::Socket(const char *pUri) : pos_(0), uri_(pUri), proto_(nullptr) { buffer_ = new char[BUFFER_SIZE]; header_ = (Header*)buffer_; data_ = buffer_+sizeof(Header); + buffer_w_ = new char[BUFFER_SIZE]; + header_w_ = (Header*)buffer_w_; URI uri(pUri); @@ -390,38 +396,10 @@ void Socket::_connected() { //connect_handlers_.clear(); } -int Socket::send(uint32_t service, const std::string &data) { - ftl::net::Header h; - h.size = data.size()+4; - h.service = service; - - iovec vec[2]; - vec[0].iov_base = &h; - vec[0].iov_len = sizeof(h); - vec[1].iov_base = const_cast<char*>(data.data()); - vec[1].iov_len = data.size(); - - ::writev(sock_, &vec[0], 2); - - return 0; -} - -int Socket::send2(uint32_t service, const std::string &data1, const std::string &data2) { - ftl::net::Header h; - h.size = data1.size()+4+data2.size(); - h.service = service; - - iovec vec[3]; - vec[0].iov_base = &h; - vec[0].iov_len = sizeof(h); - vec[1].iov_base = const_cast<char*>(data1.data()); - vec[1].iov_len = data1.size(); - vec[2].iov_base = const_cast<char*>(data2.data()); - vec[2].iov_len = data2.size(); - - ::writev(sock_, &vec[0], 3); - - return 0; +int Socket::_send() { + int c = ::writev(sock_, send_vec_.data(), send_vec_.size()); + send_vec_.clear(); + return c; } Socket::~Socket() { @@ -431,5 +409,7 @@ Socket::~Socket() { // Delete socket buffer if (buffer_) delete [] buffer_; buffer_ = NULL; + if (buffer_w_) delete [] buffer_w_; + buffer_w_ = NULL; } diff --git a/net/test/socket_unit.cpp b/net/test/socket_unit.cpp index 8950335ba2217ac003d9847174cde463749f2e3a..27005e21b8f2bd2dea9aa011b7a331dee7799ca5 100644 --- a/net/test/socket_unit.cpp +++ b/net/test/socket_unit.cpp @@ -67,13 +67,39 @@ extern ssize_t recv(int sd, void *buf, size_t n, int f) { } extern ssize_t writev(int sd, const struct iovec *v, int cnt) { - // TODO Use count incase more sources exist... - size_t len = v[0].iov_len+v[1].iov_len; - char buf[len]; - std::memcpy(&buf[0],v[0].iov_base,v[0].iov_len); - std::memcpy(&buf[v[0].iov_len],v[1].iov_base,v[1].iov_len); + size_t len = 0; //v[0].iov_len+v[1].iov_len; + char buf[1000]; + char *bufp = &buf[0]; + + for (auto i=0; i<cnt; i++) { + std::memcpy(bufp,v[i].iov_base,v[i].iov_len); + len += v[i].iov_len; + bufp += v[i].iov_len; + } + fakedata[sd] = std::string(&buf[0], len); - return 0; + return len; +} + +uint32_t get_service(int sd) { + auto h = (ftl::net::Header*)fakedata[sd].data(); + return h->service; +} + +size_t get_size(int sd) { + auto h = (ftl::net::Header*)fakedata[sd].data(); + return h->size-4; +} + +template <typename T> +T get_value(int sd) { + auto h = (T*)(fakedata[sd].data()+sizeof(ftl::net::Header)); + return *h; +} + +template <> +std::string get_value(int sd) { + return std::string((char*)(fakedata[sd].data()+sizeof(ftl::net::Header)),get_size(sd)); } static std::function<void()> waithandler; @@ -172,6 +198,83 @@ TEST_CASE("Socket receive RPC", "[rpc]") { } } +TEST_CASE("Socket::operator>>()", "[io]") { + MockSocket s; + + SECTION("stream ints") { + int i[2]; + i[0] = 99; + i[1] = 101; + fake_send(0, 100, std::string((char*)&i,2*sizeof(int))); + + i[0] = 0; + i[1] = 0; + s.mock_data(); // Force a message read, but no protocol... + + REQUIRE( s.size() == 2*sizeof(int) ); + s >> i; + REQUIRE( i[0] == 99 ); + REQUIRE( i[1] == 101 ); + } +} + +TEST_CASE("Socket::send()", "[io]") { + MockSocket s; + + SECTION("send an int") { + int i = 607; + s.send(100,i); + + REQUIRE( get_service(0) == 100 ); + REQUIRE( get_size(0) == sizeof(int) ); + REQUIRE( get_value<int>(0) == 607 ); + } + + SECTION("send a string") { + std::string str("hello world"); + s.send(100,str); + + REQUIRE( get_service(0) == 100 ); + REQUIRE( get_size(0) == str.size() ); + REQUIRE( get_value<std::string>(0) == "hello world" ); + } + + SECTION("send const char* string") { + s.send(100,"hello world"); + + REQUIRE( get_service(0) == 100 ); + REQUIRE( get_size(0) == 11 ); + REQUIRE( get_value<std::string>(0) == "hello world" ); + } + + SECTION("send const char* array") { + s.send(100,"hello world",10); + + REQUIRE( get_service(0) == 100 ); + REQUIRE( get_size(0) == 10 ); + REQUIRE( get_value<std::string>(0) == "hello worl" ); + } + + SECTION("send a tuple") { + auto tup = std::make_tuple(55,66,true,6.7); + s.send(100,tup); + + REQUIRE( get_service(0) == 100 ); + REQUIRE( get_size(0) == sizeof(tup) ); + REQUIRE( get_value<decltype(tup)>(0) == tup ); + } + + SECTION("send multiple strings") { + std::string str("hello "); + std::string str2("world"); + s.send(100,str,str2); + + REQUIRE( get_service(0) == 100 ); + REQUIRE( get_size(0) == str.size()+str2.size() ); + REQUIRE( get_value<std::string>(0) == "hello world" ); + } +} + TEST_CASE("Socket::read()", "[io]") { MockSocket s;