diff --git a/net/cpp/include/ftl/net/dispatcher.hpp b/net/cpp/include/ftl/net/dispatcher.hpp index 46505835223ad64419d6d1db95676e4888150206..ba240b8c25c8989705b80e76108289e142f79e42 100644 --- a/net/cpp/include/ftl/net/dispatcher.hpp +++ b/net/cpp/include/ftl/net/dispatcher.hpp @@ -15,6 +15,7 @@ #include <vector> #include <string> #include <unordered_map> +#include <optional> namespace ftl { @@ -46,7 +47,7 @@ class Peer; class Dispatcher { public: - Dispatcher() {} + explicit Dispatcher(Dispatcher *parent=nullptr) : parent_(parent) {} //void dispatch(Peer &, const std::string &msg); void dispatch(Peer &, const msgpack::object &msg); @@ -134,8 +135,11 @@ class Dispatcher { std::tuple<uint32_t, uint32_t, msgpack::object, msgpack::object>; private: + Dispatcher *parent_; std::unordered_map<std::string, adaptor_type> funcs_; + std::optional<adaptor_type> _locateHandler(const std::string &name) const; + static void enforce_arg_count(std::string const &func, std::size_t found, std::size_t expected); diff --git a/net/cpp/include/ftl/net/peer.hpp b/net/cpp/include/ftl/net/peer.hpp index 9b75261896fe524e85ccf0e142d6c6e9224de14a..0b9b812288b06ebca15f2b9ba23e0d22c72ca664 100644 --- a/net/cpp/include/ftl/net/peer.hpp +++ b/net/cpp/include/ftl/net/peer.hpp @@ -197,7 +197,6 @@ class Peer { int sock_; ftl::URI::scheme_t scheme_; uint32_t version_; - bool destroy_disp_; // Receive buffers bool is_waiting_; diff --git a/net/cpp/src/dispatcher.cpp b/net/cpp/src/dispatcher.cpp index 9d893af850790f2e7baf626a9b18ed6cff9d8ffe..1e71b9b9d548cadc4596f90593ca043ff552d14b 100644 --- a/net/cpp/src/dispatcher.cpp +++ b/net/cpp/src/dispatcher.cpp @@ -8,6 +8,7 @@ using ftl::net::Peer; using ftl::net::Dispatcher; using std::vector; using std::string; +using std::optional; /*static std::string hexStr(const std::string &s) { @@ -100,6 +101,19 @@ void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) { } } +optional<Dispatcher::adaptor_type> ftl::net::Dispatcher::_locateHandler(const std::string &name) const { + auto it_func = funcs_.find(name); + if (it_func == end(funcs_)) { + if (parent_ != nullptr) { + return parent_->_locateHandler(name); + } else { + return {}; + } + } else { + return it_func->second; + } +} + void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const &msg) { notification_t the_call; msg.convert(the_call); @@ -113,11 +127,11 @@ void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const LOG(INFO) << "NOTIFICATION " << name << "() <- " << s.getURI(); - auto it_func = funcs_.find(name); + auto binding = _locateHandler(name); - if (it_func != end(funcs_)) { + if (binding) { try { - auto result = (it_func->second)(args); + auto result = (*binding)(args); } catch (int e) { throw e; } @@ -137,6 +151,7 @@ void ftl::net::Dispatcher::enforce_arg_count(std::string const &func, std::size_ void ftl::net::Dispatcher::enforce_unique_name(std::string const &func) { auto pos = funcs_.find(func); if (pos != end(funcs_)) { + LOG(ERROR) << "RPC non unique binding for " << func; throw -1; } } diff --git a/net/cpp/src/peer.cpp b/net/cpp/src/peer.cpp index a5ea6ed887090f9dd439e4f7511a8fbc8238e454..bc969f91bc85afef40c4f0f7cf4c673ab7fe0149 100644 --- a/net/cpp/src/peer.cpp +++ b/net/cpp/src/peer.cpp @@ -130,13 +130,7 @@ Peer::Peer(int s, Dispatcher *d) : sock_(s) { status_ = (s == INVALID_SOCKET) ? kInvalid : kConnecting; _updateURI(); - if (d != nullptr) { - disp_ = d; - destroy_disp_ = false; - } else { - disp_ = new Dispatcher(); - destroy_disp_ = true; - } + disp_ = new Dispatcher(d); is_waiting_ = true; @@ -166,13 +160,7 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { status_ = kInvalid; sock_ = INVALID_SOCKET; - if (d != nullptr) { - disp_ = d; - destroy_disp_ = false; - } else { - disp_ = new Dispatcher(); - destroy_disp_ = true; - } + disp_ = new Dispatcher(d); scheme_ = uri.getProtocol(); if (uri.getProtocol() == URI::SCHEME_TCP) { @@ -318,13 +306,11 @@ void Peer::data() { bool Peer::_data() { //std::unique_lock<std::mutex> lk(recv_mtx_); - - std::cout << "BEGIN DATA" << std::endl; + recv_buf_.reserve_buffer(kMaxMessage); int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0); if (rc < 0) { - std::cout << "ERR = " << std::to_string(errno) << std::endl; return false; } @@ -541,9 +527,7 @@ int Peer::_send() { Peer::~Peer() { close(); - - if (destroy_disp_) { - delete disp_; - } + + delete disp_; } diff --git a/net/cpp/test/net_integration.cpp b/net/cpp/test/net_integration.cpp index c0691a803f3ea897334c02bebb577edfac5854ae..cc2d581d01baba343f170df30562ad8f4027c66a 100644 --- a/net/cpp/test/net_integration.cpp +++ b/net/cpp/test/net_integration.cpp @@ -5,6 +5,8 @@ #include <chrono> using ftl::net::Universe; +using std::this_thread::sleep_for; +using std::chrono::milliseconds; // --- Support ----------------------------------------------------------------- @@ -19,7 +21,7 @@ TEST_CASE("Universe::connect()", "[net]") { SECTION("valid tcp connection using ipv4") { REQUIRE( b.connect("tcp://127.0.0.1:7077") ); - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + sleep_for(milliseconds(100)); REQUIRE( a.numberOfPeers() == 1 ); REQUIRE( b.numberOfPeers() == 1 ); @@ -28,7 +30,7 @@ TEST_CASE("Universe::connect()", "[net]") { SECTION("valid tcp connection using hostname") { REQUIRE( b.connect("tcp://localhost:7077") ); - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + sleep_for(milliseconds(100)); REQUIRE( a.numberOfPeers() == 1 ); REQUIRE( b.numberOfPeers() == 1 ); @@ -37,7 +39,7 @@ TEST_CASE("Universe::connect()", "[net]") { SECTION("invalid protocol") { REQUIRE( !b.connect("http://127.0.0.1:7077") ); - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + sleep_for(milliseconds(100)); REQUIRE( a.numberOfPeers() == 0 ); REQUIRE( b.numberOfPeers() == 0 ); @@ -84,12 +86,12 @@ TEST_CASE("Universe::broadcast()", "[net]") { b.broadcast("done"); - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + sleep_for(milliseconds(100)); } SECTION("no arguments to one peer") { b.connect("tcp://localhost:7077"); - while (a.numberOfPeers() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20)); + while (a.numberOfPeers() == 0) sleep_for(milliseconds(20)); bool done = false; a.bind("hello", [&done]() { @@ -98,10 +100,51 @@ TEST_CASE("Universe::broadcast()", "[net]") { b.broadcast("hello"); - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + sleep_for(milliseconds(100)); REQUIRE( done ); } + + SECTION("one argument to one peer") { + b.connect("tcp://localhost:7077"); + while (a.numberOfPeers() == 0) sleep_for(milliseconds(20)); + + int done = 0; + a.bind("hello", [&done](int v) { + done = v; + }); + + b.broadcast("hello", 676); + + sleep_for(milliseconds(100)); + + REQUIRE( done == 676 ); + } + + SECTION("one argument to two peers") { + Universe c("ftl://utu.fi"); + + b.connect("tcp://localhost:7077"); + c.connect("tcp://localhost:7077"); + while (a.numberOfPeers() < 2) sleep_for(milliseconds(20)); + + int done1 = 0; + b.bind("hello", [&done1](int v) { + done1 = v; + }); + + int done2 = 0; + c.bind("hello", [&done2](int v) { + done2 = v; + }); + + a.broadcast("hello", 676); + + sleep_for(milliseconds(100)); + + REQUIRE( done1 == 676 ); + REQUIRE( done2 == 676 ); + } } /*TEST_CASE("net::listen()", "[net]") {