Skip to content
Snippets Groups Projects
Commit a19584e6 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Add dispatch chaining to allow peer specific and common bindings

parent 485d1bdb
Branches
Tags
No related merge requests found
Pipeline #9709 failed
......@@ -15,6 +15,7 @@
#include <vector>
#include <string>
#include <unordered_map>
#include <optional>
namespace ftl {
......@@ -46,7 +47,7 @@ class Peer;
class Dispatcher {
public:
Dispatcher() {}
explicit Dispatcher(Dispatcher *parent=nullptr) : parent_(parent) {}
//void dispatch(Peer &, const std::string &msg);
void dispatch(Peer &, const msgpack::object &msg);
......@@ -134,8 +135,11 @@ class Dispatcher {
std::tuple<uint32_t, uint32_t, msgpack::object, msgpack::object>;
private:
Dispatcher *parent_;
std::unordered_map<std::string, adaptor_type> funcs_;
std::optional<adaptor_type> _locateHandler(const std::string &name) const;
static void enforce_arg_count(std::string const &func, std::size_t found,
std::size_t expected);
......
......@@ -197,7 +197,6 @@ class Peer {
int sock_;
ftl::URI::scheme_t scheme_;
uint32_t version_;
bool destroy_disp_;
// Receive buffers
bool is_waiting_;
......
......@@ -8,6 +8,7 @@ using ftl::net::Peer;
using ftl::net::Dispatcher;
using std::vector;
using std::string;
using std::optional;
/*static std::string hexStr(const std::string &s)
{
......@@ -100,6 +101,19 @@ void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) {
}
}
optional<Dispatcher::adaptor_type> ftl::net::Dispatcher::_locateHandler(const std::string &name) const {
auto it_func = funcs_.find(name);
if (it_func == end(funcs_)) {
if (parent_ != nullptr) {
return parent_->_locateHandler(name);
} else {
return {};
}
} else {
return it_func->second;
}
}
void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const &msg) {
notification_t the_call;
msg.convert(the_call);
......@@ -113,11 +127,11 @@ void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const
LOG(INFO) << "NOTIFICATION " << name << "() <- " << s.getURI();
auto it_func = funcs_.find(name);
auto binding = _locateHandler(name);
if (it_func != end(funcs_)) {
if (binding) {
try {
auto result = (it_func->second)(args);
auto result = (*binding)(args);
} catch (int e) {
throw e;
}
......@@ -137,6 +151,7 @@ void ftl::net::Dispatcher::enforce_arg_count(std::string const &func, std::size_
void ftl::net::Dispatcher::enforce_unique_name(std::string const &func) {
auto pos = funcs_.find(func);
if (pos != end(funcs_)) {
LOG(ERROR) << "RPC non unique binding for " << func;
throw -1;
}
}
......
......@@ -130,13 +130,7 @@ Peer::Peer(int s, Dispatcher *d) : sock_(s) {
status_ = (s == INVALID_SOCKET) ? kInvalid : kConnecting;
_updateURI();
if (d != nullptr) {
disp_ = d;
destroy_disp_ = false;
} else {
disp_ = new Dispatcher();
destroy_disp_ = true;
}
disp_ = new Dispatcher(d);
is_waiting_ = true;
......@@ -166,13 +160,7 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) {
status_ = kInvalid;
sock_ = INVALID_SOCKET;
if (d != nullptr) {
disp_ = d;
destroy_disp_ = false;
} else {
disp_ = new Dispatcher();
destroy_disp_ = true;
}
disp_ = new Dispatcher(d);
scheme_ = uri.getProtocol();
if (uri.getProtocol() == URI::SCHEME_TCP) {
......@@ -318,13 +306,11 @@ void Peer::data() {
bool Peer::_data() {
//std::unique_lock<std::mutex> lk(recv_mtx_);
std::cout << "BEGIN DATA" << std::endl;
recv_buf_.reserve_buffer(kMaxMessage);
int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0);
if (rc < 0) {
std::cout << "ERR = " << std::to_string(errno) << std::endl;
return false;
}
......@@ -541,9 +527,7 @@ int Peer::_send() {
Peer::~Peer() {
close();
if (destroy_disp_) {
delete disp_;
}
delete disp_;
}
......@@ -5,6 +5,8 @@
#include <chrono>
using ftl::net::Universe;
using std::this_thread::sleep_for;
using std::chrono::milliseconds;
// --- Support -----------------------------------------------------------------
......@@ -19,7 +21,7 @@ TEST_CASE("Universe::connect()", "[net]") {
SECTION("valid tcp connection using ipv4") {
REQUIRE( b.connect("tcp://127.0.0.1:7077") );
std::this_thread::sleep_for(std::chrono::milliseconds(200));
sleep_for(milliseconds(100));
REQUIRE( a.numberOfPeers() == 1 );
REQUIRE( b.numberOfPeers() == 1 );
......@@ -28,7 +30,7 @@ TEST_CASE("Universe::connect()", "[net]") {
SECTION("valid tcp connection using hostname") {
REQUIRE( b.connect("tcp://localhost:7077") );
std::this_thread::sleep_for(std::chrono::milliseconds(200));
sleep_for(milliseconds(100));
REQUIRE( a.numberOfPeers() == 1 );
REQUIRE( b.numberOfPeers() == 1 );
......@@ -37,7 +39,7 @@ TEST_CASE("Universe::connect()", "[net]") {
SECTION("invalid protocol") {
REQUIRE( !b.connect("http://127.0.0.1:7077") );
std::this_thread::sleep_for(std::chrono::milliseconds(200));
sleep_for(milliseconds(100));
REQUIRE( a.numberOfPeers() == 0 );
REQUIRE( b.numberOfPeers() == 0 );
......@@ -84,12 +86,12 @@ TEST_CASE("Universe::broadcast()", "[net]") {
b.broadcast("done");
std::this_thread::sleep_for(std::chrono::milliseconds(200));
sleep_for(milliseconds(100));
}
SECTION("no arguments to one peer") {
b.connect("tcp://localhost:7077");
while (a.numberOfPeers() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20));
while (a.numberOfPeers() == 0) sleep_for(milliseconds(20));
bool done = false;
a.bind("hello", [&done]() {
......@@ -98,10 +100,51 @@ TEST_CASE("Universe::broadcast()", "[net]") {
b.broadcast("hello");
std::this_thread::sleep_for(std::chrono::milliseconds(200));
sleep_for(milliseconds(100));
REQUIRE( done );
}
SECTION("one argument to one peer") {
b.connect("tcp://localhost:7077");
while (a.numberOfPeers() == 0) sleep_for(milliseconds(20));
int done = 0;
a.bind("hello", [&done](int v) {
done = v;
});
b.broadcast("hello", 676);
sleep_for(milliseconds(100));
REQUIRE( done == 676 );
}
SECTION("one argument to two peers") {
Universe c("ftl://utu.fi");
b.connect("tcp://localhost:7077");
c.connect("tcp://localhost:7077");
while (a.numberOfPeers() < 2) sleep_for(milliseconds(20));
int done1 = 0;
b.bind("hello", [&done1](int v) {
done1 = v;
});
int done2 = 0;
c.bind("hello", [&done2](int v) {
done2 = v;
});
a.broadcast("hello", 676);
sleep_for(milliseconds(100));
REQUIRE( done1 == 676 );
REQUIRE( done2 == 676 );
}
}
/*TEST_CASE("net::listen()", "[net]") {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment