Skip to content
Snippets Groups Projects
Commit c87c943d authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Switch peer blocking calls to condition_variable

parent da1d2d12
No related branches found
No related tags found
No related merge requests found
...@@ -20,6 +20,10 @@ ...@@ -20,6 +20,10 @@
#include <tuple> #include <tuple>
#include <vector> #include <vector>
#include <type_traits> #include <type_traits>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
# define ENABLE_IF(...) \ # define ENABLE_IF(...) \
typename std::enable_if<(__VA_ARGS__), bool>::type = true typename std::enable_if<(__VA_ARGS__), bool>::type = true
...@@ -56,6 +60,7 @@ struct decrypt{};*/ ...@@ -56,6 +60,7 @@ struct decrypt{};*/
class Peer { class Peer {
public: public:
friend class Universe; friend class Universe;
friend class Dispatcher;
enum Status { enum Status {
kInvalid, kConnecting, kConnected, kDisconnected, kReconnecting kInvalid, kConnecting, kConnected, kDisconnected, kReconnecting
...@@ -143,6 +148,9 @@ class Peer { ...@@ -143,6 +148,9 @@ class Peer {
void socketError(); // Process one error from socket void socketError(); // Process one error from socket
void error(int e); void error(int e);
void _dispatchResponse(uint32_t id, msgpack::object &obj);
void _sendResponse(uint32_t id, const msgpack::object &obj);
/** /**
* Get the internal OS dependent socket. * Get the internal OS dependent socket.
* TODO(nick) Work out if this should be private. * TODO(nick) Work out if this should be private.
...@@ -161,7 +169,6 @@ class Peer { ...@@ -161,7 +169,6 @@ class Peer {
private: // Functions private: // Functions
void _connected(); void _connected();
void _updateURI(); void _updateURI();
void _dispatchReturn(const std::string &d);
int _send(); int _send();
...@@ -228,17 +235,26 @@ void Peer::bind(const std::string &name, F func) { ...@@ -228,17 +235,26 @@ void Peer::bind(const std::string &name, F func) {
template <typename R, typename... ARGS> template <typename R, typename... ARGS>
R Peer::call(const std::string &name, ARGS... args) { R Peer::call(const std::string &name, ARGS... args) {
bool hasreturned = false; bool hasreturned = false;
std::mutex m;
std::condition_variable cv;
R result; R result;
asyncCall<R>(name, [&result,&hasreturned](const R &r) { asyncCall<R>(name, [&](const R &r) {
std::unique_lock<std::mutex> lk(m);
hasreturned = true; hasreturned = true;
result = r; result = r;
lk.unlock();
cv.notify_one();
}, std::forward<ARGS>(args)...); }, std::forward<ARGS>(args)...);
// Loop the network { // Block thread until async callback notifies us
int limit = 10; std::unique_lock<std::mutex> lk(m);
while (limit > 0 && !hasreturned) { cv.wait_for(lk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;});
limit--; }
// TODO REPLACE ftl::net::wait();
if (!hasreturned) {
// TODO(nick) remove callback
throw 1;
} }
return result; return result;
...@@ -255,13 +271,12 @@ void Peer::asyncCall( ...@@ -255,13 +271,12 @@ void Peer::asyncCall(
LOG(INFO) << "RPC " << name << "() -> " << uri_; LOG(INFO) << "RPC " << name << "() -> " << uri_;
std::stringstream buf; msgpack::pack(send_buf_, call_obj);
msgpack::pack(buf, call_obj);
// Register the CB // Register the CB
callbacks_[rpcid] = std::make_unique<caller<T>>(cb); callbacks_[rpcid] = std::make_unique<caller<T>>(cb);
send("__rpc__", buf.str()); _send();
} }
}; };
......
...@@ -48,43 +48,56 @@ void ftl::net::Dispatcher::dispatch(Peer &s, const msgpack::object &msg) { ...@@ -48,43 +48,56 @@ void ftl::net::Dispatcher::dispatch(Peer &s, const msgpack::object &msg) {
void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) { void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) {
call_t the_call; call_t the_call;
msg.convert(the_call);
try {
msg.convert(the_call);
} catch(...) {
LOG(ERROR) << "Bad message format";
return;
}
// TODO: proper validation of protocol (and responding to it) // TODO: proper validation of protocol (and responding to it)
// auto &&type = std::get<0>(the_call); auto &&type = std::get<0>(the_call);
// assert(type == 0);
auto &&id = std::get<1>(the_call); auto &&id = std::get<1>(the_call);
auto &&name = std::get<2>(the_call); auto &&name = std::get<2>(the_call);
auto &&args = std::get<3>(the_call); auto &&args = std::get<3>(the_call);
// assert(type == 0);
LOG(INFO) << "RPC " << name << "() <- " << s.getURI(); if (type == 1) {
LOG(INFO) << "RPC return for " << id;
auto it_func = funcs_.find(name); s._dispatchResponse(id, args);
} else if (type == 0) {
if (it_func != end(funcs_)) { LOG(INFO) << "RPC " << name << "() <- " << s.getURI();
try {
auto result = (it_func->second)(args); //->get(); auto it_func = funcs_.find(name);
response_t res_obj = std::make_tuple(1,id,msgpack::object(),result->get());
std::stringstream buf; if (it_func != end(funcs_)) {
msgpack::pack(buf, res_obj); try {
s.send("__return__", buf.str()); auto result = (it_func->second)(args); //->get();
} catch (const std::exception &e) { s._sendResponse(id, result->get());
//throw; /*response_t res_obj = std::make_tuple(1,id,msgpack::object(),result->get());
//LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")"; std::stringstream buf;
response_t res_obj = std::make_tuple(1,id,msgpack::object(e.what()),msgpack::object()); msgpack::pack(buf, res_obj);
std::stringstream buf; s.send("__return__", buf.str());*/
msgpack::pack(buf, res_obj); } catch (const std::exception &e) {
s.send("__return__", buf.str()); //throw;
} catch (int e) { //LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")";
//throw; /*response_t res_obj = std::make_tuple(1,id,msgpack::object(e.what()),msgpack::object());
//LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")"; std::stringstream buf;
response_t res_obj = std::make_tuple(1,id,msgpack::object(e),msgpack::object()); msgpack::pack(buf, res_obj);
std::stringstream buf; s.send("__return__", buf.str());*/
msgpack::pack(buf, res_obj); } catch (int e) {
s.send("__return__", buf.str()); //throw;
//LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")";
/*response_t res_obj = std::make_tuple(1,id,msgpack::object(e),msgpack::object());
std::stringstream buf;
msgpack::pack(buf, res_obj);
s.send("__return__", buf.str());*/
}
} }
} } else {
// TODO(nick) Some error
}
} }
void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const &msg) { void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const &msg) {
......
...@@ -308,12 +308,18 @@ bool Peer::data() { ...@@ -308,12 +308,18 @@ bool Peer::data() {
msgpack::object obj = msg.get(); msgpack::object obj = msg.get();
if (status_ != kConnected) { if (status_ != kConnected) {
// First message must be a handshake // First message must be a handshake
tuple<uint32_t, std::string, msgpack::object> hs; try {
obj.convert(hs); tuple<uint32_t, std::string, msgpack::object> hs;
obj.convert(hs);
if (get<1>(hs) != "__handshake__") {
if (get<1>(hs) != "__handshake__") {
close();
LOG(ERROR) << "Missing handshake";
return false;
}
} catch(...) {
close(); close();
LOG(ERROR) << "Missing handshake"; LOG(ERROR) << "Bad first message format";
return false; return false;
} }
} }
...@@ -432,21 +438,7 @@ void Socket::handshake2() { ...@@ -432,21 +438,7 @@ void Socket::handshake2() {
_connected(); _connected();
}*/ }*/
void Peer::_dispatchReturn(const std::string &d) { void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) {
auto unpacked = msgpack::unpack(d.data(), d.size());
Dispatcher::response_t the_result;
unpacked.get().convert(the_result);
// Msgpack stipulates that 1 means return message
if (std::get<0>(the_result) != 1) {
LOG(ERROR) << "Bad RPC return message";
return;
}
auto &&id = std::get<1>(the_result);
//auto &&err = std::get<2>(the_result);
auto &&res = std::get<3>(the_result);
// TODO Handle error reporting... // TODO Handle error reporting...
if (callbacks_.count(id) > 0) { if (callbacks_.count(id) > 0) {
...@@ -460,6 +452,12 @@ void Peer::_dispatchReturn(const std::string &d) { ...@@ -460,6 +452,12 @@ void Peer::_dispatchReturn(const std::string &d) {
} }
} }
void Peer::_sendResponse(uint32_t id, const msgpack::object &res) {
Dispatcher::response_t res_obj = std::make_tuple(1,id,msgpack::object(),res);
msgpack::pack(send_buf_, res_obj);
_send();
}
void Peer::onConnect(std::function<void()> &f) { void Peer::onConnect(std::function<void()> &f) {
if (status_ == kConnected) { if (status_ == kConnected) {
f(); f();
......
...@@ -9,6 +9,7 @@ add_executable(peer_unit ...@@ -9,6 +9,7 @@ add_executable(peer_unit
target_link_libraries(peer_unit target_link_libraries(peer_unit
${URIPARSER_LIBRARIES} ${URIPARSER_LIBRARIES}
glog::glog glog::glog
Threads::Threads
${UUID_LIBRARIES}) ${UUID_LIBRARIES})
### URI ######################################################################## ### URI ########################################################################
......
...@@ -116,6 +116,14 @@ tuple<std::string, T> readResponse(int s) { ...@@ -116,6 +116,14 @@ tuple<std::string, T> readResponse(int s) {
return std::make_tuple(get<1>(req), get<2>(req)); return std::make_tuple(get<1>(req), get<2>(req));
} }
template <typename T>
tuple<uint32_t, T> readRPC(int s) {
msgpack::object_handle msg = msgpack::unpack(fakedata[s].data(), fakedata[s].size());
tuple<uint8_t, uint32_t, std::string, T> req;
msg.get().convert(req);
return std::make_tuple(get<1>(req), get<3>(req));
}
// --- Files to test ----------------------------------------------------------- // --- Files to test -----------------------------------------------------------
#include "../src/peer.cpp" #include "../src/peer.cpp"
...@@ -171,52 +179,64 @@ TEST_CASE("Peer(int)", "[]") { ...@@ -171,52 +179,64 @@ TEST_CASE("Peer(int)", "[]") {
} }
} }
/*TEST_CASE("Peer::call()", "[rpc]") { TEST_CASE("Peer::call()", "[rpc]") {
MockPeer s; MockPeer s;
send_handshake(s);
s.mock_data();
SECTION("no argument call") { SECTION("one argument call") {
waithandler = [&]() { REQUIRE( s.isConnected() );
// Read fakedata sent
// TODO Validate data fakedata[0] = "";
// Thread to provide response to otherwise blocking call
std::thread thr([&s]() {
while (fakedata[0].size() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20));
// Do a fake send auto [id,value] = readRPC<tuple<int>>(0);
auto res_obj = std::make_tuple(1,0,msgpack::object(),66); auto res_obj = std::make_tuple(1,id,"__return__",get<0>(value)+22);
std::stringstream buf; std::stringstream buf;
msgpack::pack(buf, res_obj); msgpack::pack(buf, res_obj);
fakedata[0] = buf.str();
fake_send(0, FTL_PROTOCOL_RPCRETURN, buf.str());
s.mock_data(); s.mock_data();
}; });
int res = s.call<int>("test1"); int res = s.call<int>("test1", 44);
thr.join();
REQUIRE( (res == 66) ); REQUIRE( (res == 66) );
} }
SECTION("one argument call") { SECTION("no argument call") {
waithandler = [&]() { REQUIRE( s.isConnected() );
// Read fakedata sent
// TODO Validate data fakedata[0] = "";
// Thread to provide response to otherwise blocking call
std::thread thr([&s]() {
while (fakedata[0].size() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20));
// Do a fake send auto [id,value] = readRPC<tuple<>>(0);
auto res_obj = std::make_tuple(1,1,msgpack::object(),43); auto res_obj = std::make_tuple(1,id,"__return__",77);
std::stringstream buf; std::stringstream buf;
msgpack::pack(buf, res_obj); msgpack::pack(buf, res_obj);
fakedata[0] = buf.str();
fake_send(0, FTL_PROTOCOL_RPCRETURN, buf.str());
s.mock_data(); s.mock_data();
}; });
int res = s.call<int>("test1", 78); int res = s.call<int>("test1");
thr.join();
REQUIRE( (res == 43) ); REQUIRE( (res == 77) );
} }
}
waithandler = nullptr;
}*/
TEST_CASE("Peer::bind()", "[rpc]") { TEST_CASE("Peer::bind()", "[rpc]") {
MockPeer s; MockPeer s;
send_handshake(s);
s.mock_data();
SECTION("no argument call") { SECTION("no argument call") {
bool done = false; bool done = false;
...@@ -225,8 +245,6 @@ TEST_CASE("Peer::bind()", "[rpc]") { ...@@ -225,8 +245,6 @@ TEST_CASE("Peer::bind()", "[rpc]") {
done = true; done = true;
}); });
send_handshake(s);
s.mock_data();
s.send("hello"); s.send("hello");
s.mock_data(); // Force it to read the fake send... s.mock_data(); // Force it to read the fake send...
...@@ -240,8 +258,6 @@ TEST_CASE("Peer::bind()", "[rpc]") { ...@@ -240,8 +258,6 @@ TEST_CASE("Peer::bind()", "[rpc]") {
done = a; done = a;
}); });
send_handshake(s);
s.mock_data();
s.send("hello", 55); s.send("hello", 55);
s.mock_data(); // Force it to read the fake send... s.mock_data(); // Force it to read the fake send...
...@@ -254,9 +270,7 @@ TEST_CASE("Peer::bind()", "[rpc]") { ...@@ -254,9 +270,7 @@ TEST_CASE("Peer::bind()", "[rpc]") {
s.bind("hello", [&](int a, std::string b) { s.bind("hello", [&](int a, std::string b) {
done = b; done = b;
}); });
send_handshake(s);
s.mock_data();
s.send("hello", 55, "world"); s.send("hello", 55, "world");
s.mock_data(); // Force it to read the fake send... s.mock_data(); // Force it to read the fake send...
...@@ -322,93 +336,8 @@ TEST_CASE("Socket::send()", "[io]") { ...@@ -322,93 +336,8 @@ TEST_CASE("Socket::send()", "[io]") {
auto [name, value] = readResponse<tuple<std::string,std::string>>(0); auto [name, value] = readResponse<tuple<std::string,std::string>>(0);
REQUIRE( (name == "dummy2") ); REQUIRE( (name == "dummy2") );
REQUIRE( (get<0>(value) == "hello") ); REQUIRE( (get<0>(value) == "hello ") );
REQUIRE( (get<1>(value) == "world") ); REQUIRE( (get<1>(value) == "world") );
} }
} }
/*TEST_CASE("Socket::read()", "[io]") {
MockSocket s;
SECTION("read an int") {
int i = 99;
fake_send(0, 100, std::string((char*)&i,4));
i = 0;
s.mock_data(); // Force a message read, but no protocol...
REQUIRE( (s.size() == sizeof(int)) );
REQUIRE( (s.read(i) == sizeof(int)) );
REQUIRE( (i == 99) );
}
SECTION("read two ints") {
int i[2];
i[0] = 99;
i[1] = 101;
fake_send(0, 100, std::string((char*)&i,2*sizeof(int)));
i[0] = 0;
i[1] = 0;
s.mock_data(); // Force a message read, but no protocol...
REQUIRE( (s.size() == 2*sizeof(int)) );
REQUIRE( (s.read(&i,2) == 2*sizeof(int)) );
REQUIRE( (i[0] == 99) );
REQUIRE( (i[1] == 101) );
}
SECTION("multiple reads") {
int i[2];
i[0] = 99;
i[1] = 101;
fake_send(0, 100, std::string((char*)&i,2*sizeof(int)));
i[0] = 0;
i[1] = 0;
s.mock_data(); // Force a message read, but no protocol...
REQUIRE( (s.read(&i[0],1) == sizeof(int)) );
REQUIRE( (i[0] == 99) );
REQUIRE( (s.read(&i[1],1) == sizeof(int)) );
REQUIRE( (i[1] == 101) );
}
SECTION("read a string") {
std::string str;
fake_send(0, 100, std::string("hello world"));
s.mock_data(); // Force a message read, but no protocol...
REQUIRE( (s.size() == 11) );
REQUIRE( (s.read(str) == 11) );
REQUIRE( (str == "hello world") );
}
SECTION("read into existing string") {
std::string str;
str.reserve(11);
void *ptr = str.data();
fake_send(0, 100, std::string("hello world"));
s.mock_data(); // Force a message read, but no protocol...
REQUIRE( (s.size() == 11) );
REQUIRE( (s.read(str) == 11) );
REQUIRE( (str == "hello world") );
REQUIRE( (str.data() == ptr) );
}
SECTION("read too much data") {
int i = 99;
fake_send(0, 100, std::string((char*)&i,4));
i = 0;
s.mock_data(); // Force a message read, but no protocol...
REQUIRE( (s.size() == sizeof(int)) );
REQUIRE( (s.read(&i,2) == sizeof(int)) );
REQUIRE( (i == 99) );
}
}*/
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment