Skip to content
Snippets Groups Projects
Commit 097f39ea authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Tidy peer code

parent 12b6cdff
No related branches found
No related tags found
1 merge request!278Resolves #338 rpc failure
Pipeline #22467 passed
......@@ -230,6 +230,8 @@ class Peer {
int _send();
void _waitCall(int id, std::condition_variable &cv, bool &hasreturned, const std::string &name);
template<typename... ARGS>
void _trigger(const std::vector<std::function<void(Peer &, ARGS...)>> &hs, ARGS... args) {
for (auto h : hs) {
......@@ -320,38 +322,17 @@ void Peer::bind(const std::string &name, F func) {
template <typename R, typename... ARGS>
R Peer::call(const std::string &name, ARGS... args) {
bool hasreturned = false;
std::mutex m;
//std::mutex m;
std::condition_variable cv;
R result;
int id = asyncCall<R>(name, [&](const R &r) {
//UNIQUE_LOCK(m,lk);
std::unique_lock<std::mutex> lk(m);
hasreturned = true;
result = r;
lk.unlock();
hasreturned = true;
cv.notify_one();
}, std::forward<ARGS>(args)...);
int64_t beginat = ftl::timer::get_time();
std::function<void(int)> j;
while (!hasreturned) {
// Attempt to do a thread pool job if available
if ((bool)(j=ftl::pool.pop())) {
j(-1);
} else {
// Block for a little otherwise
std::unique_lock<std::mutex> lk(m);
cv.wait_for(lk, std::chrono::milliseconds(2), [&hasreturned]{return hasreturned;});
}
if (ftl::timer::get_time() - beginat > 1000) break;
}
if (!hasreturned) {
cancelCall(id);
throw FTL_Error("RPC failed with timeout: " << name);
}
_waitCall(id, cv, hasreturned, name);
return result;
}
......
......@@ -594,6 +594,30 @@ void Peer::_sendResponse(uint32_t id, const msgpack::object &res) {
_send();
}
void Peer::_waitCall(int id, std::condition_variable &cv, bool &hasreturned, const std::string &name) {
std::mutex m;
int64_t beginat = ftl::timer::get_time();
std::function<void(int)> j;
while (!hasreturned) {
// Attempt to do a thread pool job if available
if ((bool)(j=ftl::pool.pop())) {
j(-1);
} else {
// Block for a little otherwise
std::unique_lock<std::mutex> lk(m);
cv.wait_for(lk, std::chrono::milliseconds(2), [&hasreturned]{return hasreturned;});
}
if (ftl::timer::get_time() - beginat > 1000) break;
}
if (!hasreturned) {
cancelCall(id);
throw FTL_Error("RPC failed with timeout: " << name);
}
}
bool Peer::waitConnection() {
if (status_ == kConnected) return true;
else if (status_ != kConnecting) return false;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment