Skip to content
Snippets Groups Projects
Commit adc8ec91 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Rework net socket into net peer and rely fundamentally on msgpack

parent 876e7d11
No related branches found
No related tags found
No related merge requests found
#include <ftl/config.h>
const char *FTL_VERSION_LONG = "@VERSION@"; const char *FTL_VERSION_LONG = "@VERSION@";
const char *FTL_VERSION = @FTL_VERSION@; const char *FTL_VERSION = @FTL_VERSION@;
......
...@@ -6,11 +6,9 @@ include_directories(${PROJECT_SOURCE_DIR}/net/cpp/include) ...@@ -6,11 +6,9 @@ include_directories(${PROJECT_SOURCE_DIR}/net/cpp/include)
add_library(ftlnet add_library(ftlnet
src/net.cpp src/net.cpp
src/listener.cpp src/listener.cpp
src/socket.cpp src/peer.cpp
src/dispatcher.cpp src/dispatcher.cpp
src/protocol.cpp
src/ws_internal.cpp src/ws_internal.cpp
src/p2p.cpp
) )
check_function_exists(uriParseSingleUriA HAVE_URIPARSESINGLE) check_function_exists(uriParseSingleUriA HAVE_URIPARSESINGLE)
......
...@@ -8,7 +8,7 @@ namespace ftl { ...@@ -8,7 +8,7 @@ namespace ftl {
namespace net { namespace net {
class Listener; class Listener;
class Socket; class Peer;
const int MAX_CONNECTIONS = 100; // TODO Is this a good number? const int MAX_CONNECTIONS = 100; // TODO Is this a good number?
...@@ -23,7 +23,7 @@ std::shared_ptr<Listener> listen(const char *uri); ...@@ -23,7 +23,7 @@ std::shared_ptr<Listener> listen(const char *uri);
* Accepts tcp, ipc and ws URIs. An example would be: * Accepts tcp, ipc and ws URIs. An example would be:
* ws://ftl.utu.fi/api/connect * ws://ftl.utu.fi/api/connect
*/ */
std::shared_ptr<Socket> connect(const char *uri); std::shared_ptr<Peer> connect(const char *uri);
/** /**
* Start a loop to continually check for network messages. If the async * Start a loop to continually check for network messages. If the async
......
...@@ -39,13 +39,14 @@ namespace internal { ...@@ -39,13 +39,14 @@ namespace internal {
} }
namespace net { namespace net {
class Socket; class Peer;
class Dispatcher { class Dispatcher {
public: public:
Dispatcher() {} Dispatcher() {}
void dispatch(Socket &, const std::string &msg); //void dispatch(Peer &, const std::string &msg);
void dispatch(Peer &, const msgpack::object &msg);
template <typename F> template <typename F>
void bind(std::string const &name, F func, void bind(std::string const &name, F func,
...@@ -135,9 +136,8 @@ class Dispatcher { ...@@ -135,9 +136,8 @@ class Dispatcher {
void enforce_unique_name(std::string const &func); void enforce_unique_name(std::string const &func);
void dispatch(Socket &, const msgpack::object &msg); void dispatch_call(Peer &, const msgpack::object &msg);
void dispatch_call(Socket &, const msgpack::object &msg); void dispatch_notification(Peer &, msgpack::object const &msg);
void dispatch_notification(Socket &, msgpack::object const &msg);
}; };
} }
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#endif #endif
#include <ftl/net/handlers.hpp> #include <ftl/net/handlers.hpp>
#include <ftl/net/peer.hpp>
#include <vector> #include <vector>
...@@ -31,7 +32,7 @@ class Listener { ...@@ -31,7 +32,7 @@ class Listener {
void setProtocol(Protocol *p) { default_proto_ = p; } void setProtocol(Protocol *p) { default_proto_ = p; }
void connection(std::shared_ptr<Socket> &s); void connection(std::shared_ptr<Peer> &s);
void onConnection(connecthandler_t h) { handler_connect_.push_back(h); }; void onConnection(connecthandler_t h) { handler_connect_.push_back(h); };
private: private:
......
#ifndef _FTL_NET_SOCKET_HPP_ #ifndef _FTL_NET_PEER_HPP_
#define _FTL_NET_SOCKET_HPP_ #define _FTL_NET_PEER_HPP_
#define GLOG_NO_ABBREVIATED_SEVERITIES #define GLOG_NO_ABBREVIATED_SEVERITIES
#include <glog/logging.h> #include <glog/logging.h>
#include <ftl/net.hpp> #include <ftl/net.hpp>
#include <ftl/net/protocol.hpp> #include <ftl/net/protocol.hpp>
#include <ftl/uri.hpp> #include <ftl/uri.hpp>
#include <ftl/uuid.hpp>
#ifndef WIN32 #ifndef WIN32
#define INVALID_SOCKET -1 #define INVALID_SOCKET -1
#include <netinet/in.h> #include <netinet/in.h>
#endif #else
#ifdef WIN32
//#include <windows.h>
//#include <winsock.h>
#include <winsock2.h> #include <winsock2.h>
#endif #endif
...@@ -28,6 +25,7 @@ ...@@ -28,6 +25,7 @@
typename std::enable_if<(__VA_ARGS__), bool>::type = true typename std::enable_if<(__VA_ARGS__), bool>::type = true
extern bool _run(bool blocking, bool nodelay); extern bool _run(bool blocking, bool nodelay);
extern int setDescriptors();
namespace ftl { namespace ftl {
namespace net { namespace net {
...@@ -53,26 +51,40 @@ struct decrypt{};*/ ...@@ -53,26 +51,40 @@ struct decrypt{};*/
* A single socket connection object, to be constructed using the connect() * A single socket connection object, to be constructed using the connect()
* function and not to be created directly. * function and not to be created directly.
*/ */
class Socket { class Peer {
public: public:
friend bool ::_run(bool blocking, bool nodelay); friend bool ::_run(bool blocking, bool nodelay);
public: friend int ::setDescriptors();
explicit Socket(const char *uri);
explicit Socket(int s);
~Socket();
int close(); enum Status {
kInvalid, kConnecting, kConnected, kDisconnected, kReconnecting
};
void setProtocol(Protocol *p); public:
Protocol *protocol() const { return proto_; } explicit Peer(const char *uri);
explicit Peer(int s);
~Peer();
/** /**
* Get the internal OS dependent socket. * Close the peer if open. Setting retry parameter to true will initiate
* backoff retry attempts.
*/ */
int _socket() const { return sock_; }; void close(bool retry=false);
bool isConnected() const {
return sock_ != INVALID_SOCKET && status_ == kConnected;
};
bool isValid() const {
return status_ != kInvalid && sock_ != INVALID_SOCKET;
};
Status status() const { return status_; }
bool isConnected() const { return sock_ != INVALID_SOCKET && connected_; }; uint32_t getFTLVersion() const { return version_; }
bool isValid() const { return valid_ && sock_ != INVALID_SOCKET; }; uint8_t getFTLMajor() const { return version_ >> 16; }
uint8_t getFTLMinor() const { return (version_ >> 8) & 0xFF; }
uint8_t getFTLPatch() const { return version_ & 0xFF; }
/** /**
* Get the sockets protocol, address and port as a url string. This will be * Get the sockets protocol, address and port as a url string. This will be
...@@ -80,7 +92,15 @@ class Socket { ...@@ -80,7 +92,15 @@ class Socket {
*/ */
std::string getURI() const { return uri_; }; std::string getURI() const { return uri_; };
std::string to_string() const { return peerid_; }; /**
* Get the UUID for this peer.
*/
const ftl::UUID &id() const { return peerid_; };
/**
* Get the peer id as a string.
*/
std::string to_string() const { return peerid_.to_string(); };
/** /**
* Non-blocking Remote Procedure Call using a callback function. * Non-blocking Remote Procedure Call using a callback function.
...@@ -91,51 +111,44 @@ class Socket { ...@@ -91,51 +111,44 @@ class Socket {
ARGS... args); ARGS... args);
/** /**
* Blocking Remote Procedure Call. * Blocking Remote Procedure Call using a string name.
*/ */
template <typename R, typename... ARGS> template <typename R, typename... ARGS>
R call(const std::string &name, ARGS... args); R call(const std::string &name, ARGS... args);
/**
* Non-blocking send using RPC function, but with no return value.
*/
template <typename... ARGS> template <typename... ARGS>
int send(uint32_t s, ARGS... args); int send(const std::string &name, ARGS... args);
void begin(uint32_t s);
template <typename T>
Socket &operator<<(T &t);
void end();
template <typename T>
int read(T *b, size_t count=1);
int read(char *b, size_t count);
int read(std::string &s, size_t count=0);
template <typename T>
int read(std::vector<T> &b, size_t count=0);
template <typename T>
int read(T &b);
template <typename T> /**
Socket &operator>>(T &t); * Bind a function to an RPC call name.
*/
//SocketStream stream(uint32_t service); template <typename F>
void bind(const std::string &name, F func);
size_t size() const { return header_->size-4; } //void onError(std::function<void(Socket&, int err, const char *msg)> &f) {}
void onConnect(std::function<void()> &f);
void onDisconnect(std::function<void()> &f) {}
void onError(std::function<void(Socket&, int err, const char *msg)> &f) {} public:
void onConnect(std::function<void(Socket&)> &f); static const int kMaxMessage = 10*1024*1024; // 10Mb currently
void onDisconnect(std::function<void(Socket&)> &f) {}
protected: protected:
bool data(); // Process one message from socket bool data(); // Process one message from socket
void error(); // Process one error from socket void socketError(); // Process one error from socket
void error(int e);
/**
* Get the internal OS dependent socket.
* TODO(nick) Work out if this should be private.
*/
int _socket() const { return sock_; };
/** /**
* Internal handlers for specific event types. This should be private but * Internal handlers for specific event types. This should be private but
* is current here for testing purposes. * is currently here for testing purposes.
* @{ * @{
*/ */
void handshake1(); void handshake1();
...@@ -166,52 +179,50 @@ class Socket { ...@@ -166,52 +179,50 @@ class Socket {
int _send(const T &t, ARGS... args); int _send(const T &t, ARGS... args);
private: // Data private: // Data
bool valid_; Status status_;
bool connected_;
int sock_; int sock_;
ftl::URI::scheme_t scheme_; ftl::URI::scheme_t scheme_;
uint32_t version_;
// Receive buffers // Receive buffers
size_t pos_; msgpack::unpacker recv_buf_;
size_t gpos_;
char *buffer_;
ftl::net::Header *header_;
char *data_;
// Send buffers // Send buffers
char *buffer_w_; msgpack::vrefbuffer send_buf_;
std::vector<iovec> send_vec_;
ftl::net::Header *header_w_;
std::string uri_; std::string uri_;
std::string peerid_; ftl::UUID peerid_;
std::string remote_proto_;
Protocol *proto_;
std::vector<std::function<void(Socket&)>> connect_handlers_; ftl::net::Dispatcher disp_;
std::vector<std::function<void()>> open_handlers_;
//std::vector<std::function<void(const ftl::net::Error &)>> error_handlers_
std::vector<std::function<void()>> close_handlers_;
std::map<int, std::unique_ptr<virtual_caller>> callbacks_; std::map<int, std::unique_ptr<virtual_caller>> callbacks_;
static int rpcid__; static int rpcid__;
static const int MAX_MESSAGE = 10*1024*1024; // 10Mb currently
static const int BUFFER_SIZE = MAX_MESSAGE + 16;
}; };
// --- Inline Template Implementations ----------------------------------------- // --- Inline Template Implementations -----------------------------------------
template <typename... ARGS> template <typename... ARGS>
int Socket::send(uint32_t s, ARGS... args) { int Peer::send(const std::string &s, ARGS... args) {
// Leave a blank entry for websocket header // Leave a blank entry for websocket header
if (scheme_ == ftl::URI::SCHEME_WS) send_vec_.push_back({nullptr,0}); if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0);
//msgpack::pack(send_buf_, std::make_tuple(s, std::make_tuple(args...)));
auto args_obj = std::make_tuple(args...);
auto call_obj = std::make_tuple(0,s,args_obj);
msgpack::pack(send_buf_, call_obj);
return _send();
}
header_w_->service = s; template <typename F>
header_w_->size = 4; void Peer::bind(const std::string &name, F func) {
send_vec_.push_back({header_w_,sizeof(ftl::net::Header)}); disp_.bind(name, func,
return _send(args...); typename ftl::internal::func_kind_info<F>::result_kind(),
typename ftl::internal::func_kind_info<F>::args_kind());
} }
template <typename T> /*template <typename T>
int Socket::read(T *b, size_t count) { int Socket::read(T *b, size_t count) {
static_assert(std::is_trivial<T>::value, "Can only read trivial types"); static_assert(std::is_trivial<T>::value, "Can only read trivial types");
return read((char*)b, sizeof(T)*count); return read((char*)b, sizeof(T)*count);
...@@ -235,47 +246,52 @@ Socket &Socket::operator>>(T &t) { ...@@ -235,47 +246,52 @@ Socket &Socket::operator>>(T &t) {
if (std::is_array<T>::value) read(&t,std::extent<T>::value); if (std::is_array<T>::value) read(&t,std::extent<T>::value);
else read(&t); else read(&t);
return *this; return *this;
} }*/
template <typename... ARGS> /*template <typename... ARGS>
int Socket::_send(const std::string &t, ARGS... args) { int Peer::_send(const std::string &t, ARGS... args) {
send_vec_.push_back({const_cast<char*>(t.data()),t.size()}); //send_vec_.push_back({const_cast<char*>(t.data()),t.size()});
header_w_->size += t.size(); //header_w_->size += t.size();
msgpack::pack(send_buf_, t);
return _send(args...)+t.size(); return _send(args...)+t.size();
} }
template <typename... ARGS> template <typename... ARGS>
int Socket::_send(const ftl::net::array &b, ARGS... args) { int Peer::_send(const ftl::net::array &b, ARGS... args) {
send_vec_.push_back({const_cast<char*>(std::get<0>(b)),std::get<1>(b)}); //send_vec_.push_back({const_cast<char*>(std::get<0>(b)),std::get<1>(b)});
header_w_->size += std::get<1>(b); //header_w_->size += std::get<1>(b);
msgpack::pack(send_buf_, msgpack::type::raw_ref(std::get<0>(b), std::get<1>(b)));
return std::get<1>(b)+_send(args...); return std::get<1>(b)+_send(args...);
} }
template <typename T, typename... ARGS> template <typename T, typename... ARGS>
int Socket::_send(const std::vector<T> &t, ARGS... args) { int Peer::_send(const std::vector<T> &t, ARGS... args) {
send_vec_.push_back({const_cast<char*>(t.data()),t.size()}); //send_vec_.push_back({const_cast<char*>(t.data()),t.size()});
header_w_->size += t.size(); //header_w_->size += t.size();
msgpack::pack(send_buf_, t);
return _send(args...)+t.size(); return _send(args...)+t.size();
} }
template <typename... Types, typename... ARGS> template <typename... Types, typename... ARGS>
int Socket::_send(const std::tuple<Types...> &t, ARGS... args) { int Peer::_send(const std::tuple<Types...> &t, ARGS... args) {
send_vec_.push_back({const_cast<char*>((char*)&t),sizeof(t)}); //send_vec_.push_back({const_cast<char*>((char*)&t),sizeof(t)});
header_w_->size += sizeof(t); //header_w_->size += sizeof(t);
msgpack::pack(send_buf_, t);
return sizeof(t)+_send(args...); return sizeof(t)+_send(args...);
} }
template <typename T, typename... ARGS, template <typename T, typename... ARGS,
ENABLE_IF(std::is_trivial<T>::value && !std::is_pointer<T>::value)> ENABLE_IF(std::is_trivial<T>::value && !std::is_pointer<T>::value)>
int Socket::_send(const T &t, ARGS... args) { int Peer::_send(const T &t, ARGS... args) {
send_vec_.push_back({const_cast<T*>(&t),sizeof(T)}); //send_vec_.push_back({const_cast<T*>(&t),sizeof(T)});
header_w_->size += sizeof(T); //header_w_->size += sizeof(T);
msgpack::pack(send_buf_, t);
return sizeof(T)+_send(args...); return sizeof(T)+_send(args...);
} }*/
//template <typename T, typename... ARGS> //template <typename T, typename... ARGS>
template <typename R, typename... ARGS> template <typename R, typename... ARGS>
R Socket::call(const std::string &name, ARGS... args) { R Peer::call(const std::string &name, ARGS... args) {
bool hasreturned = false; bool hasreturned = false;
R result; R result;
asyncCall<R>(name, [&result,&hasreturned](const R &r) { asyncCall<R>(name, [&result,&hasreturned](const R &r) {
...@@ -294,7 +310,7 @@ R Socket::call(const std::string &name, ARGS... args) { ...@@ -294,7 +310,7 @@ R Socket::call(const std::string &name, ARGS... args) {
} }
template <typename T, typename... ARGS> template <typename T, typename... ARGS>
void Socket::asyncCall( void Peer::asyncCall(
const std::string &name, const std::string &name,
std::function<void(const T&)> cb, std::function<void(const T&)> cb,
ARGS... args) { ARGS... args) {
...@@ -310,7 +326,7 @@ void Socket::asyncCall( ...@@ -310,7 +326,7 @@ void Socket::asyncCall(
// Register the CB // Register the CB
callbacks_[rpcid] = std::make_unique<caller<T>>(cb); callbacks_[rpcid] = std::make_unique<caller<T>>(cb);
send(FTL_PROTOCOL_RPC, buf.str()); send("__rpc__", buf.str());
} }
}; };
......
#ifndef _FTL_NET_PROTOCOL_HPP_ #ifndef _FTL_NET_PROTOCOL_HPP_
#define _FTL_NET_PROTOCOL_HPP_ #define _FTL_NET_PROTOCOL_HPP_
#include <ftl/uuid.hpp>
#include <ftl/net/func_traits.hpp> #include <ftl/net/func_traits.hpp>
#include <ftl/net/dispatcher.hpp> #include <ftl/net/dispatcher.hpp>
#include <map> #include <ftl/config.h>
#include <string> #include <tuple>
#define FTL_PROTOCOL_HS1 0x0001 // Handshake step 1
#define FTL_PROTOCOL_HS2 0x0002 // Handshake step 2
#define FTL_PROTOCOL_RPC 0x0100
#define FTL_PROTOCOL_RPCRETURN 0x0101
#define FTL_PROTOCOL_FREE 0x1000 // Custom protocols above this
namespace ftl { namespace ftl {
namespace net { namespace net {
class Reader; typedef std::tuple<uint64_t, uint32_t, ftl::UUID> Handshake;
class Socket;
#pragma pack(push,1)
struct Header {
uint32_t size;
uint32_t service;
};
struct Handshake {
uint64_t magic;
uint32_t name_size;
uint32_t proto_size;
};
#pragma pack(pop)
static const uint64_t MAGIC = 0x1099340053640912;
/**
* Each instance of this Protocol class represents a specific protocol. A
* protocol is a set of RPC bindings and raw message handlers. A protocol is
* identified, selected and validated using an automatically generated hash of
* all its supported bindings. The choice of protocol for a socket is made
* during the initial connection handshake.
*/
class Protocol {
public:
friend class Socket;
public:
explicit Protocol(const std::string &id);
~Protocol();
/**
* Bind a function to an RPC call name.
*/
template <typename F>
void bind(const std::string &name, F func);
/**
* Bind a function to a raw message type.
*/
void bind(int service, std::function<void(uint32_t,Socket&)> func);
// broadcast?
const std::string &id() const { return id_; }
static Protocol *find(const std::string &id);
//protected:
void dispatchRPC(Socket &, const std::string &d);
void dispatchReturn(Socket &, const std::string &d);
void dispatchRaw(uint32_t service, Socket &);
void addSocket(std::shared_ptr<Socket> s);
void removeSocket(const Socket &s);
private:
ftl::net::Dispatcher disp_;
std::map<uint32_t,std::function<void(uint32_t,Socket&)>> handlers_;
std::string id_;
static std::map<std::string,Protocol*> protocols__;
};
// --- Template Implementations ------------------------------------------------
template <typename F> static const uint64_t kMagic = 0x1099340053640912;
void Protocol::bind(const std::string &name, F func) { static const uint32_t kVersion = (FTL_VERSION_MAJOR << 16) +
disp_.bind(name, func, (FTL_VERSION_MINOR << 8) + FTL_VERSION_PATCH;
typename ftl::internal::func_kind_info<F>::result_kind(),
typename ftl::internal::func_kind_info<F>::args_kind());
}
}; };
}; };
......
#define GLOG_NO_ABBREVIATED_SEVERITIES #define GLOG_NO_ABBREVIATED_SEVERITIES
#include <glog/logging.h> #include <glog/logging.h>
#include <ftl/net/dispatcher.hpp> #include <ftl/net/dispatcher.hpp>
#include <ftl/net/socket.hpp> #include <ftl/net/peer.hpp>
#include <iostream> #include <iostream>
using ftl::net::Socket; using ftl::net::Peer;
/*static std::string hexStr(const std::string &s) /*static std::string hexStr(const std::string &s)
{ {
...@@ -17,13 +17,13 @@ using ftl::net::Socket; ...@@ -17,13 +17,13 @@ using ftl::net::Socket;
return ss.str(); return ss.str();
}*/ }*/
void ftl::net::Dispatcher::dispatch(Socket &s, const std::string &msg) { //void ftl::net::Dispatcher::dispatch(Peer &s, const std::string &msg) {
//std::cout << "Received dispatch : " << hexStr(msg) << std::endl; //std::cout << "Received dispatch : " << hexStr(msg) << std::endl;
auto unpacked = msgpack::unpack(msg.data(), msg.size()); // auto unpacked = msgpack::unpack(msg.data(), msg.size());
dispatch(s, unpacked.get()); // dispatch(s, unpacked.get());
} //}
void ftl::net::Dispatcher::dispatch(Socket &s, const msgpack::object &msg) { void ftl::net::Dispatcher::dispatch(Peer &s, const msgpack::object &msg) {
switch (msg.via.array.size) { switch (msg.via.array.size) {
case 3: case 3:
dispatch_notification(s, msg); break; dispatch_notification(s, msg); break;
...@@ -35,7 +35,7 @@ void ftl::net::Dispatcher::dispatch(Socket &s, const msgpack::object &msg) { ...@@ -35,7 +35,7 @@ void ftl::net::Dispatcher::dispatch(Socket &s, const msgpack::object &msg) {
} }
} }
void ftl::net::Dispatcher::dispatch_call(Socket &s, const msgpack::object &msg) { void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) {
call_t the_call; call_t the_call;
msg.convert(the_call); msg.convert(the_call);
...@@ -57,26 +57,26 @@ void ftl::net::Dispatcher::dispatch_call(Socket &s, const msgpack::object &msg) ...@@ -57,26 +57,26 @@ void ftl::net::Dispatcher::dispatch_call(Socket &s, const msgpack::object &msg)
response_t res_obj = std::make_tuple(1,id,msgpack::object(),result->get()); response_t res_obj = std::make_tuple(1,id,msgpack::object(),result->get());
std::stringstream buf; std::stringstream buf;
msgpack::pack(buf, res_obj); msgpack::pack(buf, res_obj);
s.send(FTL_PROTOCOL_RPCRETURN, buf.str()); s.send("__return__", buf.str());
} catch (const std::exception &e) { } catch (const std::exception &e) {
//throw; //throw;
//LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")"; //LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")";
response_t res_obj = std::make_tuple(1,id,msgpack::object(e.what()),msgpack::object()); response_t res_obj = std::make_tuple(1,id,msgpack::object(e.what()),msgpack::object());
std::stringstream buf; std::stringstream buf;
msgpack::pack(buf, res_obj); msgpack::pack(buf, res_obj);
s.send(FTL_PROTOCOL_RPCRETURN, buf.str()); s.send("__return__", buf.str());
} catch (int e) { } catch (int e) {
//throw; //throw;
//LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")"; //LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")";
response_t res_obj = std::make_tuple(1,id,msgpack::object(e),msgpack::object()); response_t res_obj = std::make_tuple(1,id,msgpack::object(e),msgpack::object());
std::stringstream buf; std::stringstream buf;
msgpack::pack(buf, res_obj); msgpack::pack(buf, res_obj);
s.send(FTL_PROTOCOL_RPCRETURN, buf.str()); s.send("__return__", buf.str());
} }
} }
} }
void ftl::net::Dispatcher::dispatch_notification(Socket &s, msgpack::object const &msg) { void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const &msg) {
notification_t the_call; notification_t the_call;
msg.convert(the_call); msg.convert(the_call);
...@@ -87,6 +87,8 @@ void ftl::net::Dispatcher::dispatch_notification(Socket &s, msgpack::object cons ...@@ -87,6 +87,8 @@ void ftl::net::Dispatcher::dispatch_notification(Socket &s, msgpack::object cons
auto &&name = std::get<1>(the_call); auto &&name = std::get<1>(the_call);
auto &&args = std::get<2>(the_call); auto &&args = std::get<2>(the_call);
LOG(INFO) << "NOTIFICATION " << name << "() <- " << s.getURI();
auto it_func = funcs_.find(name); auto it_func = funcs_.find(name);
if (it_func != end(funcs_)) { if (it_func != end(funcs_)) {
...@@ -95,6 +97,8 @@ void ftl::net::Dispatcher::dispatch_notification(Socket &s, msgpack::object cons ...@@ -95,6 +97,8 @@ void ftl::net::Dispatcher::dispatch_notification(Socket &s, msgpack::object cons
} catch (int e) { } catch (int e) {
throw e; throw e;
} }
} else {
LOG(ERROR) << "Missing handler for incoming message";
} }
} }
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
#include <ftl/uri.hpp> #include <ftl/uri.hpp>
#include <ftl/net/listener.hpp> #include <ftl/net/listener.hpp>
#include <ftl/net/socket.hpp> #include <ftl/net/peer.hpp>
#include <ftl/net/protocol.hpp> #include <ftl/net/protocol.hpp>
#include <iostream> #include <iostream>
...@@ -24,10 +24,10 @@ ...@@ -24,10 +24,10 @@
typedef int socklen_t; typedef int socklen_t;
#endif #endif
using namespace ftl;
using ftl::net::Listener; using ftl::net::Listener;
using std::shared_ptr; using std::shared_ptr;
using ftl::net::Socket; using ftl::net::Peer;
using ftl::URI;
int tcpListen(URI &uri) { int tcpListen(URI &uri) {
int ssock; int ssock;
...@@ -113,10 +113,10 @@ Listener::~Listener() { ...@@ -113,10 +113,10 @@ Listener::~Listener() {
close(); close();
} }
void Listener::connection(shared_ptr<Socket> &s) { void Listener::connection(shared_ptr<Peer> &s) {
Handshake hs1; /*Handshake hs1;
hs1.magic = ftl::net::MAGIC; hs1.magic = ftl::net::MAGIC;
hs1.name_size = 0; memset(hs1.id, 0, 16);
if (default_proto_) { if (default_proto_) {
s->setProtocol(default_proto_); s->setProtocol(default_proto_);
...@@ -129,7 +129,7 @@ void Listener::connection(shared_ptr<Socket> &s) { ...@@ -129,7 +129,7 @@ void Listener::connection(shared_ptr<Socket> &s) {
} }
LOG(INFO) << "Handshake initiated with " << s->getURI(); LOG(INFO) << "Handshake initiated with " << s->getURI();
for (auto h : handler_connect_) h(s); for (auto h : handler_connect_) h(s);*/
} }
void Listener::close() { void Listener::close() {
......
#include <ftl/net.hpp> #include <ftl/net.hpp>
#include <ftl/net/listener.hpp> #include <ftl/net/listener.hpp>
#include <ftl/net/socket.hpp> #include <ftl/net/peer.hpp>
#ifdef WIN32 #ifdef WIN32
#include <Ws2tcpip.h> #include <Ws2tcpip.h>
...@@ -13,9 +13,9 @@ ...@@ -13,9 +13,9 @@
using namespace std; using namespace std;
using namespace std::chrono; using namespace std::chrono;
using ftl::net::Listener; using ftl::net::Listener;
using ftl::net::Socket; using ftl::net::Peer;
std::vector<shared_ptr<ftl::net::Socket>> sockets; std::vector<shared_ptr<ftl::net::Peer>> peers;
static std::vector<shared_ptr<ftl::net::Listener>> listeners; static std::vector<shared_ptr<ftl::net::Listener>> listeners;
static fd_set sfdread; static fd_set sfdread;
static fd_set sfderror; static fd_set sfderror;
...@@ -45,7 +45,7 @@ static fd_set sfderror; ...@@ -45,7 +45,7 @@ static fd_set sfderror;
return freeclient; return freeclient;
}*/ }*/
static int setDescriptors() { int setDescriptors() {
//Reset all file descriptors //Reset all file descriptors
FD_ZERO(&sfdread); FD_ZERO(&sfdread);
FD_ZERO(&sfderror); FD_ZERO(&sfderror);
...@@ -62,7 +62,7 @@ static int setDescriptors() { ...@@ -62,7 +62,7 @@ static int setDescriptors() {
} }
//Set the file descriptors for each client //Set the file descriptors for each client
for (auto s : sockets) { for (auto s : peers) {
if (s != nullptr && s->isValid()) { if (s != nullptr && s->isValid()) {
if (s->_socket() > n) { if (s->_socket() > n) {
...@@ -83,18 +83,18 @@ shared_ptr<Listener> ftl::net::listen(const char *uri) { ...@@ -83,18 +83,18 @@ shared_ptr<Listener> ftl::net::listen(const char *uri) {
return l; return l;
} }
shared_ptr<Socket> ftl::net::connect(const char *uri) { shared_ptr<Peer> ftl::net::connect(const char *uri) {
shared_ptr<Socket> s(new Socket((uri == NULL) ? "" : uri)); shared_ptr<Peer> s(new Peer((uri == NULL) ? "" : uri));
sockets.push_back(s); peers.push_back(s);
return s; return s;
} }
void ftl::net::stop() { void ftl::net::stop() {
for (auto s : sockets) { for (auto s : peers) {
s->close(); s->close();
} }
sockets.clear(); peers.clear();
for (auto l : listeners) { for (auto l : listeners) {
l->close(); l->close();
...@@ -150,8 +150,8 @@ bool _run(bool blocking, bool nodelay) { ...@@ -150,8 +150,8 @@ bool _run(bool blocking, bool nodelay) {
int csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize); int csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize);
if (csock != INVALID_SOCKET) { if (csock != INVALID_SOCKET) {
auto sock = make_shared<Socket>(csock); auto sock = make_shared<Peer>(csock);
sockets.push_back(sock); peers.push_back(sock);
// Call connection handlers // Call connection handlers
l->connection(sock); l->connection(sock);
...@@ -162,22 +162,22 @@ bool _run(bool blocking, bool nodelay) { ...@@ -162,22 +162,22 @@ bool _run(bool blocking, bool nodelay) {
} }
//Also check each clients socket to see if any messages or errors are waiting //Also check each clients socket to see if any messages or errors are waiting
for (auto s : sockets) { for (auto s : peers) {
if (s != NULL && s->isValid()) { if (s != NULL && s->isValid()) {
//If message received from this client then deal with it //If message received from this client then deal with it
if (FD_ISSET(s->_socket(), &sfdread)) { if (FD_ISSET(s->_socket(), &sfdread)) {
repeat |= s->data(); repeat |= s->data();
} }
if (FD_ISSET(s->_socket(), &sfderror)) { if (FD_ISSET(s->_socket(), &sfderror)) {
s->error(); s->socketError();
} }
} else if (s != NULL) { } else if (s != NULL) {
// Erase it // Erase it
for (auto i=sockets.begin(); i!=sockets.end(); i++) { for (auto i=peers.begin(); i!=peers.end(); i++) {
if ((*i) == s) { if ((*i) == s) {
std::cout << "REMOVING SOCKET" << std::endl; std::cout << "REMOVING SOCKET" << std::endl;
sockets.erase(i); break; peers.erase(i); break;
} }
} }
} }
......
...@@ -4,8 +4,9 @@ ...@@ -4,8 +4,9 @@
#include <fcntl.h> #include <fcntl.h>
#include <ftl/uri.hpp> #include <ftl/uri.hpp>
#include <ftl/net/socket.hpp> #include <ftl/net/peer.hpp>
#include <ftl/net/ws_internal.hpp> #include <ftl/net/ws_internal.hpp>
#include <ftl/config.h>
#include "net_internal.hpp" #include "net_internal.hpp"
#ifndef WIN32 #ifndef WIN32
...@@ -28,13 +29,13 @@ ...@@ -28,13 +29,13 @@
#include <iostream> #include <iostream>
#include <memory> #include <memory>
#include <algorithm> #include <algorithm>
#include <tuple>
using namespace ftl; using std::tuple;
using ftl::net::Socket; using std::get;
using ftl::net::Protocol; using ftl::net::Peer;
using ftl::URI; using ftl::URI;
using ftl::net::ws_connect; using ftl::net::ws_connect;
using namespace std;
/*static std::string hexStr(const std::string &s) /*static std::string hexStr(const std::string &s)
{ {
...@@ -47,7 +48,7 @@ using namespace std; ...@@ -47,7 +48,7 @@ using namespace std;
return ss.str(); return ss.str();
}*/ }*/
int Socket::rpcid__ = 0; int Peer::rpcid__ = 0;
// TODO(nick) Move to tcp_internal.cpp // TODO(nick) Move to tcp_internal.cpp
static int tcpConnect(URI &uri) { static int tcpConnect(URI &uri) {
...@@ -122,32 +123,33 @@ static int tcpConnect(URI &uri) { ...@@ -122,32 +123,33 @@ static int tcpConnect(URI &uri) {
return csocket; return csocket;
} }
Socket::Socket(int s) : sock_(s), pos_(0), proto_(nullptr) { Peer::Peer(int s) : sock_(s) {
valid_ = true; status_ = (s == INVALID_SOCKET) ? kInvalid : kConnecting;
_updateURI();
buffer_ = new char[BUFFER_SIZE]; // Send the initiating handshake if valid
header_ = (Header*)buffer_; if (status_ == kConnecting) {
data_ = buffer_+sizeof(Header); // Install return handshake handler.
buffer_w_ = new char[BUFFER_SIZE]; bind("__handshake__", [this](uint64_t magic, uint32_t version, UUID pid) {
header_w_ = (Header*)buffer_w_; if (magic != ftl::net::kMagic) {
close();
LOG(ERROR) << "Invalid magic during handshake";
} else {
status_ = kConnected;
version_ = version;
}
});
connected_ = false; ftl::UUID uuid;
_updateURI(); send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, uuid);
}
} }
Socket::Socket(const char *pUri) : pos_(0), uri_(pUri), proto_(nullptr) { Peer::Peer(const char *pUri) : uri_(pUri) {
// Allocate buffer
buffer_ = new char[BUFFER_SIZE];
header_ = (Header*)buffer_;
data_ = buffer_+sizeof(Header);
buffer_w_ = new char[BUFFER_SIZE];
header_w_ = (Header*)buffer_w_;
URI uri(pUri); URI uri(pUri);
valid_ = false; status_ = kInvalid;
connected_ = false;
sock_ = INVALID_SOCKET; sock_ = INVALID_SOCKET;
scheme_ = uri.getProtocol(); scheme_ = uri.getProtocol();
...@@ -161,7 +163,7 @@ Socket::Socket(const char *pUri) : pos_(0), uri_(pUri), proto_(nullptr) { ...@@ -161,7 +163,7 @@ Socket::Socket(const char *pUri) : pos_(0), uri_(pUri), proto_(nullptr) {
fcntl(sock_, F_SETFL, O_NONBLOCK); fcntl(sock_, F_SETFL, O_NONBLOCK);
#endif #endif
valid_ = true; status_ = kConnecting;
} else if (uri.getProtocol() == URI::SCHEME_WS) { } else if (uri.getProtocol() == URI::SCHEME_WS) {
LOG(INFO) << "Websocket connect " << uri.getPath(); LOG(INFO) << "Websocket connect " << uri.getPath();
sock_ = tcpConnect(uri); sock_ = tcpConnect(uri);
...@@ -181,13 +183,13 @@ Socket::Socket(const char *pUri) : pos_(0), uri_(pUri), proto_(nullptr) { ...@@ -181,13 +183,13 @@ Socket::Socket(const char *pUri) : pos_(0), uri_(pUri), proto_(nullptr) {
fcntl(sock_, F_SETFL, O_NONBLOCK); fcntl(sock_, F_SETFL, O_NONBLOCK);
#endif #endif
valid_ = true; status_ = kConnecting;
} else { } else {
LOG(ERROR) << "Unrecognised connection protocol: " << pUri; LOG(ERROR) << "Unrecognised connection protocol: " << pUri;
} }
} }
void Socket::_updateURI() { void Peer::_updateURI() {
sockaddr_storage addr; sockaddr_storage addr;
int rsize = sizeof(sockaddr_storage); int rsize = sizeof(sockaddr_storage);
if (getpeername(sock_, (sockaddr*)&addr, (socklen_t*)&rsize) == 0) { if (getpeername(sock_, (sockaddr*)&addr, (socklen_t*)&rsize) == 0) {
...@@ -214,7 +216,7 @@ void Socket::_updateURI() { ...@@ -214,7 +216,7 @@ void Socket::_updateURI() {
} }
} }
int Socket::close() { void Peer::close(bool retry) {
if (sock_ != INVALID_SOCKET) { if (sock_ != INVALID_SOCKET) {
#ifndef WIN32 #ifndef WIN32
::close(sock_); ::close(sock_);
...@@ -222,17 +224,16 @@ int Socket::close() { ...@@ -222,17 +224,16 @@ int Socket::close() {
closesocket(sock_); closesocket(sock_);
#endif #endif
sock_ = INVALID_SOCKET; sock_ = INVALID_SOCKET;
connected_ = false; status_ = kDisconnected;
// Attempt auto reconnect? // Attempt auto reconnect?
//auto i = find(sockets.begin(),sockets.end(),this); //auto i = find(sockets.begin(),sockets.end(),this);
//sockets.erase(i); //sockets.erase(i);
} }
return 0;
} }
void Socket::setProtocol(Protocol *p) { /*void Peer::setProtocol(Protocol *p) {
if (p != NULL) { if (p != NULL) {
if (proto_ == p) return; if (proto_ == p) return;
if (proto_ && proto_->id() == p->id()) return; if (proto_ && proto_->id() == p->id()) return;
...@@ -240,7 +241,7 @@ void Socket::setProtocol(Protocol *p) { ...@@ -240,7 +241,7 @@ void Socket::setProtocol(Protocol *p) {
if (remote_proto_ != "") { if (remote_proto_ != "") {
Handshake hs1; Handshake hs1;
hs1.magic = ftl::net::MAGIC; hs1.magic = ftl::net::MAGIC;
hs1.name_size = 0; //hs1.name_size = 0;
hs1.proto_size = p->id().size(); hs1.proto_size = p->id().size();
send(FTL_PROTOCOL_HS1, hs1, p->id()); send(FTL_PROTOCOL_HS1, hs1, p->id());
LOG(INFO) << "Handshake initiated with " << uri_; LOG(INFO) << "Handshake initiated with " << uri_;
...@@ -248,16 +249,10 @@ void Socket::setProtocol(Protocol *p) { ...@@ -248,16 +249,10 @@ void Socket::setProtocol(Protocol *p) {
proto_ = p; proto_ = p;
} else { } else {
/*Handshake hs1;
hs1.magic = ftl::net::MAGIC;
hs1.name_size = 0;
hs1.proto_size = 0;
send(FTL_PROTOCOL_HS1, hs1);
LOG(INFO) << "Handshake initiated with " << uri_;*/
}
} }
}*/
void Socket::error() { void Peer::socketError() {
int err; int err;
#ifdef WIN32 #ifdef WIN32
int optlen = sizeof(err); int optlen = sizeof(err);
...@@ -268,7 +263,35 @@ void Socket::error() { ...@@ -268,7 +263,35 @@ void Socket::error() {
LOG(ERROR) << "Socket: " << uri_ << " - error " << err; LOG(ERROR) << "Socket: " << uri_ << " - error " << err;
} }
bool Socket::data() { void Peer::error(int e) {
}
bool Peer::data() {
recv_buf_.reserve_buffer(kMaxMessage);
size_t rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0);
recv_buf_.buffer_consumed(rc);
msgpack::object_handle msg;
while (recv_buf_.next(msg)) {
msgpack::object obj = msg.get();
if (status_ != kConnected) {
// First message must be a handshake
tuple<uint32_t, std::string, msgpack::object> hs;
obj.convert(hs);
if (get<1>(hs) != "__handshake__") {
close();
LOG(ERROR) << "Missing handshake";
return false;
}
}
disp_.dispatch(*this, obj);
}
return false;
}
/*bool Socket::data() {
//Read data from socket //Read data from socket
size_t n = 0; size_t n = 0;
int c = 0; int c = 0;
...@@ -336,9 +359,9 @@ bool Socket::data() { ...@@ -336,9 +359,9 @@ bool Socket::data() {
} }
return true; return true;
} }*/
int Socket::read(char *b, size_t count) { /*int Socket::read(char *b, size_t count) {
if (count > size()) LOG(WARNING) << "Reading too much data for service " << header_->service; if (count > size()) LOG(WARNING) << "Reading too much data for service " << header_->service;
count = (count > size() || count==0) ? size() : count; count = (count > size() || count==0) ? size() : count;
// TODO, utilise recv directly here... // TODO, utilise recv directly here...
...@@ -376,9 +399,9 @@ void Socket::handshake1() { ...@@ -376,9 +399,9 @@ void Socket::handshake1() {
void Socket::handshake2() { void Socket::handshake2() {
LOG(INFO) << "Handshake finalised for " << uri_; LOG(INFO) << "Handshake finalised for " << uri_;
_connected(); _connected();
} }*/
void Socket::_dispatchReturn(const std::string &d) { void Peer::_dispatchReturn(const std::string &d) {
auto unpacked = msgpack::unpack(d.data(), d.size()); auto unpacked = msgpack::unpack(d.data(), d.size());
Dispatcher::response_t the_result; Dispatcher::response_t the_result;
unpacked.get().convert(the_result); unpacked.get().convert(the_result);
...@@ -406,32 +429,34 @@ void Socket::_dispatchReturn(const std::string &d) { ...@@ -406,32 +429,34 @@ void Socket::_dispatchReturn(const std::string &d) {
} }
} }
void Socket::onConnect(std::function<void(Socket&)> &f) { void Peer::onConnect(std::function<void()> &f) {
if (connected_) { if (status_ == kConnected) {
f(*this); f();
} else { } else {
connect_handlers_.push_back(f); open_handlers_.push_back(f);
} }
} }
void Socket::_connected() { void Peer::_connected() {
connected_ = true; status_ = kConnected;
for (auto h : connect_handlers_) { for (auto h : open_handlers_) {
h(*this); h();
} }
//connect_handlers_.clear(); //connect_handlers_.clear();
} }
int Socket::_send() { int Peer::_send() {
// Are we using a websocket? // Are we using a websocket?
if (scheme_ == ftl::URI::SCHEME_WS) { if (scheme_ == ftl::URI::SCHEME_WS) {
// Create a websocket header as well. // Create a websocket header as well.
size_t len = 0; size_t len = 0;
const iovec *sendvec = send_buf_.vector();
size_t size = send_buf_.vector_size();
char buf[20]; // TODO(nick) Should not be a stack buffer. char buf[20]; // TODO(nick) Should not be a stack buffer.
// Calculate total size of message // Calculate total size of message
for (auto v : send_vec_) { for (size_t i=0; i < size; i++) {
len += v.iov_len; len += sendvec[i].iov_len;
} }
// Pack correct websocket header into buffer // Pack correct websocket header into buffer
...@@ -439,8 +464,8 @@ int Socket::_send() { ...@@ -439,8 +464,8 @@ int Socket::_send() {
if (rc == -1) return -1; if (rc == -1) return -1;
// Patch the first io vector to be ws header // Patch the first io vector to be ws header
send_vec_[0].iov_base = buf; const_cast<iovec*>(&sendvec[0])->iov_base = buf;
send_vec_[0].iov_len = rc; const_cast<iovec*>(&sendvec[0])->iov_len = rc;
} }
#ifdef WIN32 #ifdef WIN32
...@@ -450,19 +475,13 @@ int Socket::_send() { ...@@ -450,19 +475,13 @@ int Socket::_send() {
c += ftl::net::internal::send(sock_, (char*)v.iov_base, v.iov_len, 0); c += ftl::net::internal::send(sock_, (char*)v.iov_base, v.iov_len, 0);
} }
#else #else
int c = ftl::net::internal::writev(sock_, send_vec_.data(), send_vec_.size()); int c = ftl::net::internal::writev(sock_, send_buf_.vector(), send_buf_.vector_size());
#endif #endif
send_vec_.clear(); send_buf_.clear();
return c; return c;
} }
Socket::~Socket() { Peer::~Peer() {
close(); close();
// Delete socket buffer
if (buffer_) delete [] buffer_;
buffer_ = NULL;
if (buffer_w_) delete [] buffer_w_;
buffer_w_ = NULL;
} }
### Protocol Unit ##############################################################
add_executable(protocol_unit
./tests.cpp
./protocol_unit.cpp
)
target_link_libraries(protocol_unit glog::glog)
### Socket Unit ################################################################ ### Socket Unit ################################################################
add_executable(socket_unit add_executable(peer_unit
./tests.cpp ./tests.cpp
../src/ws_internal.cpp ../src/ws_internal.cpp
./socket_unit.cpp ../src/dispatcher.cpp
./peer_unit.cpp
../../../common/cpp/src/config.cpp
) )
target_link_libraries(socket_unit target_link_libraries(peer_unit
${URIPARSER_LIBRARIES} ${URIPARSER_LIBRARIES}
glog::glog) glog::glog
${UUID_LIBRARIES})
### URI ######################################################################## ### URI ########################################################################
add_executable(uri_unit add_executable(uri_unit
...@@ -25,15 +21,15 @@ target_link_libraries(uri_unit ...@@ -25,15 +21,15 @@ target_link_libraries(uri_unit
### P2P Base Unit ############################################################## ### P2P Base Unit ##############################################################
# TODO(nick) Actually make this a unit test # TODO(nick) Actually make this a unit test
add_executable(p2p_base_unit #add_executable(p2p_base_unit
./tests.cpp # ./tests.cpp
./p2p_base_unit.cpp) # ./p2p_base_unit.cpp)
add_dependencies(p2p_base_unit ftlnet) #add_dependencies(p2p_base_unit ftlnet)
target_link_libraries(p2p_base_unit #target_link_libraries(p2p_base_unit
ftlnet # ftlnet
${URIPARSER_LIBRARIES} # ${URIPARSER_LIBRARIES}
glog::glog # glog::glog
${UUID_LIBRARIES}) # ${UUID_LIBRARIES})
### Net Integration ############################################################ ### Net Integration ############################################################
add_executable(net_integration add_executable(net_integration
...@@ -50,10 +46,10 @@ target_link_libraries(net_integration ...@@ -50,10 +46,10 @@ target_link_libraries(net_integration
add_test(URIUnitTest uri_unit) add_test(URIUnitTest uri_unit)
add_test(ProtocolUnitTest protocol_unit) #add_test(ProtocolUnitTest protocol_unit)
add_test(SocketUnitTest socket_unit) add_test(PeerUnitTest peer_unit)
add_test(NetIntegrationTest net_integration) add_test(NetIntegrationTest net_integration)
add_custom_target(tests) add_custom_target(tests)
add_dependencies(tests socket_unit protocol_unit net_integration uri_unit) add_dependencies(tests peer_unit net_integration uri_unit)
#include "catch.hpp" #include "catch.hpp"
#include <iostream> #include <iostream>
#include <memory> #include <memory>
#include <map> //#include <map>
#include <tuple>
#include <ftl/net/peer.hpp>
#include <ftl/net/protocol.hpp> #include <ftl/net/protocol.hpp>
#include <ftl/net/socket.hpp> #include <ftl/config.h>
/* Allow socket functions to be mocked */ /* Allow socket functions to be mocked */
#define TEST_MOCKS #define TEST_MOCKS
#include "../src/net_internal.hpp" #include "../src/net_internal.hpp"
using ftl::net::Socket; using std::tuple;
using std::get;
using ftl::net::Peer;
#ifdef WIN32 #ifdef WIN32
#pragma comment(lib, "Ws2_32.lib") #pragma comment(lib, "Ws2_32.lib")
...@@ -18,50 +22,16 @@ using ftl::net::Socket; ...@@ -18,50 +22,16 @@ using ftl::net::Socket;
// --- Mock -------------------------------------------------------------------- // --- Mock --------------------------------------------------------------------
class MockSocket : public Socket { class MockPeer : public Peer {
public: public:
MockSocket() : Socket(0) {} MockPeer() : Peer(0) {}
void mock_data() { data(); } void mock_data() { data(); }
}; };
static std::string last_rpc;
void ftl::net::Protocol::dispatchRPC(Socket &s, const std::string &d) {
last_rpc = d;
// This should send a return value
}
void ftl::net::Protocol::dispatchRaw(uint32_t service, Socket &s) {
}
ftl::net::Protocol::Protocol(const std::string &id) {
}
ftl::net::Protocol::~Protocol() {
}
ftl::net::Protocol *ftl::net::Protocol::find(const std::string &p) {
return NULL;
}
// --- Support ----------------------------------------------------------------- // --- Support -----------------------------------------------------------------
static std::map<int, std::string> fakedata; static std::map<int, std::string> fakedata;
void fake_send(int sd, uint32_t service, const std::string &data) {
//std::cout << "HEX SEND: " << hexStr(data) << std::endl;
char buf[8+1024];
assert(data.size() < 1024);
ftl::net::Header *h = (ftl::net::Header*)&buf;
h->size = data.size()+4;
h->service = service;
std::memcpy(&buf[8],data.data(),data.size());
fakedata[sd] = std::string(&buf[0], 8+data.size());
//std::cout << "HEX SEND2: " << hexStr(fakedata[sd]) << std::endl;
}
#ifdef WIN32 #ifdef WIN32
int ftl::net::internal::recv(SOCKET sd, char *buf, int n, int f) { int ftl::net::internal::recv(SOCKET sd, char *buf, int n, int f) {
#else #else
...@@ -103,26 +73,6 @@ ssize_t ftl::net::internal::writev(int sd, const struct iovec *v, int cnt) { ...@@ -103,26 +73,6 @@ ssize_t ftl::net::internal::writev(int sd, const struct iovec *v, int cnt) {
} }
#endif #endif
uint32_t get_service(int sd) {
auto h = (ftl::net::Header*)fakedata[sd].data();
return h->service;
}
size_t get_size(int sd) {
auto h = (ftl::net::Header*)fakedata[sd].data();
return h->size-4;
}
template <typename T>
T get_value(int sd) {
auto h = (T*)(fakedata[sd].data()+sizeof(ftl::net::Header));
return *h;
}
template <>
std::string get_value(int sd) {
return std::string((char*)(fakedata[sd].data()+sizeof(ftl::net::Header)),get_size(sd));
}
static std::function<void()> waithandler; static std::function<void()> waithandler;
...@@ -136,16 +86,89 @@ bool wait() { ...@@ -136,16 +86,89 @@ bool wait() {
}; };
}; };
/*void fake_send(int sd, uint32_t service, ARGS) {
//std::cout << "HEX SEND: " << hexStr(data) << std::endl;
char buf[8+1024];
assert(data.size() < 1024);
ftl::net::Header *h = (ftl::net::Header*)&buf;
h->size = data.size()+4;
h->service = service;
std::memcpy(&buf[8],data.data(),data.size());
fakedata[sd] = std::string(&buf[0], 8+data.size());
//std::cout << "HEX SEND2: " << hexStr(fakedata[sd]) << std::endl;
}*/
void send_handshake(Peer &p) {
ftl::UUID id;
p.send("__handshake__", ftl::net::kMagic, ((8 << 16) + (5 << 8) + 2), id);
}
template <typename T>
tuple<std::string, T> readResponse(int s) {
msgpack::object_handle msg = msgpack::unpack(fakedata[s].data(), fakedata[s].size());
tuple<uint8_t, std::string, T> req;
msg.get().convert(req);
return std::make_tuple(get<1>(req), get<2>(req));
}
// --- Files to test ----------------------------------------------------------- // --- Files to test -----------------------------------------------------------
#include "../src/socket.cpp" #include "../src/peer.cpp"
// --- Tests ------------------------------------------------------------------- // --- Tests -------------------------------------------------------------------
using ftl::net::Protocol; TEST_CASE("Peer(int)", "[]") {
SECTION("initiates a valid handshake") {
MockPeer s;
TEST_CASE("Socket::call()", "[rpc]") { auto [name, hs] = readResponse<ftl::net::Handshake>(0);
MockSocket s;
REQUIRE( name == "__handshake__" );
// 1) Sends magic (64bits)
REQUIRE( get<0>(hs) == ftl::net::kMagic );
// 2) Sends FTL Version
REQUIRE( get<1>(hs) == (FTL_VERSION_MAJOR << 16) + (FTL_VERSION_MINOR << 8) + FTL_VERSION_PATCH );
// 3) Sends peer UUID
REQUIRE( s.status() == Peer::kConnecting );
}
SECTION("completes on full handshake") {
MockPeer s;
// Send handshake response
send_handshake(s);
s.mock_data();
REQUIRE( s.status() == Peer::kConnected );
}
SECTION("has correct version on full handshake") {
MockPeer s;
// Send handshake response
send_handshake(s);
s.mock_data();
REQUIRE( s.getFTLVersion() == (8 << 16) + (5 << 8) + 2 );
}
SECTION("has correct peer id on full handshake") {
MockPeer s;
// Send handshake response
//REQUIRE( s.id() == );
}
}
/*TEST_CASE("Peer::call()", "[rpc]") {
MockPeer s;
SECTION("no argument call") { SECTION("no argument call") {
waithandler = [&]() { waithandler = [&]() {
...@@ -186,42 +209,59 @@ TEST_CASE("Socket::call()", "[rpc]") { ...@@ -186,42 +209,59 @@ TEST_CASE("Socket::call()", "[rpc]") {
} }
waithandler = nullptr; waithandler = nullptr;
} }*/
TEST_CASE("Socket receive RPC", "[rpc]") { TEST_CASE("Peer::bind()", "[rpc]") {
MockSocket s; MockPeer s;
auto p = new Protocol("ftl://utu.fi");
s.setProtocol(p);
SECTION("no argument call") { SECTION("no argument call") {
// Do a fake send bool done = false;
auto args_obj = std::make_tuple();
auto call_obj = std::make_tuple(0,0,"test1",args_obj); s.bind("hello", [&]() {
std::stringstream buf; done = true;
msgpack::pack(buf, call_obj); });
fake_send(0, FTL_PROTOCOL_RPC, buf.str()); send_handshake(s);
s.mock_data();
s.send("hello");
s.mock_data(); // Force it to read the fake send... s.mock_data(); // Force it to read the fake send...
REQUIRE( (last_rpc == buf.str()) ); REQUIRE( done );
} }
SECTION("one argument call") { SECTION("one argument call") {
// Do a fake send int done = 0;
auto args_obj = std::make_tuple(55);
auto call_obj = std::make_tuple(0,0,"test2",args_obj); s.bind("hello", [&](int a) {
std::stringstream buf; done = a;
msgpack::pack(buf, call_obj); });
send_handshake(s);
s.mock_data();
s.send("hello", 55);
s.mock_data(); // Force it to read the fake send...
fake_send(0, FTL_PROTOCOL_RPC, buf.str()); REQUIRE( (done == 55) );
}
SECTION("two argument call") {
std::string done;
s.bind("hello", [&](int a, std::string b) {
done = b;
});
send_handshake(s);
s.mock_data();
s.send("hello", 55, "world");
s.mock_data(); // Force it to read the fake send... s.mock_data(); // Force it to read the fake send...
REQUIRE( (last_rpc == buf.str()) ); REQUIRE( (done == "world") );
} }
} }
TEST_CASE("Socket::operator>>()", "[io]") { /*TEST_CASE("Socket::operator>>()", "[io]") {
MockSocket s; MockPeer s;
SECTION("stream ints") { SECTION("stream ints") {
int i[2]; int i[2];
...@@ -238,21 +278,22 @@ TEST_CASE("Socket::operator>>()", "[io]") { ...@@ -238,21 +278,22 @@ TEST_CASE("Socket::operator>>()", "[io]") {
REQUIRE( (i[0] == 99) ); REQUIRE( (i[0] == 99) );
REQUIRE( (i[1] == 101) ); REQUIRE( (i[1] == 101) );
} }
} }*/
TEST_CASE("Socket::send()", "[io]") { TEST_CASE("Socket::send()", "[io]") {
MockSocket s; MockPeer s;
SECTION("send an int") { SECTION("send an int") {
int i = 607; int i = 607;
s.send(100,i); s.send("dummy",i);
REQUIRE( (get_service(0) == 100) ); auto [name, value] = readResponse<tuple<int>>(0);
REQUIRE( (get_size(0) == sizeof(int)) );
REQUIRE( (get_value<int>(0) == 607) ); REQUIRE( (name == "dummy") );
REQUIRE( (get<0>(value) == 607) );
} }
SECTION("send a string") { /*SECTION("send a string") {
std::string str("hello world"); std::string str("hello world");
s.send(100,str); s.send(100,str);
...@@ -294,10 +335,10 @@ TEST_CASE("Socket::send()", "[io]") { ...@@ -294,10 +335,10 @@ TEST_CASE("Socket::send()", "[io]") {
REQUIRE( (get_service(0) == 100) ); REQUIRE( (get_service(0) == 100) );
REQUIRE( (get_size(0) == str.size()+str2.size()) ); REQUIRE( (get_size(0) == str.size()+str2.size()) );
REQUIRE( (get_value<std::string>(0) == "hello world") ); REQUIRE( (get_value<std::string>(0) == "hello world") );
} }*/
} }
TEST_CASE("Socket::read()", "[io]") { /*TEST_CASE("Socket::read()", "[io]") {
MockSocket s; MockSocket s;
SECTION("read an int") { SECTION("read an int") {
...@@ -380,5 +421,5 @@ TEST_CASE("Socket::read()", "[io]") { ...@@ -380,5 +421,5 @@ TEST_CASE("Socket::read()", "[io]") {
REQUIRE( (s.read(&i,2) == sizeof(int)) ); REQUIRE( (s.read(&i,2) == sizeof(int)) );
REQUIRE( (i == 99) ); REQUIRE( (i == 99) );
} }
} }*/
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment