Skip to content
Snippets Groups Projects
Commit e8d65b54 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Functioning and tested RPC bind and call

parent e6a94820
No related branches found
No related tags found
No related merge requests found
...@@ -33,9 +33,11 @@ namespace internal { ...@@ -33,9 +33,11 @@ namespace internal {
} }
namespace net { namespace net {
class Socket;
class Dispatcher { class Dispatcher {
public: public:
Dispatcher(Socket *s) : sock_(s) {}
void dispatch(const std::string &msg); void dispatch(const std::string &msg);
...@@ -115,8 +117,12 @@ class Dispatcher { ...@@ -115,8 +117,12 @@ class Dispatcher {
//! \brief This is the type of notification messages. //! \brief This is the type of notification messages.
using notification_t = std::tuple<int8_t, std::string, msgpack::object>; using notification_t = std::tuple<int8_t, std::string, msgpack::object>;
using response_t =
std::tuple<uint32_t, uint32_t, msgpack::object, msgpack::object>;
private: private:
ftl::net::Socket *sock_;
std::unordered_map<std::string, adaptor_type> funcs_; std::unordered_map<std::string, adaptor_type> funcs_;
static void enforce_arg_count(std::string const &func, std::size_t found, static void enforce_arg_count(std::string const &func, std::size_t found,
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#define _FTL_NET_PROTOCOL_HPP_ #define _FTL_NET_PROTOCOL_HPP_
#define FTL_PROTOCOL_RPC 0x0100 #define FTL_PROTOCOL_RPC 0x0100
#define FTL_PROTOCOL_RPCRETURN 0x0101
#define FTL_PROTOCOL_P2P 0x1000 #define FTL_PROTOCOL_P2P 0x1000
#endif // _FTL_NET_PROTOCOL_HPP_ #endif // _FTL_NET_PROTOCOL_HPP_
...@@ -21,6 +21,11 @@ ...@@ -21,6 +21,11 @@
namespace ftl { namespace ftl {
namespace net { namespace net {
struct Header {
uint32_t size;
uint32_t service;
};
class Socket { class Socket {
public: public:
Socket(const char *uri); Socket(const char *uri);
...@@ -49,13 +54,13 @@ class Socket { ...@@ -49,13 +54,13 @@ class Socket {
typename ftl::internal::func_kind_info<F>::args_kind()); typename ftl::internal::func_kind_info<F>::args_kind());
} }
template <typename... ARGS> template <typename T, typename... ARGS>
msgpack::object_handle call(const std::string &name, ARGS... args) { T call(const std::string &name, ARGS... args) {
bool hasreturned = false; bool hasreturned = false;
msgpack::object_handle result; T result;
async_call(name, [result,hasreturned](msgpack::object_handle r) { async_call(name, [&result,&hasreturned](msgpack::object &r) {
hasreturned = true; hasreturned = true;
result = r; result = r.as<T>();
}, std::forward<ARGS>(args)...); }, std::forward<ARGS>(args)...);
// Loop the network // Loop the network
...@@ -71,7 +76,7 @@ class Socket { ...@@ -71,7 +76,7 @@ class Socket {
template <typename... ARGS> template <typename... ARGS>
void async_call( void async_call(
const std::string &name, const std::string &name,
std::function<void(msgpack::object_handle)> cb, std::function<void(msgpack::object&)> cb,
ARGS... args) { ARGS... args) {
auto args_obj = std::make_tuple(args...); auto args_obj = std::make_tuple(args...);
auto rpcid = rpcid__++; auto rpcid = rpcid__++;
...@@ -107,7 +112,7 @@ class Socket { ...@@ -107,7 +112,7 @@ class Socket {
char *m_buffer; char *m_buffer;
sockdatahandler_t m_handler; sockdatahandler_t m_handler;
bool m_valid; bool m_valid;
std::map<int, std::function<void(msgpack::object_handle)>> callbacks_; std::map<int, std::function<void(msgpack::object&)>> callbacks_;
ftl::net::Dispatcher disp_; ftl::net::Dispatcher disp_;
static int rpcid__; static int rpcid__;
......
#include <ftl/net/dispatcher.hpp> #include <ftl/net/dispatcher.hpp>
#include <ftl/net/socket.hpp>
#include <iostream>
/*static std::string hexStr(const std::string &s)
{
const char *data = s.data();
int len = s.size();
std::stringstream ss;
ss << std::hex;
for(int i=0;i<len;++i)
ss << std::setw(2) << std::setfill('0') << (int)data[i];
return ss.str();
}*/
void ftl::net::Dispatcher::dispatch(const std::string &msg) { void ftl::net::Dispatcher::dispatch(const std::string &msg) {
//std::cout << "Received dispatch : " << hexStr(msg) << std::endl;
auto unpacked = msgpack::unpack(msg.data(), msg.size()); auto unpacked = msgpack::unpack(msg.data(), msg.size());
dispatch(unpacked.get()); dispatch(unpacked.get());
} }
...@@ -8,10 +22,11 @@ void ftl::net::Dispatcher::dispatch(const std::string &msg) { ...@@ -8,10 +22,11 @@ void ftl::net::Dispatcher::dispatch(const std::string &msg) {
void ftl::net::Dispatcher::dispatch(const msgpack::object &msg) { void ftl::net::Dispatcher::dispatch(const msgpack::object &msg) {
switch (msg.via.array.size) { switch (msg.via.array.size) {
case 3: case 3:
dispatch_notification(msg); dispatch_notification(msg); break;
case 4: case 4:
dispatch_call(msg); dispatch_call(msg); break;
default: default:
std::cout << "Unrecognised msgpack : " << msg.via.array.size << std::endl;
return; return;
} }
} }
...@@ -24,7 +39,7 @@ void ftl::net::Dispatcher::dispatch_call(const msgpack::object &msg) { ...@@ -24,7 +39,7 @@ void ftl::net::Dispatcher::dispatch_call(const msgpack::object &msg) {
// auto &&type = std::get<0>(the_call); // auto &&type = std::get<0>(the_call);
// assert(type == 0); // assert(type == 0);
// auto &&id = std::get<1>(the_call); auto &&id = std::get<1>(the_call);
auto &&name = std::get<2>(the_call); auto &&name = std::get<2>(the_call);
auto &&args = std::get<3>(the_call); auto &&args = std::get<3>(the_call);
...@@ -32,8 +47,12 @@ void ftl::net::Dispatcher::dispatch_call(const msgpack::object &msg) { ...@@ -32,8 +47,12 @@ void ftl::net::Dispatcher::dispatch_call(const msgpack::object &msg) {
if (it_func != end(funcs_)) { if (it_func != end(funcs_)) {
try { try {
auto result = (it_func->second)(args); auto result = (it_func->second)(args)->get();
// TODO SEND RESULTS auto res_obj = std::make_tuple(1,id,msgpack::object(),result);
std::stringstream buf;
msgpack::pack(buf, res_obj);
sock_->send(FTL_PROTOCOL_RPCRETURN, buf.str());
} catch (...) { } catch (...) {
throw; throw;
} }
...@@ -50,6 +69,8 @@ void ftl::net::Dispatcher::dispatch_notification(msgpack::object const &msg) { ...@@ -50,6 +69,8 @@ void ftl::net::Dispatcher::dispatch_notification(msgpack::object const &msg) {
auto &&name = std::get<1>(the_call); auto &&name = std::get<1>(the_call);
auto &&args = std::get<2>(the_call); auto &&args = std::get<2>(the_call);
std::cout << "RPC NOTIFY" << name << std::endl;
auto it_func = funcs_.find(name); auto it_func = funcs_.find(name);
......
...@@ -25,6 +25,8 @@ using namespace ftl; ...@@ -25,6 +25,8 @@ using namespace ftl;
using ftl::net::Socket; using ftl::net::Socket;
using namespace std; using namespace std;
int Socket::rpcid__ = 0;
static int tcpConnect(URI &uri) { static int tcpConnect(URI &uri) {
int rc; int rc;
sockaddr_in destAddr; sockaddr_in destAddr;
...@@ -105,13 +107,13 @@ static int wsConnect(URI &uri) { ...@@ -105,13 +107,13 @@ static int wsConnect(URI &uri) {
return 1; return 1;
} }
Socket::Socket(int s) : m_sock(s), m_pos(0) { Socket::Socket(int s) : m_sock(s), m_pos(0), disp_(this) {
// TODO Get the remote address. // TODO Get the remote address.
m_valid = true; m_valid = true;
m_buffer = new char[BUFFER_SIZE]; m_buffer = new char[BUFFER_SIZE];
} }
Socket::Socket(const char *pUri) : m_uri(pUri), m_pos(0) { Socket::Socket(const char *pUri) : m_uri(pUri), m_pos(0), disp_(this) {
// Allocate buffer // Allocate buffer
m_buffer = new char[BUFFER_SIZE]; m_buffer = new char[BUFFER_SIZE];
...@@ -169,6 +171,7 @@ bool Socket::data() { ...@@ -169,6 +171,7 @@ bool Socket::data() {
while (m_pos < len+4) { while (m_pos < len+4) {
if (len > MAX_MESSAGE) { if (len > MAX_MESSAGE) {
close(); close();
std::cout << "Length is too big" << std::endl;
return false; // Prevent DoS return false; // Prevent DoS
} }
...@@ -185,8 +188,10 @@ bool Socket::data() { ...@@ -185,8 +188,10 @@ bool Socket::data() {
} }
} else if (rc == EWOULDBLOCK || rc == 0) { } else if (rc == EWOULDBLOCK || rc == 0) {
// Data not yet available // Data not yet available
std::cout << "No data to read" << std::endl;
return false; return false;
} else { } else {
std::cout << "Socket error" << std::endl;
// Close socket due to error // Close socket due to error
close(); close();
return false; return false;
...@@ -194,18 +199,55 @@ bool Socket::data() { ...@@ -194,18 +199,55 @@ bool Socket::data() {
} }
// All data available // All data available
if (m_handler) { //if (m_handler) {
uint32_t service = ((uint32_t*)m_buffer)[1]; uint32_t service = ((uint32_t*)m_buffer)[1];
auto d = std::string(m_buffer+8, len-4); auto d = std::string(m_buffer+8, len-4);
//std::cerr << "DATA : " << service << " -> " << d << std::endl; //std::cerr << "DATA : " << service << " -> " << d << std::endl;
m_handler(service, d);
} if (service == FTL_PROTOCOL_RPC) {
dispatch(d);
} else if (service == FTL_PROTOCOL_RPCRETURN) {
auto unpacked = msgpack::unpack(d.data(), d.size());
Dispatcher::response_t the_result;
unpacked.get().convert(the_result);
// TODO: proper validation of protocol (and responding to it)
// auto &&type = std::get<0>(the_call);
// assert(type == 0);
// auto &&id = std::get<1>(the_call);
auto &&id = std::get<1>(the_result);
//auto &&err = std::get<2>(the_result);
auto &&res = std::get<3>(the_result);
if (callbacks_.count(id) > 0) callbacks_[id](res);
else std::cout << "NO CALLBACK FOUND FOR RPC RESULT" << std::endl;
} else {
if (m_handler) m_handler(service, d);
}
//}
m_pos = 0; m_pos = 0;
return true; return true;
} }
int Socket::send(uint32_t service, const std::string &data) {
ftl::net::Header h;
h.size = data.size()+4;
h.service = service;
iovec vec[2];
vec[0].iov_base = &h;
vec[0].iov_len = sizeof(h);
vec[1].iov_base = const_cast<char*>(data.data());
vec[1].iov_len = data.size();
::writev(m_sock, &vec[0], 2);
return 0;
}
Socket::~Socket() { Socket::~Socket() {
close(); close();
......
add_executable(tests EXCLUDE_FROM_ALL include(CTest)
enable_testing()
add_executable(rpc_test EXCLUDE_FROM_ALL
./tests.cpp
./rpc.cpp
../src/dispatcher.cpp
../src/socket.cpp
)
target_include_directories(rpc_test PUBLIC ${PROJECT_SOURCE_DIR}/include)
target_link_libraries(rpc_test uriparser)
add_executable(socket_test EXCLUDE_FROM_ALL
./tests.cpp ./tests.cpp
./net_raw.cpp ./net_raw.cpp
../src/net.cpp ../src/net.cpp
...@@ -7,9 +19,11 @@ add_executable(tests EXCLUDE_FROM_ALL ...@@ -7,9 +19,11 @@ add_executable(tests EXCLUDE_FROM_ALL
./ice.cpp ./ice.cpp
../src/ice.cpp ../src/ice.cpp
./uri.cpp ./uri.cpp
./rpc.cpp
../src/dispatcher.cpp ../src/dispatcher.cpp
) )
target_include_directories(tests PUBLIC ${PROJECT_SOURCE_DIR}/include) target_include_directories(socket_test PUBLIC ${PROJECT_SOURCE_DIR}/include)
target_link_libraries(tests uriparser) target_link_libraries(socket_test uriparser)
add_custom_target(tests)
add_dependencies(tests rpc_test socket_test)
#include "catch.hpp" #include "catch.hpp"
#include <ftl/net/socket.hpp> #include <ftl/net/socket.hpp>
#include <iostream> #include <iostream>
#include <memory>
#include <map>
/*struct FakeHeader {
uint32_t size;
uint32_t service;
};*/
static std::map<int, std::string> fakedata;
/*static std::string hexStr(const std::string &s)
{
const char *data = s.data();
int len = s.size();
std::stringstream ss;
ss << std::hex;
for(int i=0;i<len;++i)
ss << std::setw(2) << std::setfill('0') << (int)data[i];
return ss.str();
}*/
void fake_send(int sd, uint32_t service, const std::string &data) {
//std::cout << "HEX SEND: " << hexStr(data) << std::endl;
char buf[8+data.size()];
ftl::net::Header *h = (ftl::net::Header*)&buf;
h->size = data.size()+4;
h->service = service;
std::memcpy(&buf[8],data.data(),data.size());
fakedata[sd] = std::string(&buf[0], 8+data.size());
//std::cout << "HEX SEND2: " << hexStr(fakedata[sd]) << std::endl;
}
extern ssize_t recv(int sd, void *buf, size_t n, int f) {
if (fakedata.count(sd) == 0) {
std::cout << "Unrecognised socket" << std::endl;
return 0;
}
int l = fakedata[sd].size();
std::memcpy(buf, fakedata[sd].c_str(), l);
fakedata.erase(sd);
return l;
}
extern ssize_t writev(int sd, const struct iovec *v, int cnt) {
size_t len = v[0].iov_len+v[1].iov_len;
char buf[len];
std::memcpy(&buf[0],v[0].iov_base,v[0].iov_len);
std::memcpy(&buf[v[0].iov_len],v[1].iov_base,v[1].iov_len);
fakedata[sd] = std::string(&buf[0], len);
return 0;
}
static std::function<void()> waithandler;
bool ftl::net::wait() {
if (waithandler) waithandler();
//waithandler = nullptr;
return true;
}
TEST_CASE("Socket::bind()", "[rpc]") { TEST_CASE("Socket::bind()", "[rpc]") {
SECTION("no argument bind") { SECTION("no argument bind fake dispatch") {
auto s = new ftl::net::Socket(0); auto s = new ftl::net::Socket(0);
bool called = false; bool called = false;
...@@ -20,7 +82,7 @@ TEST_CASE("Socket::bind()", "[rpc]") { ...@@ -20,7 +82,7 @@ TEST_CASE("Socket::bind()", "[rpc]") {
REQUIRE( called ); REQUIRE( called );
} }
SECTION("one argument bind") { SECTION("one argument bind fake dispatch") {
auto s = new ftl::net::Socket(0); auto s = new ftl::net::Socket(0);
bool called = false; bool called = false;
...@@ -38,7 +100,7 @@ TEST_CASE("Socket::bind()", "[rpc]") { ...@@ -38,7 +100,7 @@ TEST_CASE("Socket::bind()", "[rpc]") {
REQUIRE( called ); REQUIRE( called );
} }
SECTION("two argument bind") { SECTION("two argument bind fake dispatch") {
auto s = new ftl::net::Socket(0); auto s = new ftl::net::Socket(0);
bool called = false; bool called = false;
...@@ -56,5 +118,108 @@ TEST_CASE("Socket::bind()", "[rpc]") { ...@@ -56,5 +118,108 @@ TEST_CASE("Socket::bind()", "[rpc]") {
s->dispatch(buf.str()); s->dispatch(buf.str());
REQUIRE( called ); REQUIRE( called );
} }
SECTION("no argument bind fake data") {
auto s = new ftl::net::Socket(0);
bool called = false;
s->bind("test1", [&]() {
called = true;
});
auto args_obj = std::make_tuple();
auto call_obj = std::make_tuple(0,0,"test1",args_obj);
std::stringstream buf;
msgpack::pack(buf, call_obj);
fake_send(0, FTL_PROTOCOL_RPC, buf.str());
REQUIRE( s->data() );
REQUIRE( called );
}
SECTION("non-void bind fake data") {
auto s = new ftl::net::Socket(0);
bool called = false;
s->bind("test1", [&]() -> int {
called = true;
return 55;
});
auto args_obj = std::make_tuple();
auto call_obj = std::make_tuple(0,0,"test1",args_obj);
std::stringstream buf;
msgpack::pack(buf, call_obj);
fake_send(0, FTL_PROTOCOL_RPC, buf.str());
REQUIRE( s->data() );
REQUIRE( called );
// TODO Require that a writev occurred with result value
}
}
TEST_CASE("Socket::call()", "[rpc]") {
SECTION("no argument call") {
auto s = new ftl::net::Socket(0);
waithandler = [&]() {
// Read fakedata sent
// TODO Validate data
// Do a fake send
auto res_obj = std::make_tuple(1,0,msgpack::object(),66);
std::stringstream buf;
msgpack::pack(buf, res_obj);
fake_send(0, FTL_PROTOCOL_RPCRETURN, buf.str());
s->data();
};
int res = s->call<int>("test1");
REQUIRE( res == 66 );
}
SECTION("one argument call") {
auto s = new ftl::net::Socket(0);
waithandler = [&]() {
// Read fakedata sent
// TODO Validate data
// Do a fake send
auto res_obj = std::make_tuple(1,1,msgpack::object(),43);
std::stringstream buf;
msgpack::pack(buf, res_obj);
fake_send(0, FTL_PROTOCOL_RPCRETURN, buf.str());
s->data();
};
int res = s->call<int>("test1", 78);
REQUIRE( res == 43 );
}
waithandler = nullptr;
}
TEST_CASE("Socket::call+bind loop", "[rpc]") {
auto s = new ftl::net::Socket(0);
// Just loop the send back to the recv
waithandler = [&]() {
s->data();
};
s->bind("test1", [](int a) -> int {
std::cout << "Bind test1 called" << std::endl;
return a*2;
});
int res = s->call<int>("test1", 5);
REQUIRE( res == 10 );
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment