diff --git a/common/cpp/src/config.cpp.in b/common/cpp/src/config.cpp.in index 353e7ccbaa60ef204227a6e80c5bdde048ef7c9d..c0a62b67b1d3fcdcaf62e6c3802d3fa897b54229 100644 --- a/common/cpp/src/config.cpp.in +++ b/common/cpp/src/config.cpp.in @@ -1,3 +1,4 @@ +#include <ftl/config.h> const char *FTL_VERSION_LONG = "@VERSION@"; const char *FTL_VERSION = @FTL_VERSION@; diff --git a/net/cpp/CMakeLists.txt b/net/cpp/CMakeLists.txt index 65f902bdfc66df8ccc7ed7d20b66eb1309e61bcd..82658a58fb1c97535e6284e691804db534eab2b3 100644 --- a/net/cpp/CMakeLists.txt +++ b/net/cpp/CMakeLists.txt @@ -6,11 +6,9 @@ include_directories(${PROJECT_SOURCE_DIR}/net/cpp/include) add_library(ftlnet src/net.cpp src/listener.cpp - src/socket.cpp + src/peer.cpp src/dispatcher.cpp - src/protocol.cpp src/ws_internal.cpp - src/p2p.cpp ) check_function_exists(uriParseSingleUriA HAVE_URIPARSESINGLE) diff --git a/net/cpp/include/ftl/net.hpp b/net/cpp/include/ftl/net.hpp index c0915ed7559c3a68660aa1d46055f830c31a14e0..f51cb0f01a8712fd66b39729828e3c93855fba63 100644 --- a/net/cpp/include/ftl/net.hpp +++ b/net/cpp/include/ftl/net.hpp @@ -8,7 +8,7 @@ namespace ftl { namespace net { class Listener; -class Socket; +class Peer; const int MAX_CONNECTIONS = 100; // TODO Is this a good number? @@ -23,7 +23,7 @@ std::shared_ptr<Listener> listen(const char *uri); * Accepts tcp, ipc and ws URIs. An example would be: * ws://ftl.utu.fi/api/connect */ -std::shared_ptr<Socket> connect(const char *uri); +std::shared_ptr<Peer> connect(const char *uri); /** * Start a loop to continually check for network messages. If the async diff --git a/net/cpp/include/ftl/net/dispatcher.hpp b/net/cpp/include/ftl/net/dispatcher.hpp index c80849e7d8b90a73a242779810fefd2609b60601..67840a78b205c072a1de44e6e7153fcba93e1041 100644 --- a/net/cpp/include/ftl/net/dispatcher.hpp +++ b/net/cpp/include/ftl/net/dispatcher.hpp @@ -39,13 +39,14 @@ namespace internal { } namespace net { -class Socket; +class Peer; class Dispatcher { public: Dispatcher() {} - void dispatch(Socket &, const std::string &msg); + //void dispatch(Peer &, const std::string &msg); + void dispatch(Peer &, const msgpack::object &msg); template <typename F> void bind(std::string const &name, F func, @@ -134,10 +135,9 @@ class Dispatcher { std::size_t expected); void enforce_unique_name(std::string const &func); - - void dispatch(Socket &, const msgpack::object &msg); - void dispatch_call(Socket &, const msgpack::object &msg); - void dispatch_notification(Socket &, msgpack::object const &msg); + + void dispatch_call(Peer &, const msgpack::object &msg); + void dispatch_notification(Peer &, msgpack::object const &msg); }; } diff --git a/net/cpp/include/ftl/net/listener.hpp b/net/cpp/include/ftl/net/listener.hpp index 09c1f68f0bea4a047010938ea72387f52c6749ba..4c3bb83ac6bf903a23abd2d55c2b38ab3b9e4739 100644 --- a/net/cpp/include/ftl/net/listener.hpp +++ b/net/cpp/include/ftl/net/listener.hpp @@ -11,6 +11,7 @@ #endif #include <ftl/net/handlers.hpp> +#include <ftl/net/peer.hpp> #include <vector> @@ -31,7 +32,7 @@ class Listener { void setProtocol(Protocol *p) { default_proto_ = p; } - void connection(std::shared_ptr<Socket> &s); + void connection(std::shared_ptr<Peer> &s); void onConnection(connecthandler_t h) { handler_connect_.push_back(h); }; private: diff --git a/net/cpp/include/ftl/net/socket.hpp b/net/cpp/include/ftl/net/peer.hpp similarity index 55% rename from net/cpp/include/ftl/net/socket.hpp rename to net/cpp/include/ftl/net/peer.hpp index 704ef00cc2aec9a901fb3f591a63c77cb783d53c..3b65d3b44978527f18b98212f4e1580807bc771d 100644 --- a/net/cpp/include/ftl/net/socket.hpp +++ b/net/cpp/include/ftl/net/peer.hpp @@ -1,20 +1,17 @@ -#ifndef _FTL_NET_SOCKET_HPP_ -#define _FTL_NET_SOCKET_HPP_ +#ifndef _FTL_NET_PEER_HPP_ +#define _FTL_NET_PEER_HPP_ #define GLOG_NO_ABBREVIATED_SEVERITIES #include <glog/logging.h> #include <ftl/net.hpp> #include <ftl/net/protocol.hpp> #include <ftl/uri.hpp> +#include <ftl/uuid.hpp> #ifndef WIN32 #define INVALID_SOCKET -1 #include <netinet/in.h> -#endif - -#ifdef WIN32 -//#include <windows.h> -//#include <winsock.h> +#else #include <winsock2.h> #endif @@ -28,6 +25,7 @@ typename std::enable_if<(__VA_ARGS__), bool>::type = true extern bool _run(bool blocking, bool nodelay); +extern int setDescriptors(); namespace ftl { namespace net { @@ -53,26 +51,40 @@ struct decrypt{};*/ * A single socket connection object, to be constructed using the connect() * function and not to be created directly. */ -class Socket { +class Peer { public: friend bool ::_run(bool blocking, bool nodelay); - public: - explicit Socket(const char *uri); - explicit Socket(int s); - ~Socket(); + friend int ::setDescriptors(); - int close(); - - void setProtocol(Protocol *p); - Protocol *protocol() const { return proto_; } + enum Status { + kInvalid, kConnecting, kConnected, kDisconnected, kReconnecting + }; + public: + explicit Peer(const char *uri); + explicit Peer(int s); + ~Peer(); + /** - * Get the internal OS dependent socket. + * Close the peer if open. Setting retry parameter to true will initiate + * backoff retry attempts. */ - int _socket() const { return sock_; }; + void close(bool retry=false); - bool isConnected() const { return sock_ != INVALID_SOCKET && connected_; }; - bool isValid() const { return valid_ && sock_ != INVALID_SOCKET; }; + bool isConnected() const { + return sock_ != INVALID_SOCKET && status_ == kConnected; + }; + + bool isValid() const { + return status_ != kInvalid && sock_ != INVALID_SOCKET; + }; + + Status status() const { return status_; } + + uint32_t getFTLVersion() const { return version_; } + uint8_t getFTLMajor() const { return version_ >> 16; } + uint8_t getFTLMinor() const { return (version_ >> 8) & 0xFF; } + uint8_t getFTLPatch() const { return version_ & 0xFF; } /** * Get the sockets protocol, address and port as a url string. This will be @@ -80,7 +92,15 @@ class Socket { */ std::string getURI() const { return uri_; }; - std::string to_string() const { return peerid_; }; + /** + * Get the UUID for this peer. + */ + const ftl::UUID &id() const { return peerid_; }; + + /** + * Get the peer id as a string. + */ + std::string to_string() const { return peerid_.to_string(); }; /** * Non-blocking Remote Procedure Call using a callback function. @@ -91,51 +111,44 @@ class Socket { ARGS... args); /** - * Blocking Remote Procedure Call. + * Blocking Remote Procedure Call using a string name. */ template <typename R, typename... ARGS> R call(const std::string &name, ARGS... args); - - template <typename... ARGS> - int send(uint32_t s, ARGS... args); - void begin(uint32_t s); - - template <typename T> - Socket &operator<<(T &t); - - void end(); + /** + * Non-blocking send using RPC function, but with no return value. + */ + template <typename... ARGS> + int send(const std::string &name, ARGS... args); - template <typename T> - int read(T *b, size_t count=1); + /** + * Bind a function to an RPC call name. + */ + template <typename F> + void bind(const std::string &name, F func); - int read(char *b, size_t count); - int read(std::string &s, size_t count=0); - - template <typename T> - int read(std::vector<T> &b, size_t count=0); - - template <typename T> - int read(T &b); - - template <typename T> - Socket &operator>>(T &t); - - //SocketStream stream(uint32_t service); + //void onError(std::function<void(Socket&, int err, const char *msg)> &f) {} + void onConnect(std::function<void()> &f); + void onDisconnect(std::function<void()> &f) {} - size_t size() const { return header_->size-4; } - - void onError(std::function<void(Socket&, int err, const char *msg)> &f) {} - void onConnect(std::function<void(Socket&)> &f); - void onDisconnect(std::function<void(Socket&)> &f) {} + public: + static const int kMaxMessage = 10*1024*1024; // 10Mb currently protected: - bool data(); // Process one message from socket - void error(); // Process one error from socket + bool data(); // Process one message from socket + void socketError(); // Process one error from socket + void error(int e); + + /** + * Get the internal OS dependent socket. + * TODO(nick) Work out if this should be private. + */ + int _socket() const { return sock_; }; /** * Internal handlers for specific event types. This should be private but - * is current here for testing purposes. + * is currently here for testing purposes. * @{ */ void handshake1(); @@ -166,52 +179,50 @@ class Socket { int _send(const T &t, ARGS... args); private: // Data - bool valid_; - bool connected_; + Status status_; int sock_; ftl::URI::scheme_t scheme_; + uint32_t version_; // Receive buffers - size_t pos_; - size_t gpos_; - char *buffer_; - ftl::net::Header *header_; - char *data_; + msgpack::unpacker recv_buf_; // Send buffers - char *buffer_w_; - std::vector<iovec> send_vec_; - ftl::net::Header *header_w_; + msgpack::vrefbuffer send_buf_; std::string uri_; - std::string peerid_; - std::string remote_proto_; - - Protocol *proto_; + ftl::UUID peerid_; - std::vector<std::function<void(Socket&)>> connect_handlers_; + ftl::net::Dispatcher disp_; + std::vector<std::function<void()>> open_handlers_; + //std::vector<std::function<void(const ftl::net::Error &)>> error_handlers_ + std::vector<std::function<void()>> close_handlers_; std::map<int, std::unique_ptr<virtual_caller>> callbacks_; static int rpcid__; - - static const int MAX_MESSAGE = 10*1024*1024; // 10Mb currently - static const int BUFFER_SIZE = MAX_MESSAGE + 16; }; // --- Inline Template Implementations ----------------------------------------- template <typename... ARGS> -int Socket::send(uint32_t s, ARGS... args) { +int Peer::send(const std::string &s, ARGS... args) { // Leave a blank entry for websocket header - if (scheme_ == ftl::URI::SCHEME_WS) send_vec_.push_back({nullptr,0}); - - header_w_->service = s; - header_w_->size = 4; - send_vec_.push_back({header_w_,sizeof(ftl::net::Header)}); - return _send(args...); + if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0); + //msgpack::pack(send_buf_, std::make_tuple(s, std::make_tuple(args...))); + auto args_obj = std::make_tuple(args...); + auto call_obj = std::make_tuple(0,s,args_obj); + msgpack::pack(send_buf_, call_obj); + return _send(); } -template <typename T> +template <typename F> +void Peer::bind(const std::string &name, F func) { + disp_.bind(name, func, + typename ftl::internal::func_kind_info<F>::result_kind(), + typename ftl::internal::func_kind_info<F>::args_kind()); +} + +/*template <typename T> int Socket::read(T *b, size_t count) { static_assert(std::is_trivial<T>::value, "Can only read trivial types"); return read((char*)b, sizeof(T)*count); @@ -235,47 +246,52 @@ Socket &Socket::operator>>(T &t) { if (std::is_array<T>::value) read(&t,std::extent<T>::value); else read(&t); return *this; -} +}*/ -template <typename... ARGS> -int Socket::_send(const std::string &t, ARGS... args) { - send_vec_.push_back({const_cast<char*>(t.data()),t.size()}); - header_w_->size += t.size(); +/*template <typename... ARGS> +int Peer::_send(const std::string &t, ARGS... args) { + //send_vec_.push_back({const_cast<char*>(t.data()),t.size()}); + //header_w_->size += t.size(); + msgpack::pack(send_buf_, t); return _send(args...)+t.size(); } template <typename... ARGS> -int Socket::_send(const ftl::net::array &b, ARGS... args) { - send_vec_.push_back({const_cast<char*>(std::get<0>(b)),std::get<1>(b)}); - header_w_->size += std::get<1>(b); +int Peer::_send(const ftl::net::array &b, ARGS... args) { + //send_vec_.push_back({const_cast<char*>(std::get<0>(b)),std::get<1>(b)}); + //header_w_->size += std::get<1>(b); + msgpack::pack(send_buf_, msgpack::type::raw_ref(std::get<0>(b), std::get<1>(b))); return std::get<1>(b)+_send(args...); } template <typename T, typename... ARGS> -int Socket::_send(const std::vector<T> &t, ARGS... args) { - send_vec_.push_back({const_cast<char*>(t.data()),t.size()}); - header_w_->size += t.size(); +int Peer::_send(const std::vector<T> &t, ARGS... args) { + //send_vec_.push_back({const_cast<char*>(t.data()),t.size()}); + //header_w_->size += t.size(); + msgpack::pack(send_buf_, t); return _send(args...)+t.size(); } template <typename... Types, typename... ARGS> -int Socket::_send(const std::tuple<Types...> &t, ARGS... args) { - send_vec_.push_back({const_cast<char*>((char*)&t),sizeof(t)}); - header_w_->size += sizeof(t); +int Peer::_send(const std::tuple<Types...> &t, ARGS... args) { + //send_vec_.push_back({const_cast<char*>((char*)&t),sizeof(t)}); + //header_w_->size += sizeof(t); + msgpack::pack(send_buf_, t); return sizeof(t)+_send(args...); } template <typename T, typename... ARGS, ENABLE_IF(std::is_trivial<T>::value && !std::is_pointer<T>::value)> -int Socket::_send(const T &t, ARGS... args) { - send_vec_.push_back({const_cast<T*>(&t),sizeof(T)}); - header_w_->size += sizeof(T); +int Peer::_send(const T &t, ARGS... args) { + //send_vec_.push_back({const_cast<T*>(&t),sizeof(T)}); + //header_w_->size += sizeof(T); + msgpack::pack(send_buf_, t); return sizeof(T)+_send(args...); -} +}*/ //template <typename T, typename... ARGS> template <typename R, typename... ARGS> -R Socket::call(const std::string &name, ARGS... args) { +R Peer::call(const std::string &name, ARGS... args) { bool hasreturned = false; R result; asyncCall<R>(name, [&result,&hasreturned](const R &r) { @@ -294,7 +310,7 @@ R Socket::call(const std::string &name, ARGS... args) { } template <typename T, typename... ARGS> -void Socket::asyncCall( +void Peer::asyncCall( const std::string &name, std::function<void(const T&)> cb, ARGS... args) { @@ -310,7 +326,7 @@ void Socket::asyncCall( // Register the CB callbacks_[rpcid] = std::make_unique<caller<T>>(cb); - send(FTL_PROTOCOL_RPC, buf.str()); + send("__rpc__", buf.str()); } }; diff --git a/net/cpp/include/ftl/net/protocol.hpp b/net/cpp/include/ftl/net/protocol.hpp index 9e49358a4ccdde7377228f06b79c2345b20b2662..0fc0f7083f0c4b5957e55c9c791a38f2db493e91 100644 --- a/net/cpp/include/ftl/net/protocol.hpp +++ b/net/cpp/include/ftl/net/protocol.hpp @@ -1,98 +1,22 @@ #ifndef _FTL_NET_PROTOCOL_HPP_ #define _FTL_NET_PROTOCOL_HPP_ +#include <ftl/uuid.hpp> #include <ftl/net/func_traits.hpp> #include <ftl/net/dispatcher.hpp> -#include <map> -#include <string> +#include <ftl/config.h> +#include <tuple> -#define FTL_PROTOCOL_HS1 0x0001 // Handshake step 1 -#define FTL_PROTOCOL_HS2 0x0002 // Handshake step 2 -#define FTL_PROTOCOL_RPC 0x0100 -#define FTL_PROTOCOL_RPCRETURN 0x0101 - -#define FTL_PROTOCOL_FREE 0x1000 // Custom protocols above this namespace ftl { namespace net { -class Reader; -class Socket; - -#pragma pack(push,1) - -struct Header { - uint32_t size; - uint32_t service; -}; - -struct Handshake { - uint64_t magic; - uint32_t name_size; - uint32_t proto_size; -}; - -#pragma pack(pop) - -static const uint64_t MAGIC = 0x1099340053640912; - -/** - * Each instance of this Protocol class represents a specific protocol. A - * protocol is a set of RPC bindings and raw message handlers. A protocol is - * identified, selected and validated using an automatically generated hash of - * all its supported bindings. The choice of protocol for a socket is made - * during the initial connection handshake. - */ -class Protocol { - public: - friend class Socket; - - public: - explicit Protocol(const std::string &id); - ~Protocol(); - - /** - * Bind a function to an RPC call name. - */ - template <typename F> - void bind(const std::string &name, F func); - - /** - * Bind a function to a raw message type. - */ - void bind(int service, std::function<void(uint32_t,Socket&)> func); - - // broadcast? - - const std::string &id() const { return id_; } - - static Protocol *find(const std::string &id); - - //protected: - void dispatchRPC(Socket &, const std::string &d); - void dispatchReturn(Socket &, const std::string &d); - void dispatchRaw(uint32_t service, Socket &); - - void addSocket(std::shared_ptr<Socket> s); - void removeSocket(const Socket &s); - - private: - ftl::net::Dispatcher disp_; - std::map<uint32_t,std::function<void(uint32_t,Socket&)>> handlers_; - std::string id_; - - static std::map<std::string,Protocol*> protocols__; -}; - -// --- Template Implementations ------------------------------------------------ +typedef std::tuple<uint64_t, uint32_t, ftl::UUID> Handshake; -template <typename F> -void Protocol::bind(const std::string &name, F func) { - disp_.bind(name, func, - typename ftl::internal::func_kind_info<F>::result_kind(), - typename ftl::internal::func_kind_info<F>::args_kind()); -} +static const uint64_t kMagic = 0x1099340053640912; +static const uint32_t kVersion = (FTL_VERSION_MAJOR << 16) + + (FTL_VERSION_MINOR << 8) + FTL_VERSION_PATCH; }; }; diff --git a/net/cpp/src/dispatcher.cpp b/net/cpp/src/dispatcher.cpp index 0d95a2a5528599dfbc3bc98e3a1aac6b6dbea0ba..adc4758d0de3b918ba8e8a8c4ca770e08ff48308 100644 --- a/net/cpp/src/dispatcher.cpp +++ b/net/cpp/src/dispatcher.cpp @@ -1,10 +1,10 @@ #define GLOG_NO_ABBREVIATED_SEVERITIES #include <glog/logging.h> #include <ftl/net/dispatcher.hpp> -#include <ftl/net/socket.hpp> +#include <ftl/net/peer.hpp> #include <iostream> -using ftl::net::Socket; +using ftl::net::Peer; /*static std::string hexStr(const std::string &s) { @@ -17,13 +17,13 @@ using ftl::net::Socket; return ss.str(); }*/ -void ftl::net::Dispatcher::dispatch(Socket &s, const std::string &msg) { +//void ftl::net::Dispatcher::dispatch(Peer &s, const std::string &msg) { //std::cout << "Received dispatch : " << hexStr(msg) << std::endl; - auto unpacked = msgpack::unpack(msg.data(), msg.size()); - dispatch(s, unpacked.get()); -} +// auto unpacked = msgpack::unpack(msg.data(), msg.size()); +// dispatch(s, unpacked.get()); +//} -void ftl::net::Dispatcher::dispatch(Socket &s, const msgpack::object &msg) { +void ftl::net::Dispatcher::dispatch(Peer &s, const msgpack::object &msg) { switch (msg.via.array.size) { case 3: dispatch_notification(s, msg); break; @@ -35,7 +35,7 @@ void ftl::net::Dispatcher::dispatch(Socket &s, const msgpack::object &msg) { } } -void ftl::net::Dispatcher::dispatch_call(Socket &s, const msgpack::object &msg) { +void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) { call_t the_call; msg.convert(the_call); @@ -57,26 +57,26 @@ void ftl::net::Dispatcher::dispatch_call(Socket &s, const msgpack::object &msg) response_t res_obj = std::make_tuple(1,id,msgpack::object(),result->get()); std::stringstream buf; msgpack::pack(buf, res_obj); - s.send(FTL_PROTOCOL_RPCRETURN, buf.str()); + s.send("__return__", buf.str()); } catch (const std::exception &e) { //throw; //LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")"; response_t res_obj = std::make_tuple(1,id,msgpack::object(e.what()),msgpack::object()); std::stringstream buf; msgpack::pack(buf, res_obj); - s.send(FTL_PROTOCOL_RPCRETURN, buf.str()); + s.send("__return__", buf.str()); } catch (int e) { //throw; //LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")"; response_t res_obj = std::make_tuple(1,id,msgpack::object(e),msgpack::object()); std::stringstream buf; msgpack::pack(buf, res_obj); - s.send(FTL_PROTOCOL_RPCRETURN, buf.str()); + s.send("__return__", buf.str()); } } } -void ftl::net::Dispatcher::dispatch_notification(Socket &s, msgpack::object const &msg) { +void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const &msg) { notification_t the_call; msg.convert(the_call); @@ -86,6 +86,8 @@ void ftl::net::Dispatcher::dispatch_notification(Socket &s, msgpack::object cons auto &&name = std::get<1>(the_call); auto &&args = std::get<2>(the_call); + + LOG(INFO) << "NOTIFICATION " << name << "() <- " << s.getURI(); auto it_func = funcs_.find(name); @@ -95,6 +97,8 @@ void ftl::net::Dispatcher::dispatch_notification(Socket &s, msgpack::object cons } catch (int e) { throw e; } + } else { + LOG(ERROR) << "Missing handler for incoming message"; } } diff --git a/net/cpp/src/listener.cpp b/net/cpp/src/listener.cpp index 4a9deb08590409f4ba9fe140adb7d2735710f26e..6e76aa20e702dff5c9965822e2befeb0a6bb3804 100644 --- a/net/cpp/src/listener.cpp +++ b/net/cpp/src/listener.cpp @@ -3,7 +3,7 @@ #include <ftl/uri.hpp> #include <ftl/net/listener.hpp> -#include <ftl/net/socket.hpp> +#include <ftl/net/peer.hpp> #include <ftl/net/protocol.hpp> #include <iostream> @@ -24,10 +24,10 @@ typedef int socklen_t; #endif -using namespace ftl; using ftl::net::Listener; using std::shared_ptr; -using ftl::net::Socket; +using ftl::net::Peer; +using ftl::URI; int tcpListen(URI &uri) { int ssock; @@ -113,10 +113,10 @@ Listener::~Listener() { close(); } -void Listener::connection(shared_ptr<Socket> &s) { - Handshake hs1; +void Listener::connection(shared_ptr<Peer> &s) { + /*Handshake hs1; hs1.magic = ftl::net::MAGIC; - hs1.name_size = 0; + memset(hs1.id, 0, 16); if (default_proto_) { s->setProtocol(default_proto_); @@ -129,7 +129,7 @@ void Listener::connection(shared_ptr<Socket> &s) { } LOG(INFO) << "Handshake initiated with " << s->getURI(); - for (auto h : handler_connect_) h(s); + for (auto h : handler_connect_) h(s);*/ } void Listener::close() { diff --git a/net/cpp/src/net.cpp b/net/cpp/src/net.cpp index 3ea57c1c483bc4db84012ebd79cfc6d5c82182d6..3992447cbacdfaa605a4c98d7bb9ea190d47c131 100644 --- a/net/cpp/src/net.cpp +++ b/net/cpp/src/net.cpp @@ -1,6 +1,6 @@ #include <ftl/net.hpp> #include <ftl/net/listener.hpp> -#include <ftl/net/socket.hpp> +#include <ftl/net/peer.hpp> #ifdef WIN32 #include <Ws2tcpip.h> @@ -13,9 +13,9 @@ using namespace std; using namespace std::chrono; using ftl::net::Listener; -using ftl::net::Socket; +using ftl::net::Peer; - std::vector<shared_ptr<ftl::net::Socket>> sockets; +std::vector<shared_ptr<ftl::net::Peer>> peers; static std::vector<shared_ptr<ftl::net::Listener>> listeners; static fd_set sfdread; static fd_set sfderror; @@ -45,7 +45,7 @@ static fd_set sfderror; return freeclient; }*/ -static int setDescriptors() { +int setDescriptors() { //Reset all file descriptors FD_ZERO(&sfdread); FD_ZERO(&sfderror); @@ -62,7 +62,7 @@ static int setDescriptors() { } //Set the file descriptors for each client - for (auto s : sockets) { + for (auto s : peers) { if (s != nullptr && s->isValid()) { if (s->_socket() > n) { @@ -83,18 +83,18 @@ shared_ptr<Listener> ftl::net::listen(const char *uri) { return l; } -shared_ptr<Socket> ftl::net::connect(const char *uri) { - shared_ptr<Socket> s(new Socket((uri == NULL) ? "" : uri)); - sockets.push_back(s); +shared_ptr<Peer> ftl::net::connect(const char *uri) { + shared_ptr<Peer> s(new Peer((uri == NULL) ? "" : uri)); + peers.push_back(s); return s; } void ftl::net::stop() { - for (auto s : sockets) { + for (auto s : peers) { s->close(); } - sockets.clear(); + peers.clear(); for (auto l : listeners) { l->close(); @@ -150,8 +150,8 @@ bool _run(bool blocking, bool nodelay) { int csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize); if (csock != INVALID_SOCKET) { - auto sock = make_shared<Socket>(csock); - sockets.push_back(sock); + auto sock = make_shared<Peer>(csock); + peers.push_back(sock); // Call connection handlers l->connection(sock); @@ -162,22 +162,22 @@ bool _run(bool blocking, bool nodelay) { } //Also check each clients socket to see if any messages or errors are waiting - for (auto s : sockets) { + for (auto s : peers) { if (s != NULL && s->isValid()) { //If message received from this client then deal with it if (FD_ISSET(s->_socket(), &sfdread)) { repeat |= s->data(); } if (FD_ISSET(s->_socket(), &sfderror)) { - s->error(); + s->socketError(); } } else if (s != NULL) { // Erase it - for (auto i=sockets.begin(); i!=sockets.end(); i++) { + for (auto i=peers.begin(); i!=peers.end(); i++) { if ((*i) == s) { std::cout << "REMOVING SOCKET" << std::endl; - sockets.erase(i); break; + peers.erase(i); break; } } } diff --git a/net/cpp/src/socket.cpp b/net/cpp/src/peer.cpp similarity index 78% rename from net/cpp/src/socket.cpp rename to net/cpp/src/peer.cpp index 61ab82d727a1ea1d997c0170c23a099e33744c80..5154436199f4a1623bc7b981404f76ea46090cd3 100644 --- a/net/cpp/src/socket.cpp +++ b/net/cpp/src/peer.cpp @@ -4,8 +4,9 @@ #include <fcntl.h> #include <ftl/uri.hpp> -#include <ftl/net/socket.hpp> +#include <ftl/net/peer.hpp> #include <ftl/net/ws_internal.hpp> +#include <ftl/config.h> #include "net_internal.hpp" #ifndef WIN32 @@ -28,13 +29,13 @@ #include <iostream> #include <memory> #include <algorithm> +#include <tuple> -using namespace ftl; -using ftl::net::Socket; -using ftl::net::Protocol; +using std::tuple; +using std::get; +using ftl::net::Peer; using ftl::URI; using ftl::net::ws_connect; -using namespace std; /*static std::string hexStr(const std::string &s) { @@ -47,7 +48,7 @@ using namespace std; return ss.str(); }*/ -int Socket::rpcid__ = 0; +int Peer::rpcid__ = 0; // TODO(nick) Move to tcp_internal.cpp static int tcpConnect(URI &uri) { @@ -122,32 +123,33 @@ static int tcpConnect(URI &uri) { return csocket; } -Socket::Socket(int s) : sock_(s), pos_(0), proto_(nullptr) { - valid_ = true; - - buffer_ = new char[BUFFER_SIZE]; - header_ = (Header*)buffer_; - data_ = buffer_+sizeof(Header); - buffer_w_ = new char[BUFFER_SIZE]; - header_w_ = (Header*)buffer_w_; +Peer::Peer(int s) : sock_(s) { + status_ = (s == INVALID_SOCKET) ? kInvalid : kConnecting; + _updateURI(); - connected_ = false; + // Send the initiating handshake if valid + if (status_ == kConnecting) { + // Install return handshake handler. + bind("__handshake__", [this](uint64_t magic, uint32_t version, UUID pid) { + if (magic != ftl::net::kMagic) { + close(); + LOG(ERROR) << "Invalid magic during handshake"; + } else { + status_ = kConnected; + version_ = version; + } + }); - _updateURI(); + ftl::UUID uuid; + + send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, uuid); + } } -Socket::Socket(const char *pUri) : pos_(0), uri_(pUri), proto_(nullptr) { - // Allocate buffer - buffer_ = new char[BUFFER_SIZE]; - header_ = (Header*)buffer_; - data_ = buffer_+sizeof(Header); - buffer_w_ = new char[BUFFER_SIZE]; - header_w_ = (Header*)buffer_w_; - +Peer::Peer(const char *pUri) : uri_(pUri) { URI uri(pUri); - valid_ = false; - connected_ = false; + status_ = kInvalid; sock_ = INVALID_SOCKET; scheme_ = uri.getProtocol(); @@ -161,7 +163,7 @@ Socket::Socket(const char *pUri) : pos_(0), uri_(pUri), proto_(nullptr) { fcntl(sock_, F_SETFL, O_NONBLOCK); #endif - valid_ = true; + status_ = kConnecting; } else if (uri.getProtocol() == URI::SCHEME_WS) { LOG(INFO) << "Websocket connect " << uri.getPath(); sock_ = tcpConnect(uri); @@ -181,13 +183,13 @@ Socket::Socket(const char *pUri) : pos_(0), uri_(pUri), proto_(nullptr) { fcntl(sock_, F_SETFL, O_NONBLOCK); #endif - valid_ = true; + status_ = kConnecting; } else { LOG(ERROR) << "Unrecognised connection protocol: " << pUri; } } -void Socket::_updateURI() { +void Peer::_updateURI() { sockaddr_storage addr; int rsize = sizeof(sockaddr_storage); if (getpeername(sock_, (sockaddr*)&addr, (socklen_t*)&rsize) == 0) { @@ -214,7 +216,7 @@ void Socket::_updateURI() { } } -int Socket::close() { +void Peer::close(bool retry) { if (sock_ != INVALID_SOCKET) { #ifndef WIN32 ::close(sock_); @@ -222,17 +224,16 @@ int Socket::close() { closesocket(sock_); #endif sock_ = INVALID_SOCKET; - connected_ = false; + status_ = kDisconnected; // Attempt auto reconnect? //auto i = find(sockets.begin(),sockets.end(),this); //sockets.erase(i); } - return 0; } -void Socket::setProtocol(Protocol *p) { +/*void Peer::setProtocol(Protocol *p) { if (p != NULL) { if (proto_ == p) return; if (proto_ && proto_->id() == p->id()) return; @@ -240,7 +241,7 @@ void Socket::setProtocol(Protocol *p) { if (remote_proto_ != "") { Handshake hs1; hs1.magic = ftl::net::MAGIC; - hs1.name_size = 0; + //hs1.name_size = 0; hs1.proto_size = p->id().size(); send(FTL_PROTOCOL_HS1, hs1, p->id()); LOG(INFO) << "Handshake initiated with " << uri_; @@ -248,16 +249,10 @@ void Socket::setProtocol(Protocol *p) { proto_ = p; } else { - /*Handshake hs1; - hs1.magic = ftl::net::MAGIC; - hs1.name_size = 0; - hs1.proto_size = 0; - send(FTL_PROTOCOL_HS1, hs1); - LOG(INFO) << "Handshake initiated with " << uri_;*/ } -} +}*/ -void Socket::error() { +void Peer::socketError() { int err; #ifdef WIN32 int optlen = sizeof(err); @@ -268,7 +263,35 @@ void Socket::error() { LOG(ERROR) << "Socket: " << uri_ << " - error " << err; } -bool Socket::data() { +void Peer::error(int e) { + +} + +bool Peer::data() { + recv_buf_.reserve_buffer(kMaxMessage); + size_t rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0); + recv_buf_.buffer_consumed(rc); + + msgpack::object_handle msg; + while (recv_buf_.next(msg)) { + msgpack::object obj = msg.get(); + if (status_ != kConnected) { + // First message must be a handshake + tuple<uint32_t, std::string, msgpack::object> hs; + obj.convert(hs); + + if (get<1>(hs) != "__handshake__") { + close(); + LOG(ERROR) << "Missing handshake"; + return false; + } + } + disp_.dispatch(*this, obj); + } + return false; +} + +/*bool Socket::data() { //Read data from socket size_t n = 0; int c = 0; @@ -336,9 +359,9 @@ bool Socket::data() { } return true; -} +}*/ -int Socket::read(char *b, size_t count) { +/*int Socket::read(char *b, size_t count) { if (count > size()) LOG(WARNING) << "Reading too much data for service " << header_->service; count = (count > size() || count==0) ? size() : count; // TODO, utilise recv directly here... @@ -376,9 +399,9 @@ void Socket::handshake1() { void Socket::handshake2() { LOG(INFO) << "Handshake finalised for " << uri_; _connected(); -} +}*/ -void Socket::_dispatchReturn(const std::string &d) { +void Peer::_dispatchReturn(const std::string &d) { auto unpacked = msgpack::unpack(d.data(), d.size()); Dispatcher::response_t the_result; unpacked.get().convert(the_result); @@ -406,32 +429,34 @@ void Socket::_dispatchReturn(const std::string &d) { } } -void Socket::onConnect(std::function<void(Socket&)> &f) { - if (connected_) { - f(*this); +void Peer::onConnect(std::function<void()> &f) { + if (status_ == kConnected) { + f(); } else { - connect_handlers_.push_back(f); + open_handlers_.push_back(f); } } -void Socket::_connected() { - connected_ = true; - for (auto h : connect_handlers_) { - h(*this); +void Peer::_connected() { + status_ = kConnected; + for (auto h : open_handlers_) { + h(); } //connect_handlers_.clear(); } -int Socket::_send() { +int Peer::_send() { // Are we using a websocket? if (scheme_ == ftl::URI::SCHEME_WS) { // Create a websocket header as well. size_t len = 0; + const iovec *sendvec = send_buf_.vector(); + size_t size = send_buf_.vector_size(); char buf[20]; // TODO(nick) Should not be a stack buffer. // Calculate total size of message - for (auto v : send_vec_) { - len += v.iov_len; + for (size_t i=0; i < size; i++) { + len += sendvec[i].iov_len; } // Pack correct websocket header into buffer @@ -439,8 +464,8 @@ int Socket::_send() { if (rc == -1) return -1; // Patch the first io vector to be ws header - send_vec_[0].iov_base = buf; - send_vec_[0].iov_len = rc; + const_cast<iovec*>(&sendvec[0])->iov_base = buf; + const_cast<iovec*>(&sendvec[0])->iov_len = rc; } #ifdef WIN32 @@ -450,19 +475,13 @@ int Socket::_send() { c += ftl::net::internal::send(sock_, (char*)v.iov_base, v.iov_len, 0); } #else - int c = ftl::net::internal::writev(sock_, send_vec_.data(), send_vec_.size()); + int c = ftl::net::internal::writev(sock_, send_buf_.vector(), send_buf_.vector_size()); #endif - send_vec_.clear(); + send_buf_.clear(); return c; } -Socket::~Socket() { +Peer::~Peer() { close(); - - // Delete socket buffer - if (buffer_) delete [] buffer_; - buffer_ = NULL; - if (buffer_w_) delete [] buffer_w_; - buffer_w_ = NULL; } diff --git a/net/cpp/test/CMakeLists.txt b/net/cpp/test/CMakeLists.txt index c87241f4d22e7447a22d9929f92a6e8eab6e5fad..8baeeba5bb780f2956e105c81abbe5b65d4147bc 100644 --- a/net/cpp/test/CMakeLists.txt +++ b/net/cpp/test/CMakeLists.txt @@ -1,19 +1,15 @@ -### Protocol Unit ############################################################## -add_executable(protocol_unit - ./tests.cpp - ./protocol_unit.cpp -) -target_link_libraries(protocol_unit glog::glog) - ### Socket Unit ################################################################ -add_executable(socket_unit +add_executable(peer_unit ./tests.cpp ../src/ws_internal.cpp - ./socket_unit.cpp + ../src/dispatcher.cpp + ./peer_unit.cpp + ../../../common/cpp/src/config.cpp ) -target_link_libraries(socket_unit +target_link_libraries(peer_unit ${URIPARSER_LIBRARIES} - glog::glog) + glog::glog + ${UUID_LIBRARIES}) ### URI ######################################################################## add_executable(uri_unit @@ -25,15 +21,15 @@ target_link_libraries(uri_unit ### P2P Base Unit ############################################################## # TODO(nick) Actually make this a unit test -add_executable(p2p_base_unit - ./tests.cpp - ./p2p_base_unit.cpp) -add_dependencies(p2p_base_unit ftlnet) -target_link_libraries(p2p_base_unit - ftlnet - ${URIPARSER_LIBRARIES} - glog::glog - ${UUID_LIBRARIES}) +#add_executable(p2p_base_unit +# ./tests.cpp +# ./p2p_base_unit.cpp) +#add_dependencies(p2p_base_unit ftlnet) +#target_link_libraries(p2p_base_unit +# ftlnet +# ${URIPARSER_LIBRARIES} +# glog::glog +# ${UUID_LIBRARIES}) ### Net Integration ############################################################ add_executable(net_integration @@ -50,10 +46,10 @@ target_link_libraries(net_integration add_test(URIUnitTest uri_unit) -add_test(ProtocolUnitTest protocol_unit) -add_test(SocketUnitTest socket_unit) +#add_test(ProtocolUnitTest protocol_unit) +add_test(PeerUnitTest peer_unit) add_test(NetIntegrationTest net_integration) add_custom_target(tests) -add_dependencies(tests socket_unit protocol_unit net_integration uri_unit) +add_dependencies(tests peer_unit net_integration uri_unit) diff --git a/net/cpp/test/socket_unit.cpp b/net/cpp/test/peer_unit.cpp similarity index 71% rename from net/cpp/test/socket_unit.cpp rename to net/cpp/test/peer_unit.cpp index 36737e3f28045e6d6a4e95e58ca5bf02ecf3b706..a6507d615fb2b600710e152b0908d485da538fe7 100644 --- a/net/cpp/test/socket_unit.cpp +++ b/net/cpp/test/peer_unit.cpp @@ -1,16 +1,20 @@ #include "catch.hpp" #include <iostream> #include <memory> -#include <map> +//#include <map> +#include <tuple> +#include <ftl/net/peer.hpp> #include <ftl/net/protocol.hpp> -#include <ftl/net/socket.hpp> +#include <ftl/config.h> /* Allow socket functions to be mocked */ #define TEST_MOCKS #include "../src/net_internal.hpp" -using ftl::net::Socket; +using std::tuple; +using std::get; +using ftl::net::Peer; #ifdef WIN32 #pragma comment(lib, "Ws2_32.lib") @@ -18,50 +22,16 @@ using ftl::net::Socket; // --- Mock -------------------------------------------------------------------- -class MockSocket : public Socket { +class MockPeer : public Peer { public: - MockSocket() : Socket(0) {} + MockPeer() : Peer(0) {} void mock_data() { data(); } }; -static std::string last_rpc; -void ftl::net::Protocol::dispatchRPC(Socket &s, const std::string &d) { - last_rpc = d; - - // This should send a return value -} - -void ftl::net::Protocol::dispatchRaw(uint32_t service, Socket &s) { - -} - -ftl::net::Protocol::Protocol(const std::string &id) { -} - -ftl::net::Protocol::~Protocol() { -} - -ftl::net::Protocol *ftl::net::Protocol::find(const std::string &p) { - return NULL; -} - // --- Support ----------------------------------------------------------------- static std::map<int, std::string> fakedata; -void fake_send(int sd, uint32_t service, const std::string &data) { - //std::cout << "HEX SEND: " << hexStr(data) << std::endl; - char buf[8+1024]; - assert(data.size() < 1024); - ftl::net::Header *h = (ftl::net::Header*)&buf; - h->size = data.size()+4; - h->service = service; - std::memcpy(&buf[8],data.data(),data.size()); - fakedata[sd] = std::string(&buf[0], 8+data.size()); - - //std::cout << "HEX SEND2: " << hexStr(fakedata[sd]) << std::endl; -} - #ifdef WIN32 int ftl::net::internal::recv(SOCKET sd, char *buf, int n, int f) { #else @@ -103,26 +73,6 @@ ssize_t ftl::net::internal::writev(int sd, const struct iovec *v, int cnt) { } #endif -uint32_t get_service(int sd) { - auto h = (ftl::net::Header*)fakedata[sd].data(); - return h->service; -} - -size_t get_size(int sd) { - auto h = (ftl::net::Header*)fakedata[sd].data(); - return h->size-4; -} - -template <typename T> -T get_value(int sd) { - auto h = (T*)(fakedata[sd].data()+sizeof(ftl::net::Header)); - return *h; -} - -template <> -std::string get_value(int sd) { - return std::string((char*)(fakedata[sd].data()+sizeof(ftl::net::Header)),get_size(sd)); -} static std::function<void()> waithandler; @@ -136,16 +86,89 @@ bool wait() { }; }; +/*void fake_send(int sd, uint32_t service, ARGS) { + //std::cout << "HEX SEND: " << hexStr(data) << std::endl; + char buf[8+1024]; + assert(data.size() < 1024); + ftl::net::Header *h = (ftl::net::Header*)&buf; + h->size = data.size()+4; + h->service = service; + std::memcpy(&buf[8],data.data(),data.size()); + fakedata[sd] = std::string(&buf[0], 8+data.size()); + + //std::cout << "HEX SEND2: " << hexStr(fakedata[sd]) << std::endl; +}*/ + +void send_handshake(Peer &p) { + ftl::UUID id; + p.send("__handshake__", ftl::net::kMagic, ((8 << 16) + (5 << 8) + 2), id); +} + +template <typename T> +tuple<std::string, T> readResponse(int s) { + msgpack::object_handle msg = msgpack::unpack(fakedata[s].data(), fakedata[s].size()); + tuple<uint8_t, std::string, T> req; + msg.get().convert(req); + return std::make_tuple(get<1>(req), get<2>(req)); +} + // --- Files to test ----------------------------------------------------------- -#include "../src/socket.cpp" +#include "../src/peer.cpp" // --- Tests ------------------------------------------------------------------- -using ftl::net::Protocol; +TEST_CASE("Peer(int)", "[]") { + SECTION("initiates a valid handshake") { + MockPeer s; -TEST_CASE("Socket::call()", "[rpc]") { - MockSocket s; + auto [name, hs] = readResponse<ftl::net::Handshake>(0); + + REQUIRE( name == "__handshake__" ); + + // 1) Sends magic (64bits) + REQUIRE( get<0>(hs) == ftl::net::kMagic ); + + // 2) Sends FTL Version + REQUIRE( get<1>(hs) == (FTL_VERSION_MAJOR << 16) + (FTL_VERSION_MINOR << 8) + FTL_VERSION_PATCH ); + + // 3) Sends peer UUID + + + REQUIRE( s.status() == Peer::kConnecting ); + } + + SECTION("completes on full handshake") { + MockPeer s; + + // Send handshake response + send_handshake(s); + s.mock_data(); + + REQUIRE( s.status() == Peer::kConnected ); + } + + SECTION("has correct version on full handshake") { + MockPeer s; + + // Send handshake response + send_handshake(s); + s.mock_data(); + + REQUIRE( s.getFTLVersion() == (8 << 16) + (5 << 8) + 2 ); + } + + SECTION("has correct peer id on full handshake") { + MockPeer s; + + // Send handshake response + + //REQUIRE( s.id() == ); + } +} + +/*TEST_CASE("Peer::call()", "[rpc]") { + MockPeer s; SECTION("no argument call") { waithandler = [&]() { @@ -186,42 +209,59 @@ TEST_CASE("Socket::call()", "[rpc]") { } waithandler = nullptr; -} +}*/ -TEST_CASE("Socket receive RPC", "[rpc]") { - MockSocket s; - auto p = new Protocol("ftl://utu.fi"); - s.setProtocol(p); - - SECTION("no argument call") { - // Do a fake send - auto args_obj = std::make_tuple(); - auto call_obj = std::make_tuple(0,0,"test1",args_obj); - std::stringstream buf; - msgpack::pack(buf, call_obj); +TEST_CASE("Peer::bind()", "[rpc]") { + MockPeer s; - fake_send(0, FTL_PROTOCOL_RPC, buf.str()); + SECTION("no argument call") { + bool done = false; + + s.bind("hello", [&]() { + done = true; + }); + + send_handshake(s); + s.mock_data(); + s.send("hello"); s.mock_data(); // Force it to read the fake send... - REQUIRE( (last_rpc == buf.str()) ); + REQUIRE( done ); } SECTION("one argument call") { - // Do a fake send - auto args_obj = std::make_tuple(55); - auto call_obj = std::make_tuple(0,0,"test2",args_obj); - std::stringstream buf; - msgpack::pack(buf, call_obj); + int done = 0; + + s.bind("hello", [&](int a) { + done = a; + }); + + send_handshake(s); + s.mock_data(); + s.send("hello", 55); + s.mock_data(); // Force it to read the fake send... + + REQUIRE( (done == 55) ); + } - fake_send(0, FTL_PROTOCOL_RPC, buf.str()); + SECTION("two argument call") { + std::string done; + + s.bind("hello", [&](int a, std::string b) { + done = b; + }); + + send_handshake(s); + s.mock_data(); + s.send("hello", 55, "world"); s.mock_data(); // Force it to read the fake send... - REQUIRE( (last_rpc == buf.str()) ); + REQUIRE( (done == "world") ); } } -TEST_CASE("Socket::operator>>()", "[io]") { - MockSocket s; +/*TEST_CASE("Socket::operator>>()", "[io]") { + MockPeer s; SECTION("stream ints") { int i[2]; @@ -238,21 +278,22 @@ TEST_CASE("Socket::operator>>()", "[io]") { REQUIRE( (i[0] == 99) ); REQUIRE( (i[1] == 101) ); } -} +}*/ TEST_CASE("Socket::send()", "[io]") { - MockSocket s; + MockPeer s; SECTION("send an int") { int i = 607; - s.send(100,i); + s.send("dummy",i); - REQUIRE( (get_service(0) == 100) ); - REQUIRE( (get_size(0) == sizeof(int)) ); - REQUIRE( (get_value<int>(0) == 607) ); + auto [name, value] = readResponse<tuple<int>>(0); + + REQUIRE( (name == "dummy") ); + REQUIRE( (get<0>(value) == 607) ); } - SECTION("send a string") { + /*SECTION("send a string") { std::string str("hello world"); s.send(100,str); @@ -294,10 +335,10 @@ TEST_CASE("Socket::send()", "[io]") { REQUIRE( (get_service(0) == 100) ); REQUIRE( (get_size(0) == str.size()+str2.size()) ); REQUIRE( (get_value<std::string>(0) == "hello world") ); - } + }*/ } -TEST_CASE("Socket::read()", "[io]") { +/*TEST_CASE("Socket::read()", "[io]") { MockSocket s; SECTION("read an int") { @@ -380,5 +421,5 @@ TEST_CASE("Socket::read()", "[io]") { REQUIRE( (s.read(&i,2) == sizeof(int)) ); REQUIRE( (i == 99) ); } -} +}*/