diff --git a/net/cpp/include/ftl/net/peer.hpp b/net/cpp/include/ftl/net/peer.hpp index 16cad5138518d010df7b64a8d5b27ad6b091e27d..a7756fbdcfbced330fe1dc341d13e30078b50c36 100644 --- a/net/cpp/include/ftl/net/peer.hpp +++ b/net/cpp/include/ftl/net/peer.hpp @@ -20,6 +20,10 @@ #include <tuple> #include <vector> #include <type_traits> +#include <thread> +#include <mutex> +#include <condition_variable> +#include <chrono> # define ENABLE_IF(...) \ typename std::enable_if<(__VA_ARGS__), bool>::type = true @@ -56,6 +60,7 @@ struct decrypt{};*/ class Peer { public: friend class Universe; + friend class Dispatcher; enum Status { kInvalid, kConnecting, kConnected, kDisconnected, kReconnecting @@ -143,6 +148,9 @@ class Peer { void socketError(); // Process one error from socket void error(int e); + void _dispatchResponse(uint32_t id, msgpack::object &obj); + void _sendResponse(uint32_t id, const msgpack::object &obj); + /** * Get the internal OS dependent socket. * TODO(nick) Work out if this should be private. @@ -161,7 +169,6 @@ class Peer { private: // Functions void _connected(); void _updateURI(); - void _dispatchReturn(const std::string &d); int _send(); @@ -228,17 +235,26 @@ void Peer::bind(const std::string &name, F func) { template <typename R, typename... ARGS> R Peer::call(const std::string &name, ARGS... args) { bool hasreturned = false; + std::mutex m; + std::condition_variable cv; + R result; - asyncCall<R>(name, [&result,&hasreturned](const R &r) { + asyncCall<R>(name, [&](const R &r) { + std::unique_lock<std::mutex> lk(m); hasreturned = true; result = r; + lk.unlock(); + cv.notify_one(); }, std::forward<ARGS>(args)...); - // Loop the network - int limit = 10; - while (limit > 0 && !hasreturned) { - limit--; - // TODO REPLACE ftl::net::wait(); + { // Block thread until async callback notifies us + std::unique_lock<std::mutex> lk(m); + cv.wait_for(lk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;}); + } + + if (!hasreturned) { + // TODO(nick) remove callback + throw 1; } return result; @@ -255,13 +271,12 @@ void Peer::asyncCall( LOG(INFO) << "RPC " << name << "() -> " << uri_; - std::stringstream buf; - msgpack::pack(buf, call_obj); + msgpack::pack(send_buf_, call_obj); // Register the CB callbacks_[rpcid] = std::make_unique<caller<T>>(cb); - send("__rpc__", buf.str()); + _send(); } }; diff --git a/net/cpp/src/dispatcher.cpp b/net/cpp/src/dispatcher.cpp index 394307e1949297f6b5fd549b80307d64d975bf46..9d893af850790f2e7baf626a9b18ed6cff9d8ffe 100644 --- a/net/cpp/src/dispatcher.cpp +++ b/net/cpp/src/dispatcher.cpp @@ -48,43 +48,56 @@ void ftl::net::Dispatcher::dispatch(Peer &s, const msgpack::object &msg) { void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) { call_t the_call; - msg.convert(the_call); + + try { + msg.convert(the_call); + } catch(...) { + LOG(ERROR) << "Bad message format"; + return; + } // TODO: proper validation of protocol (and responding to it) - // auto &&type = std::get<0>(the_call); - // assert(type == 0); - + auto &&type = std::get<0>(the_call); auto &&id = std::get<1>(the_call); - auto &&name = std::get<2>(the_call); - auto &&args = std::get<3>(the_call); + auto &&name = std::get<2>(the_call); + auto &&args = std::get<3>(the_call); + // assert(type == 0); - LOG(INFO) << "RPC " << name << "() <- " << s.getURI(); - - auto it_func = funcs_.find(name); - - if (it_func != end(funcs_)) { - try { - auto result = (it_func->second)(args); //->get(); - response_t res_obj = std::make_tuple(1,id,msgpack::object(),result->get()); - std::stringstream buf; - msgpack::pack(buf, res_obj); - s.send("__return__", buf.str()); - } catch (const std::exception &e) { - //throw; - //LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")"; - response_t res_obj = std::make_tuple(1,id,msgpack::object(e.what()),msgpack::object()); - std::stringstream buf; - msgpack::pack(buf, res_obj); - s.send("__return__", buf.str()); - } catch (int e) { - //throw; - //LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")"; - response_t res_obj = std::make_tuple(1,id,msgpack::object(e),msgpack::object()); - std::stringstream buf; - msgpack::pack(buf, res_obj); - s.send("__return__", buf.str()); + if (type == 1) { + LOG(INFO) << "RPC return for " << id; + s._dispatchResponse(id, args); + } else if (type == 0) { + LOG(INFO) << "RPC " << name << "() <- " << s.getURI(); + + auto it_func = funcs_.find(name); + + if (it_func != end(funcs_)) { + try { + auto result = (it_func->second)(args); //->get(); + s._sendResponse(id, result->get()); + /*response_t res_obj = std::make_tuple(1,id,msgpack::object(),result->get()); + std::stringstream buf; + msgpack::pack(buf, res_obj); + s.send("__return__", buf.str());*/ + } catch (const std::exception &e) { + //throw; + //LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")"; + /*response_t res_obj = std::make_tuple(1,id,msgpack::object(e.what()),msgpack::object()); + std::stringstream buf; + msgpack::pack(buf, res_obj); + s.send("__return__", buf.str());*/ + } catch (int e) { + //throw; + //LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")"; + /*response_t res_obj = std::make_tuple(1,id,msgpack::object(e),msgpack::object()); + std::stringstream buf; + msgpack::pack(buf, res_obj); + s.send("__return__", buf.str());*/ + } } - } + } else { + // TODO(nick) Some error + } } void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const &msg) { diff --git a/net/cpp/src/peer.cpp b/net/cpp/src/peer.cpp index 868aa168d270d5f80420e396fad8fccca56f4ea5..7b1955f9aafa6644a97143e2fd0ddd9fcd032a2e 100644 --- a/net/cpp/src/peer.cpp +++ b/net/cpp/src/peer.cpp @@ -308,12 +308,18 @@ bool Peer::data() { msgpack::object obj = msg.get(); if (status_ != kConnected) { // First message must be a handshake - tuple<uint32_t, std::string, msgpack::object> hs; - obj.convert(hs); - - if (get<1>(hs) != "__handshake__") { + try { + tuple<uint32_t, std::string, msgpack::object> hs; + obj.convert(hs); + + if (get<1>(hs) != "__handshake__") { + close(); + LOG(ERROR) << "Missing handshake"; + return false; + } + } catch(...) { close(); - LOG(ERROR) << "Missing handshake"; + LOG(ERROR) << "Bad first message format"; return false; } } @@ -432,21 +438,7 @@ void Socket::handshake2() { _connected(); }*/ -void Peer::_dispatchReturn(const std::string &d) { - auto unpacked = msgpack::unpack(d.data(), d.size()); - Dispatcher::response_t the_result; - unpacked.get().convert(the_result); - - // Msgpack stipulates that 1 means return message - if (std::get<0>(the_result) != 1) { - LOG(ERROR) << "Bad RPC return message"; - return; - } - - auto &&id = std::get<1>(the_result); - //auto &&err = std::get<2>(the_result); - auto &&res = std::get<3>(the_result); - +void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) { // TODO Handle error reporting... if (callbacks_.count(id) > 0) { @@ -460,6 +452,12 @@ void Peer::_dispatchReturn(const std::string &d) { } } +void Peer::_sendResponse(uint32_t id, const msgpack::object &res) { + Dispatcher::response_t res_obj = std::make_tuple(1,id,msgpack::object(),res); + msgpack::pack(send_buf_, res_obj); + _send(); +} + void Peer::onConnect(std::function<void()> &f) { if (status_ == kConnected) { f(); diff --git a/net/cpp/test/CMakeLists.txt b/net/cpp/test/CMakeLists.txt index 71b2610d50239f007d2bddad0506742532b23ed0..ce0044c9269757089e7a5243ced4f3dec40ec41a 100644 --- a/net/cpp/test/CMakeLists.txt +++ b/net/cpp/test/CMakeLists.txt @@ -9,6 +9,7 @@ add_executable(peer_unit target_link_libraries(peer_unit ${URIPARSER_LIBRARIES} glog::glog + Threads::Threads ${UUID_LIBRARIES}) ### URI ######################################################################## diff --git a/net/cpp/test/peer_unit.cpp b/net/cpp/test/peer_unit.cpp index edc9f1507a46b7a863b0da3f657765aad9202e1d..35a581657d89efe10b7d0ab19a29c3e1a3e8f41f 100644 --- a/net/cpp/test/peer_unit.cpp +++ b/net/cpp/test/peer_unit.cpp @@ -116,6 +116,14 @@ tuple<std::string, T> readResponse(int s) { return std::make_tuple(get<1>(req), get<2>(req)); } +template <typename T> +tuple<uint32_t, T> readRPC(int s) { + msgpack::object_handle msg = msgpack::unpack(fakedata[s].data(), fakedata[s].size()); + tuple<uint8_t, uint32_t, std::string, T> req; + msg.get().convert(req); + return std::make_tuple(get<1>(req), get<3>(req)); +} + // --- Files to test ----------------------------------------------------------- #include "../src/peer.cpp" @@ -171,52 +179,64 @@ TEST_CASE("Peer(int)", "[]") { } } -/*TEST_CASE("Peer::call()", "[rpc]") { +TEST_CASE("Peer::call()", "[rpc]") { MockPeer s; + send_handshake(s); + s.mock_data(); - SECTION("no argument call") { - waithandler = [&]() { - // Read fakedata sent - // TODO Validate data + SECTION("one argument call") { + REQUIRE( s.isConnected() ); + + fakedata[0] = ""; + + // Thread to provide response to otherwise blocking call + std::thread thr([&s]() { + while (fakedata[0].size() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20)); - // Do a fake send - auto res_obj = std::make_tuple(1,0,msgpack::object(),66); + auto [id,value] = readRPC<tuple<int>>(0); + auto res_obj = std::make_tuple(1,id,"__return__",get<0>(value)+22); std::stringstream buf; msgpack::pack(buf, res_obj); - - fake_send(0, FTL_PROTOCOL_RPCRETURN, buf.str()); + fakedata[0] = buf.str(); s.mock_data(); - }; + }); - int res = s.call<int>("test1"); + int res = s.call<int>("test1", 44); + + thr.join(); REQUIRE( (res == 66) ); } - SECTION("one argument call") { - waithandler = [&]() { - // Read fakedata sent - // TODO Validate data + SECTION("no argument call") { + REQUIRE( s.isConnected() ); + + fakedata[0] = ""; + + // Thread to provide response to otherwise blocking call + std::thread thr([&s]() { + while (fakedata[0].size() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20)); - // Do a fake send - auto res_obj = std::make_tuple(1,1,msgpack::object(),43); + auto [id,value] = readRPC<tuple<>>(0); + auto res_obj = std::make_tuple(1,id,"__return__",77); std::stringstream buf; msgpack::pack(buf, res_obj); - - fake_send(0, FTL_PROTOCOL_RPCRETURN, buf.str()); + fakedata[0] = buf.str(); s.mock_data(); - }; + }); - int res = s.call<int>("test1", 78); + int res = s.call<int>("test1"); + + thr.join(); - REQUIRE( (res == 43) ); + REQUIRE( (res == 77) ); } - - waithandler = nullptr; -}*/ +} TEST_CASE("Peer::bind()", "[rpc]") { MockPeer s; + send_handshake(s); + s.mock_data(); SECTION("no argument call") { bool done = false; @@ -225,8 +245,6 @@ TEST_CASE("Peer::bind()", "[rpc]") { done = true; }); - send_handshake(s); - s.mock_data(); s.send("hello"); s.mock_data(); // Force it to read the fake send... @@ -240,8 +258,6 @@ TEST_CASE("Peer::bind()", "[rpc]") { done = a; }); - send_handshake(s); - s.mock_data(); s.send("hello", 55); s.mock_data(); // Force it to read the fake send... @@ -254,9 +270,7 @@ TEST_CASE("Peer::bind()", "[rpc]") { s.bind("hello", [&](int a, std::string b) { done = b; }); - - send_handshake(s); - s.mock_data(); + s.send("hello", 55, "world"); s.mock_data(); // Force it to read the fake send... @@ -322,93 +336,8 @@ TEST_CASE("Socket::send()", "[io]") { auto [name, value] = readResponse<tuple<std::string,std::string>>(0); REQUIRE( (name == "dummy2") ); - REQUIRE( (get<0>(value) == "hello") ); + REQUIRE( (get<0>(value) == "hello ") ); REQUIRE( (get<1>(value) == "world") ); } } -/*TEST_CASE("Socket::read()", "[io]") { - MockSocket s; - - SECTION("read an int") { - int i = 99; - fake_send(0, 100, std::string((char*)&i,4)); - - i = 0; - s.mock_data(); // Force a message read, but no protocol... - - REQUIRE( (s.size() == sizeof(int)) ); - REQUIRE( (s.read(i) == sizeof(int)) ); - REQUIRE( (i == 99) ); - } - - SECTION("read two ints") { - int i[2]; - i[0] = 99; - i[1] = 101; - fake_send(0, 100, std::string((char*)&i,2*sizeof(int))); - - i[0] = 0; - i[1] = 0; - s.mock_data(); // Force a message read, but no protocol... - - REQUIRE( (s.size() == 2*sizeof(int)) ); - REQUIRE( (s.read(&i,2) == 2*sizeof(int)) ); - REQUIRE( (i[0] == 99) ); - REQUIRE( (i[1] == 101) ); - } - - SECTION("multiple reads") { - int i[2]; - i[0] = 99; - i[1] = 101; - fake_send(0, 100, std::string((char*)&i,2*sizeof(int))); - - i[0] = 0; - i[1] = 0; - s.mock_data(); // Force a message read, but no protocol... - - REQUIRE( (s.read(&i[0],1) == sizeof(int)) ); - REQUIRE( (i[0] == 99) ); - REQUIRE( (s.read(&i[1],1) == sizeof(int)) ); - REQUIRE( (i[1] == 101) ); - } - - SECTION("read a string") { - std::string str; - fake_send(0, 100, std::string("hello world")); - - s.mock_data(); // Force a message read, but no protocol... - - REQUIRE( (s.size() == 11) ); - REQUIRE( (s.read(str) == 11) ); - REQUIRE( (str == "hello world") ); - } - - SECTION("read into existing string") { - std::string str; - str.reserve(11); - void *ptr = str.data(); - fake_send(0, 100, std::string("hello world")); - - s.mock_data(); // Force a message read, but no protocol... - - REQUIRE( (s.size() == 11) ); - REQUIRE( (s.read(str) == 11) ); - REQUIRE( (str == "hello world") ); - REQUIRE( (str.data() == ptr) ); - } - - SECTION("read too much data") { - int i = 99; - fake_send(0, 100, std::string((char*)&i,4)); - - i = 0; - s.mock_data(); // Force a message read, but no protocol... - - REQUIRE( (s.size() == sizeof(int)) ); - REQUIRE( (s.read(&i,2) == sizeof(int)) ); - REQUIRE( (i == 99) ); - } -}*/ -