Skip to content
Snippets Groups Projects
Commit 930bbe0e authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Reformation findOne and findAll

parent e1405b71
No related branches found
No related tags found
1 merge request!316Resolves #343 GUI and Frame Refactor
...@@ -311,102 +311,74 @@ void Universe::broadcast(const std::string &name, ARGS... args) { ...@@ -311,102 +311,74 @@ void Universe::broadcast(const std::string &name, ARGS... args) {
template <typename R, typename... ARGS> template <typename R, typename... ARGS>
std::optional<R> Universe::findOne(const std::string &name, ARGS... args) { std::optional<R> Universe::findOne(const std::string &name, ARGS... args) {
std::atomic_flag hasreturned; struct SharedData {
std::mutex m; std::atomic_bool hasreturned = false;
std::condition_variable cv; std::mutex m;
std::atomic<int> count = 0; std::condition_variable cv;
std::optional<R> result; std::optional<R> result;
hasreturned.clear();
auto handler = [&](const std::optional<R> &r) {
std::unique_lock<std::mutex> lk(m);
count--;
//if (hasreturned || !r) return;
if (r && !hasreturned.test_and_set()) {
result = r;
}
lk.unlock();
cv.notify_one();
}; };
std::map<Peer*, int> record; auto sdata = std::make_shared<SharedData>();
SHARED_LOCK(net_mutex_,lk);
for (auto p : peers_) { auto handler = [sdata](const std::optional<R> &r) {
if (!p->waitConnection()) continue; std::unique_lock<std::mutex> lk(sdata->m);
count++; if (r && !sdata->hasreturned) {
record[p] = p->asyncCall<std::optional<R>>(name, handler, args...); sdata->hasreturned = true;
} sdata->result = r;
lk.unlock();
{ // Block thread until async callback notifies us
std::unique_lock<std::mutex> llk(m);
// FIXME: what happens if one clients does not return (count != 0)?
cv.wait_for(llk, std::chrono::seconds(1), [&count] {
return count == 0; //hasreturned || count == 0;
});
// Cancel any further results
lk.lock();
if (count > 0) {
throw FTL_Error("Find one failed with timeout");
} }
lk.unlock();
sdata->cv.notify_one();
};
{
SHARED_LOCK(net_mutex_,lk);
for (auto p : peers_) { for (auto p : peers_) {
auto mm = record.find(p); if (!p->waitConnection()) continue;
if (mm != record.end()) { p->asyncCall<std::optional<R>>(name, handler, args...);
p->cancelCall(mm->second);
}
} }
} }
// Block thread until async callback notifies us
std::unique_lock<std::mutex> llk(sdata->m);
sdata->cv.wait_for(llk, std::chrono::seconds(1), [sdata] {
return (bool)sdata->hasreturned;
});
return result; return sdata->result;
} }
template <typename R, typename... ARGS> template <typename R, typename... ARGS>
std::vector<R> Universe::findAll(const std::string &name, ARGS... args) { std::vector<R> Universe::findAll(const std::string &name, ARGS... args) {
int returncount = 0; struct SharedData {
int sentcount = 0; std::atomic_int returncount = 0;
std::mutex m; std::atomic_int sentcount = 0;
std::condition_variable cv; std::mutex m;
std::condition_variable cv;
std::vector<R> results; std::vector<R> results;
};
auto sdata = std::make_shared<SharedData>();
auto handler = [&](const std::vector<R> &r) { auto handler = [sdata](const std::vector<R> &r) {
//UNIQUE_LOCK(m,lk); std::unique_lock<std::mutex> lk(sdata->m);
std::unique_lock<std::mutex> lk(m); ++sdata->returncount;
returncount++; sdata->results.insert(sdata->results.end(), r.begin(), r.end());
results.insert(results.end(), r.begin(), r.end());
lk.unlock(); lk.unlock();
cv.notify_one(); sdata->cv.notify_one();
}; };
std::map<Peer*, int> record; {
SHARED_LOCK(net_mutex_,lk); SHARED_LOCK(net_mutex_,lk);
for (auto p : peers_) {
if (!p->waitConnection()) continue;
sentcount++;
record[p] = p->asyncCall<std::vector<R>>(name, handler, args...);
}
lk.unlock();
{ // Block thread until async callback notifies us
//UNIQUE_LOCK(m,llk);
std::unique_lock<std::mutex> llk(m);
cv.wait_for(llk, std::chrono::seconds(1), [&returncount,sentcount]{return returncount == sentcount;});
// Cancel any further results
lk.lock();
for (auto p : peers_) { for (auto p : peers_) {
auto mm = record.find(p); if (!p->waitConnection()) continue;
if (mm != record.end()) { ++sdata->sentcount;
p->cancelCall(mm->second); p->asyncCall<std::vector<R>>(name, handler, args...);
}
} }
} }
return results; std::unique_lock<std::mutex> llk(sdata->m);
sdata->cv.wait_for(llk, std::chrono::seconds(1), [sdata]{return sdata->returncount == sdata->sentcount; });
return sdata->results;
} }
template <typename R, typename... ARGS> template <typename R, typename... ARGS>
......
...@@ -420,7 +420,7 @@ void Peer::socketError() { ...@@ -420,7 +420,7 @@ void Peer::socketError() {
// more socket errors... // more socket errors...
_badClose(); _badClose();
LOG(ERROR) << "Socket: " << uri_ << " - error " << err; if (err != 0) LOG(ERROR) << "Socket: " << uri_ << " - error " << err;
} }
void Peer::error(int e) { void Peer::error(int e) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment