Skip to content
Snippets Groups Projects
Commit 3bcdd4e9 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Refactor net to include protocol object and more unit like unit tests

parent 9c99eed5
No related branches found
No related tags found
No related merge requests found
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
#include <memory> #include <memory>
#include <tuple> #include <tuple>
#include <functional> #include <functional>
#include <iostream> //#include <iostream>
namespace ftl { namespace ftl {
...@@ -38,9 +38,9 @@ class Socket; ...@@ -38,9 +38,9 @@ class Socket;
class Dispatcher { class Dispatcher {
public: public:
Dispatcher(Socket *s) : sock_(s) {} Dispatcher() {}
void dispatch(const std::string &msg); void dispatch(Socket &, const std::string &msg);
template <typename F> template <typename F>
void bind(std::string const &name, F func, void bind(std::string const &name, F func,
...@@ -123,7 +123,6 @@ class Dispatcher { ...@@ -123,7 +123,6 @@ class Dispatcher {
std::tuple<uint32_t, uint32_t, msgpack::object, msgpack::object>; std::tuple<uint32_t, uint32_t, msgpack::object, msgpack::object>;
private: private:
ftl::net::Socket *sock_;
std::unordered_map<std::string, adaptor_type> funcs_; std::unordered_map<std::string, adaptor_type> funcs_;
static void enforce_arg_count(std::string const &func, std::size_t found, static void enforce_arg_count(std::string const &func, std::size_t found,
...@@ -131,9 +130,9 @@ class Dispatcher { ...@@ -131,9 +130,9 @@ class Dispatcher {
void enforce_unique_name(std::string const &func); void enforce_unique_name(std::string const &func);
void dispatch(const msgpack::object &msg); void dispatch(Socket &, const msgpack::object &msg);
void dispatch_call(const msgpack::object &msg); void dispatch_call(Socket &, const msgpack::object &msg);
void dispatch_notification(msgpack::object const &msg); void dispatch_notification(Socket &, msgpack::object const &msg);
}; };
} }
......
...@@ -14,9 +14,6 @@ namespace internal { ...@@ -14,9 +14,6 @@ namespace internal {
template<typename T> template<typename T>
using invoke = typename T::type; using invoke = typename T::type;
template <typename T, typename... ARGS>
struct first_type { typedef T type; };
template<typename T, T I> template<typename T, T I>
struct constant : std::integral_constant<T, I> {}; struct constant : std::integral_constant<T, I> {};
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
namespace ftl { namespace ftl {
namespace net { namespace net {
class Protocol;
class Listener { class Listener {
public: public:
Listener(const char *uri); Listener(const char *uri);
...@@ -27,11 +29,14 @@ class Listener { ...@@ -27,11 +29,14 @@ class Listener {
void close(); void close();
int _socket() { return descriptor_; } int _socket() { return descriptor_; }
void setProtocol(Protocol *p) { default_proto_ = p; }
void connection(std::shared_ptr<Socket> &s); void connection(std::shared_ptr<Socket> &s);
void onConnection(connecthandler_t h) { handler_connect_.push_back(h); }; void onConnection(connecthandler_t h) { handler_connect_.push_back(h); };
private: private:
int descriptor_; int descriptor_;
Protocol *default_proto_;
sockaddr_in slocalAddr; sockaddr_in slocalAddr;
std::vector<connecthandler_t> handler_connect_; std::vector<connecthandler_t> handler_connect_;
}; };
......
#ifndef _FTL_NET_PROTOCOL_HPP_ #ifndef _FTL_NET_PROTOCOL_HPP_
#define _FTL_NET_PROTOCOL_HPP_ #define _FTL_NET_PROTOCOL_HPP_
#include <ftl/net/func_traits.hpp>
#include <ftl/net/dispatcher.hpp>
#include <map>
#include <string>
#define FTL_PROTOCOL_HS1 0x0001 // Handshake step 1 #define FTL_PROTOCOL_HS1 0x0001 // Handshake step 1
#define FTL_PROTOCOL_HS2 0x0002 // Handshake step 2 #define FTL_PROTOCOL_HS2 0x0002 // Handshake step 2
#define FTL_PROTOCOL_RPC 0x0100 #define FTL_PROTOCOL_RPC 0x0100
#define FTL_PROTOCOL_RPCRETURN 0x0101 #define FTL_PROTOCOL_RPCRETURN 0x0101
#define FTL_PROTOCOL_P2P 0x1000 #define FTL_PROTOCOL_FREE 0x1000 // Custom protocols above this
namespace ftl { namespace ftl {
namespace net { namespace net {
static const uint32_t MAGIC = 0x23995621; class Reader;
class Socket;
static const uint8_t PATCH = 0;
static const uint8_t MINOR = 0;
static const uint8_t MAJOR = 1;
inline uint32_t version(int maj, int min, int pat) {
return (maj << 16) | (min << 8) | pat;
}
inline uint32_t version() {
return version(MAJOR, MINOR, PATCH);
}
#pragma pack(push,1) #pragma pack(push,1)
...@@ -34,14 +28,69 @@ struct Header { ...@@ -34,14 +28,69 @@ struct Header {
}; };
struct Handshake { struct Handshake {
uint32_t magic; uint64_t proto; // The protocol the other party is expected to use.
uint32_t version; char peerid[16]; // GUID for the origin peer.
char peerid[16]; char reserved_[32]; // RESERVED, must be 0.
}; };
#pragma pack(pop) #pragma pack(pop)
/**
* Each instance of this Protocol class represents a specific protocol. A
* protocol is a set of RPC bindings and raw message handlers. A protocol is
* identified, selected and validated using an automatically generated hash of
* all its supported bindings. The choice of protocol for a socket is made
* during the initial connection handshake.
*/
class Protocol {
public:
friend class Socket;
public:
Protocol(uint64_t id);
~Protocol();
/**
* Bind a function to an RPC call name.
*/
template <typename F>
void bind(const std::string &name, F func);
/**
* Bind a function to a raw message type.
*/
void bind(int service, std::function<void(uint32_t,Socket&)> func);
// broadcast?
uint64_t id() const { return id_; }
static Protocol *find(uint64_t id);
protected:
void dispatchRPC(Socket &, const std::string &d);
void dispatchReturn(Socket &, const std::string &d);
void dispatchRaw(uint32_t service, Socket &);
void addSocket(std::shared_ptr<Socket> s);
void removeSocket(const Socket &s);
private:
ftl::net::Dispatcher disp_;
std::map<uint32_t,std::function<void(uint32_t,Socket&)>> handlers_;
uint64_t id_;
static std::map<uint64_t,Protocol*> protocols__;
};
// --- Template Implementations ------------------------------------------------
template <typename F>
void Protocol::bind(const std::string &name, F func) {
disp_.bind(name, func,
typename ftl::internal::func_kind_info<F>::result_kind(),
typename ftl::internal::func_kind_info<F>::args_kind());
}
}; };
}; };
......
...@@ -3,8 +3,6 @@ ...@@ -3,8 +3,6 @@
#include <glog/logging.h> #include <glog/logging.h>
#include <ftl/net.hpp> #include <ftl/net.hpp>
#include <ftl/net/handlers.hpp>
#include <ftl/net/dispatcher.hpp>
#include <ftl/net/protocol.hpp> #include <ftl/net/protocol.hpp>
#ifndef WIN32 #ifndef WIN32
...@@ -20,6 +18,8 @@ ...@@ -20,6 +18,8 @@
#include <sstream> #include <sstream>
#include <type_traits> #include <type_traits>
extern bool _run(bool blocking, bool nodelay);
namespace ftl { namespace ftl {
namespace net { namespace net {
...@@ -39,12 +39,17 @@ struct caller : virtual_caller { ...@@ -39,12 +39,17 @@ struct caller : virtual_caller {
* function and not to be created directly. * function and not to be created directly.
*/ */
class Socket { class Socket {
public:
friend bool ::_run(bool blocking, bool nodelay);
public: public:
Socket(const char *uri); Socket(const char *uri);
Socket(int s); Socket(int s);
~Socket(); ~Socket();
int close(); int close();
void setProtocol(Protocol *p);
Protocol *protocol() const { return proto_; }
/** /**
* Get the internal OS dependent socket. * Get the internal OS dependent socket.
...@@ -59,18 +64,6 @@ class Socket { ...@@ -59,18 +64,6 @@ class Socket {
* the same as the initial connection string on the client. * the same as the initial connection string on the client.
*/ */
std::string getURI() const { return uri_; }; std::string getURI() const { return uri_; };
/**
* Bind a function to an RPC call name.
*/
template <typename F>
void bind(const std::string &name, F func);
/**
* Bind a function to a raw message type.
*/
void bind(uint32_t service, std::function<void(Socket&,
const std::string&)> func);
/** /**
* Non-blocking Remote Procedure Call using a callback function. * Non-blocking Remote Procedure Call using a callback function.
...@@ -99,42 +92,60 @@ class Socket { ...@@ -99,42 +92,60 @@ class Socket {
int send2(uint32_t service, const std::string &data1, int send2(uint32_t service, const std::string &data1,
const std::string &data2); const std::string &data2);
template <typename T>
int read(T *b, size_t count=1) {
static_assert(std::is_trivial<T>::value);
return read((char*)b, sizeof(T)*count);
}
//template <>
int read(char *b, size_t count);
int read(std::string &s, size_t count=0);
template <typename T>
int read(T &b) {
return read(&b);
}
size_t size() const { return header_->size-4; }
/** /**
* Internal handlers for specific event types. This should be private but * Internal handlers for specific event types. This should be private but
* is current here for testing purposes. * is current here for testing purposes.
* @{ * @{
*/ */
void dispatchRPC(const std::string &d) { disp_.dispatch(d); }
void dispatchReturn(const std::string &d);
void handshake1(const std::string &d); void handshake1(const std::string &d);
void handshake2(const std::string &d); void handshake2(const std::string &d);
/** @} */ /** @} */
void onError(sockerrorhandler_t handler) {} //void onError(sockerrorhandler_t handler) {}
void onConnect(std::function<void(Socket&)> f); void onConnect(std::function<void(Socket&)> f);
void onDisconnect(sockdisconnecthandler_t handler) {} //void onDisconnect(sockdisconnecthandler_t handler) {}
bool data(); protected:
void error(); bool data(); // Process one message from socket
void error(); // Process one error from socket
private: // Functions private: // Functions
void _connected(); void _connected();
void _updateURI(); void _updateURI();
void _dispatchReturn(const std::string &d);
private: // Data private: // Data
bool valid_; bool valid_;
bool connected_; bool connected_;
uint32_t version_;
int sock_; int sock_;
size_t pos_; size_t pos_;
size_t gpos_;
char *buffer_; char *buffer_;
ftl::net::Header *header_;
char *data_;
std::string uri_; std::string uri_;
std::string peerid_; std::string peerid_;
Protocol *proto_;
ftl::net::Dispatcher disp_;
std::map<uint32_t,std::function<void(Socket&,const std::string&)>> handlers_;
std::vector<std::function<void(Socket&)>> connect_handlers_; std::vector<std::function<void(Socket&)>> connect_handlers_;
std::map<int, std::unique_ptr<virtual_caller>> callbacks_; std::map<int, std::unique_ptr<virtual_caller>> callbacks_;
...@@ -146,13 +157,6 @@ class Socket { ...@@ -146,13 +157,6 @@ class Socket {
// --- Inline Template Implementations ----------------------------------------- // --- Inline Template Implementations -----------------------------------------
template <typename F>
void Socket::bind(const std::string &name, F func) {
disp_.bind(name, func,
typename ftl::internal::func_kind_info<F>::result_kind(),
typename ftl::internal::func_kind_info<F>::args_kind());
}
//template <typename T, typename... ARGS> //template <typename T, typename... ARGS>
template <typename R, typename... ARGS> template <typename R, typename... ARGS>
R Socket::call(const std::string &name, ARGS... args) { R Socket::call(const std::string &name, ARGS... args) {
......
...@@ -3,6 +3,8 @@ ...@@ -3,6 +3,8 @@
#include <ftl/net/socket.hpp> #include <ftl/net/socket.hpp>
#include <iostream> #include <iostream>
using ftl::net::Socket;
/*static std::string hexStr(const std::string &s) /*static std::string hexStr(const std::string &s)
{ {
const char *data = s.data(); const char *data = s.data();
...@@ -14,25 +16,25 @@ ...@@ -14,25 +16,25 @@
return ss.str(); return ss.str();
}*/ }*/
void ftl::net::Dispatcher::dispatch(const std::string &msg) { void ftl::net::Dispatcher::dispatch(Socket &s, const std::string &msg) {
//std::cout << "Received dispatch : " << hexStr(msg) << std::endl; //std::cout << "Received dispatch : " << hexStr(msg) << std::endl;
auto unpacked = msgpack::unpack(msg.data(), msg.size()); auto unpacked = msgpack::unpack(msg.data(), msg.size());
dispatch(unpacked.get()); dispatch(s, unpacked.get());
} }
void ftl::net::Dispatcher::dispatch(const msgpack::object &msg) { void ftl::net::Dispatcher::dispatch(Socket &s, const msgpack::object &msg) {
switch (msg.via.array.size) { switch (msg.via.array.size) {
case 3: case 3:
dispatch_notification(msg); break; dispatch_notification(s, msg); break;
case 4: case 4:
dispatch_call(msg); break; dispatch_call(s, msg); break;
default: default:
LOG(ERROR) << "Unrecognised msgpack : " << msg.via.array.size; LOG(ERROR) << "Unrecognised msgpack : " << msg.via.array.size;
return; return;
} }
} }
void ftl::net::Dispatcher::dispatch_call(const msgpack::object &msg) { void ftl::net::Dispatcher::dispatch_call(Socket &s, const msgpack::object &msg) {
call_t the_call; call_t the_call;
msg.convert(the_call); msg.convert(the_call);
...@@ -44,7 +46,7 @@ void ftl::net::Dispatcher::dispatch_call(const msgpack::object &msg) { ...@@ -44,7 +46,7 @@ void ftl::net::Dispatcher::dispatch_call(const msgpack::object &msg) {
auto &&name = std::get<2>(the_call); auto &&name = std::get<2>(the_call);
auto &&args = std::get<3>(the_call); auto &&args = std::get<3>(the_call);
LOG(INFO) << "RPC " << name << "() <- " << sock_->getURI(); LOG(INFO) << "RPC " << name << "() <- " << s.getURI();
auto it_func = funcs_.find(name); auto it_func = funcs_.find(name);
...@@ -57,14 +59,14 @@ void ftl::net::Dispatcher::dispatch_call(const msgpack::object &msg) { ...@@ -57,14 +59,14 @@ void ftl::net::Dispatcher::dispatch_call(const msgpack::object &msg) {
//std::cout << " RESULT " << result.as<std::string>() << std::endl; //std::cout << " RESULT " << result.as<std::string>() << std::endl;
sock_->send(FTL_PROTOCOL_RPCRETURN, buf.str()); s.send(FTL_PROTOCOL_RPCRETURN, buf.str());
} catch (...) { } catch (...) {
throw; throw;
} }
} }
} }
void ftl::net::Dispatcher::dispatch_notification(msgpack::object const &msg) { void ftl::net::Dispatcher::dispatch_notification(Socket &s, msgpack::object const &msg) {
notification_t the_call; notification_t the_call;
msg.convert(the_call); msg.convert(the_call);
......
...@@ -111,11 +111,9 @@ Listener::~Listener() { ...@@ -111,11 +111,9 @@ Listener::~Listener() {
} }
void Listener::connection(shared_ptr<Socket> &s) { void Listener::connection(shared_ptr<Socket> &s) {
ftl::net::Handshake hs1; if (default_proto_) {
hs1.magic = ftl::net::MAGIC; s->setProtocol(default_proto_);
hs1.version = ftl::net::version(); }
s->send(FTL_PROTOCOL_HS1, std::string((char*)&hs1, sizeof(hs1)));
LOG(INFO) << "Handshake initiated with " << s->getURI();
for (auto h : handler_connect_) h(s); for (auto h : handler_connect_) h(s);
} }
......
#include <glog/logging.h>
#include <ftl/net/socket.hpp>
#include <ftl/net/protocol.hpp>
#include <functional>
using ftl::net::Socket;
using ftl::net::Protocol;
std::map<uint64_t,Protocol*> Protocol::protocols__;
Protocol *Protocol::find(uint64_t id) {
if (protocols__.count(id) > 0) return protocols__[id];
else return NULL;
}
Protocol::Protocol(uint64_t id) : id_(id) {
protocols__[id] = this;
}
Protocol::~Protocol() {
protocols__.erase(id_);
// TODO Make sure all dependent sockets are closed!
}
void Protocol::bind(int service, std::function<void(uint32_t,Socket&)> func) {
if (handlers_.count(service) == 0) {
handlers_[service] = func;
} else {
LOG(ERROR) << "Message service " << service << " already bound";
}
}
void Protocol::dispatchRPC(Socket &s, const std::string &d) {
disp_.dispatch(s,d);
}
void Protocol::dispatchRaw(uint32_t service, Socket &s) {
// Lookup raw message handler
if (handlers_.count(service) > 0) {
handlers_[service](service, s);
} else {
LOG(ERROR) << "Unrecognised service request (" << service << ") from " << s.getURI();
}
}
...@@ -120,17 +120,21 @@ static int wsConnect(URI &uri) { ...@@ -120,17 +120,21 @@ static int wsConnect(URI &uri) {
return 1; return 1;
} }
Socket::Socket(int s) : sock_(s), pos_(0), disp_(this) { Socket::Socket(int s) : sock_(s), pos_(0), proto_(nullptr) {
valid_ = true; valid_ = true;
buffer_ = new char[BUFFER_SIZE]; buffer_ = new char[BUFFER_SIZE];
header_ = (Header*)buffer_;
data_ = buffer_+sizeof(Header);
connected_ = false; connected_ = false;
_updateURI(); _updateURI();
} }
Socket::Socket(const char *pUri) : pos_(0), uri_(pUri), disp_(this) { Socket::Socket(const char *pUri) : pos_(0), uri_(pUri), proto_(nullptr) {
// Allocate buffer // Allocate buffer
buffer_ = new char[BUFFER_SIZE]; buffer_ = new char[BUFFER_SIZE];
header_ = (Header*)buffer_;
data_ = buffer_+sizeof(Header);
URI uri(pUri); URI uri(pUri);
...@@ -191,6 +195,17 @@ int Socket::close() { ...@@ -191,6 +195,17 @@ int Socket::close() {
return 0; return 0;
} }
void Socket::setProtocol(Protocol *p) {
if (proto_ == p) return;
if (proto_ && proto_->id() == p->id()) return;
proto_ = p;
ftl::net::Handshake hs1;
hs1.proto = p->id();
send(FTL_PROTOCOL_HS1, std::string((char*)&hs1, sizeof(hs1)));
LOG(INFO) << "Handshake initiated with " << uri_;
}
void Socket::error() { void Socket::error() {
int err; int err;
uint32_t optlen = sizeof(err); uint32_t optlen = sizeof(err);
...@@ -249,27 +264,40 @@ bool Socket::data() { ...@@ -249,27 +264,40 @@ bool Socket::data() {
auto d = std::string(buffer_+8, len-4); auto d = std::string(buffer_+8, len-4);
pos_ = 0; // DODGY, processing messages inside handlers is dangerous. pos_ = 0; // DODGY, processing messages inside handlers is dangerous.
gpos_ = 0;
if (service == FTL_PROTOCOL_HS1 && !connected_) { if (service == FTL_PROTOCOL_HS1 && !connected_) {
handshake1(d); handshake1(d);
} else if (service == FTL_PROTOCOL_HS2 && !connected_) { } else if (service == FTL_PROTOCOL_HS2 && !connected_) {
handshake2(d); handshake2(d);
} else if (service == FTL_PROTOCOL_RPC) { } else if (service == FTL_PROTOCOL_RPC) {
dispatchRPC(d); if (proto_) proto_->dispatchRPC(*this, d);
else LOG(WARNING) << "No protocol set for socket " << uri_;
} else if (service == FTL_PROTOCOL_RPCRETURN) { } else if (service == FTL_PROTOCOL_RPCRETURN) {
dispatchReturn(d); _dispatchReturn(d);
} else { } else {
// Lookup raw message handler if (proto_) proto_->dispatchRaw(service, *this);
if (handlers_.count(service) > 0) { else LOG(WARNING) << "No protocol set for socket " << uri_;
handlers_[service](*this, d);
} else {
LOG(ERROR) << "Unrecognised service request (" << service << ") from " << uri_;
}
} }
return true; return true;
} }
int Socket::read(char *b, size_t count) {
if (count > size()) LOG(WARNING) << "Reading too much data for service " << header_->service;
count = (count > size() || count==0) ? size() : count;
// TODO, utilise recv directly here...
memcpy(b,data_+gpos_,count);
gpos_+=count;
return count;
}
int Socket::read(std::string &s, size_t count) {
count = (count > size() || count==0) ? size() : count;
s = std::string(data_+gpos_,count);
return count;
}
void Socket::handshake1(const std::string &d) { void Socket::handshake1(const std::string &d) {
ftl::net::Handshake *hs; ftl::net::Handshake *hs;
if (d.size() != sizeof(ftl::net::Handshake)) { if (d.size() != sizeof(ftl::net::Handshake)) {
...@@ -279,23 +307,22 @@ void Socket::handshake1(const std::string &d) { ...@@ -279,23 +307,22 @@ void Socket::handshake1(const std::string &d) {
} }
hs = (ftl::net::Handshake*)d.data(); hs = (ftl::net::Handshake*)d.data();
if (hs->magic != ftl::net::MAGIC) { auto proto = Protocol::find(hs->proto);
LOG(ERROR) << "Handshake magic failed for " << uri_; if (proto == NULL) {
LOG(ERROR) << "Protocol (" << hs->proto << ") not found during handshake for " << uri_;
close(); close();
return; return;
} else {
proto_ = proto;
} }
version_ = (hs->version > ftl::net::version()) ?
ftl::net::version() :
hs->version;
peerid_ = std::string(&hs->peerid[0],16); peerid_ = std::string(&hs->peerid[0],16);
ftl::net::Handshake hs2; ftl::net::Handshake hs2;
hs2.magic = ftl::net::MAGIC; //hs2.magic = ftl::net::MAGIC;
hs2.version = version_; //hs2.version = version_;
// TODO Set peerid; // TODO Set peerid;
send(FTL_PROTOCOL_HS2, std::string((char*)&hs2, sizeof(hs2))); send(FTL_PROTOCOL_HS2, std::string((char*)&hs2, sizeof(hs2)));
LOG(INFO) << "Handshake v" << version_ << " confirmed from " << uri_; LOG(INFO) << "Handshake" << " confirmed from " << uri_;
_connected(); _connected();
} }
...@@ -308,7 +335,7 @@ void Socket::handshake2(const std::string &d) { ...@@ -308,7 +335,7 @@ void Socket::handshake2(const std::string &d) {
} }
hs = (ftl::net::Handshake*)d.data(); hs = (ftl::net::Handshake*)d.data();
if (hs->magic != ftl::net::MAGIC) { /*if (hs->magic != ftl::net::MAGIC) {
LOG(ERROR) << "Handshake magic failed for " << uri_; LOG(ERROR) << "Handshake magic failed for " << uri_;
close(); close();
return; return;
...@@ -316,13 +343,13 @@ void Socket::handshake2(const std::string &d) { ...@@ -316,13 +343,13 @@ void Socket::handshake2(const std::string &d) {
version_ = (hs->version > ftl::net::version()) ? version_ = (hs->version > ftl::net::version()) ?
ftl::net::version() : ftl::net::version() :
hs->version; hs->version;*/
peerid_ = std::string(&hs->peerid[0],16); peerid_ = std::string(&hs->peerid[0],16);
LOG(INFO) << "Handshake finalised for " << uri_; LOG(INFO) << "Handshake finalised for " << uri_;
_connected(); _connected();
} }
void Socket::dispatchReturn(const std::string &d) { void Socket::_dispatchReturn(const std::string &d) {
auto unpacked = msgpack::unpack(d.data(), d.size()); auto unpacked = msgpack::unpack(d.data(), d.size());
Dispatcher::response_t the_result; Dispatcher::response_t the_result;
unpacked.get().convert(the_result); unpacked.get().convert(the_result);
...@@ -363,15 +390,6 @@ void Socket::_connected() { ...@@ -363,15 +390,6 @@ void Socket::_connected() {
//connect_handlers_.clear(); //connect_handlers_.clear();
} }
void Socket::bind(uint32_t service, std::function<void(Socket&,
const std::string&)> func) {
if (handlers_.count(service) == 0) {
handlers_[service] = func;
} else {
LOG(ERROR) << "Message service " << service << " already bound";
}
}
int Socket::send(uint32_t service, const std::string &data) { int Socket::send(uint32_t service, const std::string &data) {
ftl::net::Header h; ftl::net::Header h;
h.size = data.size()+4; h.size = data.size()+4;
......
...@@ -10,6 +10,32 @@ add_executable(rpc_test EXCLUDE_FROM_ALL ...@@ -10,6 +10,32 @@ add_executable(rpc_test EXCLUDE_FROM_ALL
target_include_directories(rpc_test PUBLIC ${PROJECT_SOURCE_DIR}/include) target_include_directories(rpc_test PUBLIC ${PROJECT_SOURCE_DIR}/include)
target_link_libraries(rpc_test uriparser glog) target_link_libraries(rpc_test uriparser glog)
add_executable(protocol_unit EXCLUDE_FROM_ALL
./tests.cpp
./protocol_unit.cpp
)
target_include_directories(protocol_unit PUBLIC ${PROJECT_SOURCE_DIR}/include)
target_link_libraries(protocol_unit glog)
add_executable(socket_unit EXCLUDE_FROM_ALL
./tests.cpp
./socket_unit.cpp
)
target_include_directories(socket_unit PUBLIC ${PROJECT_SOURCE_DIR}/include)
target_link_libraries(socket_unit uriparser glog)
add_executable(net_integration EXCLUDE_FROM_ALL
./tests.cpp
./net_integration.cpp
../src/socket.cpp
../src/dispatcher.cpp
../src/listener.cpp
../src/protocol.cpp
../src/net.cpp
)
target_include_directories(net_integration PUBLIC ${PROJECT_SOURCE_DIR}/include)
target_link_libraries(net_integration uriparser glog)
add_executable(socket_test EXCLUDE_FROM_ALL add_executable(socket_test EXCLUDE_FROM_ALL
./tests.cpp ./tests.cpp
./net_raw.cpp ./net_raw.cpp
...@@ -25,5 +51,5 @@ target_include_directories(socket_test PUBLIC ${PROJECT_SOURCE_DIR}/include) ...@@ -25,5 +51,5 @@ target_include_directories(socket_test PUBLIC ${PROJECT_SOURCE_DIR}/include)
target_link_libraries(socket_test uriparser glog) target_link_libraries(socket_test uriparser glog)
add_custom_target(tests) add_custom_target(tests)
add_dependencies(tests rpc_test socket_test) add_dependencies(tests rpc_test socket_test socket_unit protocol_unit net_integration)
#include "catch.hpp"
#include <ftl/net.hpp>
#include <ftl/net/socket.hpp>
#include <ftl/net/listener.hpp>
#include <memory>
using ftl::net::Socket;
using ftl::net::Protocol;
using std::shared_ptr;
TEST_CASE("Net Integration", "[integrate]") {
std::string data;
Protocol p(143);
p.bind("add", [](int a, int b) {
return a + b;
});
p.bind(100, [&data](uint32_t m, Socket &s) {
s.read(data);
});
auto l = ftl::net::listen("tcp://localhost:9000");
REQUIRE( l->isListening() );
l->setProtocol(&p);
shared_ptr<Socket> s1;
l->onConnection([&s1](auto &s) { s1 = s; });
shared_ptr<Socket> s2 = ftl::net::connect("tcp://localhost:9000");
ftl::net::wait(); // TODO, make setProtocol block until handshake complete
ftl::net::wait();
REQUIRE( s1 != nullptr );
REQUIRE( s2 != nullptr );
REQUIRE( s1->isConnected() );
REQUIRE( s2->isConnected() );
REQUIRE( s1->call<int>("add", 5, 6) == 11 );
REQUIRE( s2->call<int>("add", 10, 5) == 15);
s1->send(100, "hello world");
ftl::net::wait();
// TODO s2->wait(100);
REQUIRE( data == "hello world" );
}
...@@ -295,75 +295,3 @@ TEST_CASE("net::listen()", "[net]") { ...@@ -295,75 +295,3 @@ TEST_CASE("net::listen()", "[net]") {
} }
} }
TEST_CASE("Socket.bind(int)", "[net]") {
// Need a fake server...
init_server();
shared_ptr<Socket> sock = ftl::net::connect("tcp://127.0.0.1:7077");
REQUIRE(sock->isValid());
accept_connection();
ftl::net::wait(); // Wait for handshake
REQUIRE(sock->isConnected());
SECTION("small valid message") {
send_json(1, "{message: \"Hello\"}");
bool msg = false;
sock->bind(1, [&](Socket &s, const std::string &data) {
REQUIRE(data == "{message: \"Hello\"}");
msg = true;
});
ftl::net::wait();
REQUIRE(msg);
}
SECTION("empty message") {
send_json(1, "");
bool msg = false;
sock->bind(1, [&](Socket &s, const std::string &data) {
REQUIRE(data == "");
msg = true;
});
ftl::net::wait();
REQUIRE(msg);
}
SECTION("multiple valid messages") {
send_json(1, "{message: \"Hello\"}");
send_json(1, "{test: \"world\"}");
int msg = 0;
sock->bind(1, [&](Socket &s, const std::string &data) {
if (msg == 0) REQUIRE(data == "{message: \"Hello\"}");
else REQUIRE(data == "{test: \"world\"}");
msg++;
});
ftl::net::wait(); // MSG 1
ftl::net::wait(); // MSG 2
REQUIRE(msg == 2);
}
SECTION("disconnected does not get message") {
send_json(1, "world");
bool msg = false;
sock->bind(1, [&](Socket &s, const std::string &data) {
msg = true;
});
sock->close();
ftl::net::wait();
REQUIRE(!msg);
}
fin_server();
}
#include "catch.hpp"
#include <ftl/net/protocol.hpp>
using ftl::net::Protocol;
// --- Mock --------------------------------------------------------------------
#define _FTL_NET_SOCKET_HPP_ // Prevent include
namespace ftl {
namespace net {
class Socket {
public:
std::string getURI() { return "mock://"; }
int send(int msg, const std::string &d) { return 0; }
};
};
};
using ftl::net::Socket;
class MockProtocol : public Protocol {
public:
MockProtocol() : Protocol(33) {}
void mock_dispatchRPC(Socket &s, const std::string &d) { dispatchRPC(s,d); }
void mock_dispatchReturn(Socket &s, const std::string &d) { dispatchReturn(s,d); }
void mock_dispatchRaw(uint32_t msg, Socket &s) { dispatchRaw(msg,s); }
};
// --- Support -----------------------------------------------------------------
// --- Files to test -----------------------------------------------------------
#include "../src/protocol.cpp"
#include "../src/dispatcher.cpp"
// --- Tests -------------------------------------------------------------------
TEST_CASE("Protocol::bind(int,...)", "[proto]") {
MockProtocol p;
Socket s;
SECTION("a valid bind and dispatch") {
bool msg = false;
p.bind(5, [&](uint32_t m, Socket &s) {
msg = true;
});
p.mock_dispatchRaw(5, s);
REQUIRE(msg);
}
SECTION("an invalid dispatch") {
bool msg = false;
p.bind(5, [&](uint32_t m, Socket &s) {
msg = true;
});
p.mock_dispatchRaw(6, s);
REQUIRE( !msg );
// TODO How is failure reported?
}
}
TEST_CASE("Protocol::bind(string,...)", "[proto]") {
MockProtocol p;
Socket s;
SECTION("no argument bind with valid dispatch") {
bool called = false;
p.bind("test1", [&]() {
called = true;
});
auto args_obj = std::make_tuple();
auto call_obj = std::make_tuple(0,0,"test1",args_obj);
std::stringstream buf;
msgpack::pack(buf, call_obj);
p.mock_dispatchRPC(s, buf.str());
REQUIRE( called );
}
SECTION("multiple bindings") {
bool called1 = false;
bool called2 = false;
p.bind("test1", [&]() {
called1 = true;
});
p.bind("test2", [&]() {
called2 = true;
});
auto args_obj = std::make_tuple();
auto call_obj = std::make_tuple(0,0,"test2",args_obj);
std::stringstream buf;
msgpack::pack(buf, call_obj);
p.mock_dispatchRPC(s, buf.str());
REQUIRE( !called1 );
REQUIRE( called2 );
}
SECTION("one argument bind with valid dispatch") {
bool called = false;
p.bind("test1", [&](int a) {
called = true;
REQUIRE( a == 5 );
});
auto args_obj = std::make_tuple(5);
auto call_obj = std::make_tuple(0,0,"test1",args_obj);
std::stringstream buf;
msgpack::pack(buf, call_obj);
p.mock_dispatchRPC(s, buf.str());
REQUIRE( called );
}
SECTION("two argument bind fake dispatch") {
bool called = false;
p.bind("test1", [&](int a, float b) {
called = true;
REQUIRE( a == 5 );
REQUIRE( b == 5.4f ); // Danger
});
auto args_obj = std::make_tuple(5, 5.4f);
auto call_obj = std::make_tuple(0,0,"test1",args_obj);
std::stringstream buf;
msgpack::pack(buf, call_obj);
p.mock_dispatchRPC(s, buf.str());
REQUIRE( called );
}
SECTION("non-void bind no arguments") {
bool called = false;
p.bind("test1", [&]() -> int {
called = true;
return 55;
});
auto args_obj = std::make_tuple();
auto call_obj = std::make_tuple(0,0,"test1",args_obj);
std::stringstream buf;
msgpack::pack(buf, call_obj);
p.mock_dispatchRPC(s, buf.str());
REQUIRE( called );
// TODO Require that a writev occurred with result value
}
}
#include "catch.hpp"
#include <iostream>
#include <memory>
#include <map>
#include <ftl/net/protocol.hpp>
#include <ftl/net/socket.hpp>
using ftl::net::Socket;
// --- Mock --------------------------------------------------------------------
class MockSocket : public Socket {
public:
MockSocket() : Socket(0) {}
void mock_data() { data(); }
};
static std::string last_rpc;
void ftl::net::Protocol::dispatchRPC(Socket &s, const std::string &d) {
last_rpc = d;
// This should send a return value
}
void ftl::net::Protocol::dispatchRaw(uint32_t service, Socket &s) {
}
ftl::net::Protocol::Protocol(uint64_t id) {
}
ftl::net::Protocol::~Protocol() {
}
ftl::net::Protocol *ftl::net::Protocol::find(uint64_t p) {
return NULL;
}
// --- Support -----------------------------------------------------------------
static std::map<int, std::string> fakedata;
void fake_send(int sd, uint32_t service, const std::string &data) {
//std::cout << "HEX SEND: " << hexStr(data) << std::endl;
char buf[8+data.size()];
ftl::net::Header *h = (ftl::net::Header*)&buf;
h->size = data.size()+4;
h->service = service;
std::memcpy(&buf[8],data.data(),data.size());
fakedata[sd] = std::string(&buf[0], 8+data.size());
//std::cout << "HEX SEND2: " << hexStr(fakedata[sd]) << std::endl;
}
extern ssize_t recv(int sd, void *buf, size_t n, int f) {
if (fakedata.count(sd) == 0) {
std::cout << "Unrecognised socket" << std::endl;
return 0;
}
int l = fakedata[sd].size();
std::memcpy(buf, fakedata[sd].c_str(), l);
fakedata.erase(sd);
return l;
}
extern ssize_t writev(int sd, const struct iovec *v, int cnt) {
// TODO Use count incase more sources exist...
size_t len = v[0].iov_len+v[1].iov_len;
char buf[len];
std::memcpy(&buf[0],v[0].iov_base,v[0].iov_len);
std::memcpy(&buf[v[0].iov_len],v[1].iov_base,v[1].iov_len);
fakedata[sd] = std::string(&buf[0], len);
return 0;
}
static std::function<void()> waithandler;
namespace ftl {
namespace net {
bool wait() {
if (waithandler) waithandler();
//waithandler = nullptr;
return true;
}
};
};
// --- Files to test -----------------------------------------------------------
#include "../src/socket.cpp"
// --- Tests -------------------------------------------------------------------
using ftl::net::Protocol;
TEST_CASE("Socket::call()", "[rpc]") {
MockSocket s;
SECTION("no argument call") {
waithandler = [&]() {
// Read fakedata sent
// TODO Validate data
// Do a fake send
auto res_obj = std::make_tuple(1,0,msgpack::object(),66);
std::stringstream buf;
msgpack::pack(buf, res_obj);
fake_send(0, FTL_PROTOCOL_RPCRETURN, buf.str());
s.mock_data();
};
int res = s.call<int>("test1");
REQUIRE( res == 66 );
}
SECTION("one argument call") {
waithandler = [&]() {
// Read fakedata sent
// TODO Validate data
// Do a fake send
auto res_obj = std::make_tuple(1,1,msgpack::object(),43);
std::stringstream buf;
msgpack::pack(buf, res_obj);
fake_send(0, FTL_PROTOCOL_RPCRETURN, buf.str());
s.mock_data();
};
int res = s.call<int>("test1", 78);
REQUIRE( res == 43 );
}
waithandler = nullptr;
}
TEST_CASE("Socket receive RPC", "[rpc]") {
MockSocket s;
auto p = new Protocol(444);
s.setProtocol(p);
SECTION("no argument call") {
// Do a fake send
auto args_obj = std::make_tuple();
auto call_obj = std::make_tuple(0,0,"test1",args_obj);
std::stringstream buf;
msgpack::pack(buf, call_obj);
fake_send(0, FTL_PROTOCOL_RPC, buf.str());
s.mock_data(); // Force it to read the fake send...
REQUIRE( last_rpc == buf.str() );
}
SECTION("one argument call") {
// Do a fake send
auto args_obj = std::make_tuple(55);
auto call_obj = std::make_tuple(0,0,"test2",args_obj);
std::stringstream buf;
msgpack::pack(buf, call_obj);
fake_send(0, FTL_PROTOCOL_RPC, buf.str());
s.mock_data(); // Force it to read the fake send...
REQUIRE( last_rpc == buf.str() );
}
}
TEST_CASE("Socket::read()", "[io]") {
MockSocket s;
SECTION("read an int") {
int i = 99;
fake_send(0, 100, std::string((char*)&i,4));
i = 0;
s.mock_data(); // Force a message read, but no protocol...
REQUIRE( s.size() == sizeof(int) );
REQUIRE( s.read(i) == sizeof(int) );
REQUIRE( i == 99 );
}
SECTION("read two ints") {
int i[2];
i[0] = 99;
i[1] = 101;
fake_send(0, 100, std::string((char*)&i,2*sizeof(int)));
i[0] = 0;
i[1] = 0;
s.mock_data(); // Force a message read, but no protocol...
REQUIRE( s.size() == 2*sizeof(int) );
REQUIRE( s.read(&i,2) == 2*sizeof(int) );
REQUIRE( i[0] == 99 );
REQUIRE( i[1] == 101 );
}
SECTION("multiple reads") {
int i[2];
i[0] = 99;
i[1] = 101;
fake_send(0, 100, std::string((char*)&i,2*sizeof(int)));
i[0] = 0;
i[1] = 0;
s.mock_data(); // Force a message read, but no protocol...
REQUIRE( s.read(&i[0],1) == sizeof(int) );
REQUIRE( i[0] == 99 );
REQUIRE( s.read(&i[1],1) == sizeof(int) );
REQUIRE( i[1] == 101 );
}
SECTION("read a string") {
std::string str;
fake_send(0, 100, std::string("hello world"));
s.mock_data(); // Force a message read, but no protocol...
REQUIRE( s.size() == 11 );
REQUIRE( s.read(str) == 11 );
REQUIRE( str == "hello world" );
}
SECTION("read into existing string") {
std::string str;
str.reserve(11);
void *ptr = str.data();
fake_send(0, 100, std::string("hello world"));
s.mock_data(); // Force a message read, but no protocol...
REQUIRE( s.size() == 11 );
REQUIRE( s.read(str) == 11 );
REQUIRE( str == "hello world" );
REQUIRE( str.data() == ptr );
}
SECTION("read too much data") {
int i = 99;
fake_send(0, 100, std::string((char*)&i,4));
i = 0;
s.mock_data(); // Force a message read, but no protocol...
REQUIRE( s.size() == sizeof(int) );
REQUIRE( s.read(&i,2) == sizeof(int) );
REQUIRE( i == 99 );
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment