Skip to content
Snippets Groups Projects
Commit 9c99eed5 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Support virtual caller to allow return type extraction

parent 2e607bb6
No related branches found
No related tags found
No related merge requests found
......@@ -15,7 +15,7 @@ set(ftl_VERSION_MAJOR "1")
set(ftl_VERSION_MINOR "0")
set(ftl_VERSION_PATCH "0")
set(CMAKE_CXX_FLAGS "-pthread -fopenmp -std=c++14 -Wall -Wno-deprecated -Werror -Wno-psabi")
set(CMAKE_CXX_FLAGS "-pthread -fopenmp -std=c++17 -Wall -Wno-deprecated -Werror -Wno-psabi")
set(CMAKE_CXX_FLAGS_DEBUG "-D_DEBUG -pg -Wall -Werror")
set(CMAKE_CXX_FLAGS_RELEASE "-O3")
......
......@@ -14,6 +14,9 @@ namespace internal {
template<typename T>
using invoke = typename T::type;
template <typename T, typename... ARGS>
struct first_type { typedef T type; };
template<typename T, T I>
struct constant : std::integral_constant<T, I> {};
......
......@@ -18,10 +18,26 @@
#endif
#include <sstream>
#include <type_traits>
namespace ftl {
namespace net {
struct virtual_caller {
virtual void operator()(msgpack::object &o)=0;
};
template <typename T>
struct caller : virtual_caller {
caller(std::function<void(const T&)> f) : f_(f) {};
void operator()(msgpack::object &o) { T r = o.as<T>(); f_(r); };
std::function<void(const T&)> f_;
};
/**
* A single socket connection object, to be constructed using the connect()
* function and not to be created directly.
*/
class Socket {
public:
Socket(const char *uri);
......@@ -30,82 +46,62 @@ class Socket {
int close();
int send(uint32_t service, const std::string &data);
int send(uint32_t service, std::stringstream &data) {
return send(service, data.str()); };
int send(uint32_t service, void *data, int length);
int send2(uint32_t service, const std::string &data1,
const std::string &data2);
//friend bool ftl::net::run(bool);
/**
* Get the internal OS dependent socket.
*/
int _socket() const { return sock_; };
bool isConnected() const { return sock_ != INVALID_SOCKET && connected_; };
bool isValid() const { return valid_ && sock_ != INVALID_SOCKET; };
/**
* Get the sockets protocol, address and port as a url string. This will be
* the same as the initial connection string on the client.
*/
std::string getURI() const { return uri_; };
/**
* Bind a function to a RPC call name.
* Bind a function to an RPC call name.
*/
template <typename F>
void bind(const std::string &name, F func) {
disp_.bind(name, func,
typename ftl::internal::func_kind_info<F>::result_kind(),
typename ftl::internal::func_kind_info<F>::args_kind());
}
void bind(const std::string &name, F func);
/**
* Bind a function to a raw message type.
*/
void bind(uint32_t service, std::function<void(Socket&,
const std::string&)> func);
/**
* Remote Procedure Call.
* Non-blocking Remote Procedure Call using a callback function.
*/
template <typename T, typename... ARGS>
T call(const std::string &name, ARGS... args) {
bool hasreturned = false;
T result;
async_call(name, [&result,&hasreturned](msgpack::object &r) {
hasreturned = true;
result = r.as<T>();
}, std::forward<ARGS>(args)...);
// Loop the network
int limit = 10;
while (limit > 0 && !hasreturned) {
limit--;
ftl::net::wait();
}
return result;
}
void asyncCall(const std::string &name,
std::function<void(const T&)> cb,
ARGS... args);
// TODO use "call" instead of "acall" causes compiler to loop.
template <typename... ARGS>
void async_call(
const std::string &name,
std::function<void(msgpack::object&)> cb,
ARGS... args) {
auto args_obj = std::make_tuple(args...);
auto rpcid = rpcid__++;
auto call_obj = std::make_tuple(0,rpcid,name,args_obj);
LOG(INFO) << "RPC " << name << "() -> " << uri_;
std::stringstream buf;
msgpack::pack(buf, call_obj);
// Register the CB
callbacks_[rpcid] = cb;
send(FTL_PROTOCOL_RPC, buf.str());
}
/**
* Blocking Remote Procedure Call.
*/
template <typename R, typename... ARGS>
R call(const std::string &name, ARGS... args);
/**
* Send data to given service number.
*/
int send(uint32_t service, const std::string &data);
/**
* Internal handlers for specific event types.
* Send with two distinct data source. Avoids the need to memcpy them to a
* single buffer.
*/
int send2(uint32_t service, const std::string &data1,
const std::string &data2);
/**
* Internal handlers for specific event types. This should be private but
* is current here for testing purposes.
* @{
*/
void dispatchRPC(const std::string &d) { disp_.dispatch(d); }
......@@ -120,23 +116,27 @@ class Socket {
bool data();
void error();
private: // Functions
void _connected();
void _updateURI();
private:
std::string uri_;
int sock_;
size_t pos_;
char *buffer_;
std::map<uint32_t,std::function<void(Socket&,const std::string&)>> handlers_;
std::vector<std::function<void(Socket&)>> connect_handlers_;
private: // Data
bool valid_;
bool connected_;
std::map<int, std::function<void(msgpack::object&)>> callbacks_;
ftl::net::Dispatcher disp_;
uint32_t version_;
int sock_;
size_t pos_;
char *buffer_;
std::string uri_;
std::string peerid_;
void _connected();
void _updateURI();
ftl::net::Dispatcher disp_;
std::map<uint32_t,std::function<void(Socket&,const std::string&)>> handlers_;
std::vector<std::function<void(Socket&)>> connect_handlers_;
std::map<int, std::unique_ptr<virtual_caller>> callbacks_;
static int rpcid__;
......@@ -144,6 +144,55 @@ class Socket {
static const int BUFFER_SIZE = MAX_MESSAGE + 16;
};
// --- Inline Template Implementations -----------------------------------------
template <typename F>
void Socket::bind(const std::string &name, F func) {
disp_.bind(name, func,
typename ftl::internal::func_kind_info<F>::result_kind(),
typename ftl::internal::func_kind_info<F>::args_kind());
}
//template <typename T, typename... ARGS>
template <typename R, typename... ARGS>
R Socket::call(const std::string &name, ARGS... args) {
bool hasreturned = false;
R result;
asyncCall<R>(name, [&result,&hasreturned](const R &r) {
hasreturned = true;
result = r;
}, std::forward<ARGS>(args)...);
// Loop the network
int limit = 10;
while (limit > 0 && !hasreturned) {
limit--;
ftl::net::wait();
}
return result;
}
template <typename T, typename... ARGS>
void Socket::asyncCall(
const std::string &name,
std::function<void(const T&)> cb,
ARGS... args) {
auto args_obj = std::make_tuple(args...);
auto rpcid = rpcid__++;
auto call_obj = std::make_tuple(0,rpcid,name,args_obj);
LOG(INFO) << "RPC " << name << "() -> " << uri_;
std::stringstream buf;
msgpack::pack(buf, call_obj);
// Register the CB
callbacks_[rpcid] = std::make_unique<caller<T>>(cb);
send(FTL_PROTOCOL_RPC, buf.str());
}
};
};
......
......@@ -128,7 +128,7 @@ Socket::Socket(int s) : sock_(s), pos_(0), disp_(this) {
_updateURI();
}
Socket::Socket(const char *pUri) : uri_(pUri), pos_(0), disp_(this) {
Socket::Socket(const char *pUri) : pos_(0), uri_(pUri), disp_(this) {
// Allocate buffer
buffer_ = new char[BUFFER_SIZE];
......@@ -340,7 +340,7 @@ void Socket::dispatchReturn(const std::string &d) {
if (callbacks_.count(id) > 0) {
LOG(INFO) << "Received return RPC value";
callbacks_[id](res);
(*callbacks_[id])(res);
callbacks_.erase(id);
} else {
LOG(ERROR) << "Missing RPC callback for result";
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment