Newer
Older
#define GLOG_NO_ABBREVIATED_SEVERITIES
#include <ftl/net/peer.hpp>
using ftl::net::Peer;
using ftl::net::Dispatcher;
using std::vector;
using std::string;
/*static std::string hexStr(const std::string &s)
{
const char *data = s.data();
int len = s.size();
std::stringstream ss;
ss << std::hex;
for(int i=0;i<len;++i)
ss << std::setw(2) << std::setfill('0') << (int)data[i];
return ss.str();
}*/
//void ftl::net::Dispatcher::dispatch(Peer &s, const std::string &msg) {
//std::cout << "Received dispatch : " << hexStr(msg) << std::endl;
// auto unpacked = msgpack::unpack(msg.data(), msg.size());
// dispatch(s, unpacked.get());
//}
vector<string> Dispatcher::getBindings() const {
vector<string> res;
for (auto x : funcs_) {
res.push_back(x.first);
}
return res;
}
void ftl::net::Dispatcher::dispatch(Peer &s, const msgpack::object &msg) {
dispatch_notification(s, msg); break;
dispatch_call(s, msg); break;
LOG(ERROR) << "Unrecognised msgpack : " << msg.via.array.size;
void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) {
try {
msg.convert(the_call);
} catch(...) {
LOG(ERROR) << "Bad message format";
return;
}
// TODO: proper validation of protocol (and responding to it)
auto &&type = std::get<0>(the_call);
auto &&id = std::get<1>(the_call);
auto &&name = std::get<2>(the_call);
auto &&args = std::get<3>(the_call);
// assert(type == 0);
if (type == 1) {
LOG(INFO) << "RPC return for " << id;
s._dispatchResponse(id, args);
} else if (type == 0) {
LOG(INFO) << "RPC " << name << "() <- " << s.getURI();
auto it_func = funcs_.find(name);
if (it_func != end(funcs_)) {
try {
auto result = (it_func->second)(args); //->get();
s._sendResponse(id, result->get());
/*response_t res_obj = std::make_tuple(1,id,msgpack::object(),result->get());
std::stringstream buf;
msgpack::pack(buf, res_obj);
s.send("__return__", buf.str());*/
} catch (const std::exception &e) {
//throw;
//LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")";
/*response_t res_obj = std::make_tuple(1,id,msgpack::object(e.what()),msgpack::object());
std::stringstream buf;
msgpack::pack(buf, res_obj);
s.send("__return__", buf.str());*/
} catch (int e) {
//throw;
//LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")";
/*response_t res_obj = std::make_tuple(1,id,msgpack::object(e),msgpack::object());
std::stringstream buf;
msgpack::pack(buf, res_obj);
s.send("__return__", buf.str());*/
}
} else {
// TODO(nick) Some error
}
void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const &msg) {
notification_t the_call;
msg.convert(the_call);
// TODO: proper validation of protocol (and responding to it)
// auto &&type = std::get<0>(the_call);
// assert(type == static_cast<uint8_t>(request_type::notification));
auto &&name = std::get<1>(the_call);
auto &&args = std::get<2>(the_call);
LOG(INFO) << "NOTIFICATION " << name << "() <- " << s.getURI();
auto it_func = funcs_.find(name);
if (it_func != end(funcs_)) {
try {
auto result = (it_func->second)(args);
} catch (int e) {
throw e;
} else {
LOG(ERROR) << "Missing handler for incoming message";
}
}
void ftl::net::Dispatcher::enforce_arg_count(std::string const &func, std::size_t found,
std::size_t expected) {
if (found != expected) {
LOG(FATAL) << "RPC argument missmatch - " << found << " != " << expected;
throw -1;
}
}
void ftl::net::Dispatcher::enforce_unique_name(std::string const &func) {
auto pos = funcs_.find(func);
if (pos != end(funcs_)) {