From 930bbe0e175431163d36849466a86887968b5ee5 Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nwpope@utu.fi> Date: Mon, 27 Jul 2020 19:47:14 +0300 Subject: [PATCH] Reformation findOne and findAll --- .../net/cpp/include/ftl/net/universe.hpp | 124 +++++++----------- components/net/cpp/src/peer.cpp | 2 +- 2 files changed, 49 insertions(+), 77 deletions(-) diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index c7bef74e5..3d3a4b3ff 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -311,102 +311,74 @@ void Universe::broadcast(const std::string &name, ARGS... args) { template <typename R, typename... ARGS> std::optional<R> Universe::findOne(const std::string &name, ARGS... args) { - std::atomic_flag hasreturned; - std::mutex m; - std::condition_variable cv; - std::atomic<int> count = 0; - std::optional<R> result; - - hasreturned.clear(); - - auto handler = [&](const std::optional<R> &r) { - std::unique_lock<std::mutex> lk(m); - count--; - //if (hasreturned || !r) return; - if (r && !hasreturned.test_and_set()) { - result = r; - } - lk.unlock(); - cv.notify_one(); + struct SharedData { + std::atomic_bool hasreturned = false; + std::mutex m; + std::condition_variable cv; + std::optional<R> result; }; - std::map<Peer*, int> record; - SHARED_LOCK(net_mutex_,lk); + auto sdata = std::make_shared<SharedData>(); - for (auto p : peers_) { - if (!p->waitConnection()) continue; - count++; - record[p] = p->asyncCall<std::optional<R>>(name, handler, args...); - } - lk.unlock(); - - { // Block thread until async callback notifies us - std::unique_lock<std::mutex> llk(m); - // FIXME: what happens if one clients does not return (count != 0)? - cv.wait_for(llk, std::chrono::seconds(1), [&count] { - return count == 0; //hasreturned || count == 0; - }); - - // Cancel any further results - lk.lock(); - if (count > 0) { - throw FTL_Error("Find one failed with timeout"); + auto handler = [sdata](const std::optional<R> &r) { + std::unique_lock<std::mutex> lk(sdata->m); + if (r && !sdata->hasreturned) { + sdata->hasreturned = true; + sdata->result = r; } + lk.unlock(); + sdata->cv.notify_one(); + }; + { + SHARED_LOCK(net_mutex_,lk); for (auto p : peers_) { - auto mm = record.find(p); - if (mm != record.end()) { - p->cancelCall(mm->second); - } + if (!p->waitConnection()) continue; + p->asyncCall<std::optional<R>>(name, handler, args...); } } + + // Block thread until async callback notifies us + std::unique_lock<std::mutex> llk(sdata->m); + sdata->cv.wait_for(llk, std::chrono::seconds(1), [sdata] { + return (bool)sdata->hasreturned; + }); - return result; + return sdata->result; } template <typename R, typename... ARGS> std::vector<R> Universe::findAll(const std::string &name, ARGS... args) { - int returncount = 0; - int sentcount = 0; - std::mutex m; - std::condition_variable cv; - - std::vector<R> results; + struct SharedData { + std::atomic_int returncount = 0; + std::atomic_int sentcount = 0; + std::mutex m; + std::condition_variable cv; + std::vector<R> results; + }; + + auto sdata = std::make_shared<SharedData>(); - auto handler = [&](const std::vector<R> &r) { - //UNIQUE_LOCK(m,lk); - std::unique_lock<std::mutex> lk(m); - returncount++; - results.insert(results.end(), r.begin(), r.end()); + auto handler = [sdata](const std::vector<R> &r) { + std::unique_lock<std::mutex> lk(sdata->m); + ++sdata->returncount; + sdata->results.insert(sdata->results.end(), r.begin(), r.end()); lk.unlock(); - cv.notify_one(); + sdata->cv.notify_one(); }; - std::map<Peer*, int> record; - SHARED_LOCK(net_mutex_,lk); - for (auto p : peers_) { - if (!p->waitConnection()) continue; - sentcount++; - record[p] = p->asyncCall<std::vector<R>>(name, handler, args...); - } - lk.unlock(); - - { // Block thread until async callback notifies us - //UNIQUE_LOCK(m,llk); - std::unique_lock<std::mutex> llk(m); - cv.wait_for(llk, std::chrono::seconds(1), [&returncount,sentcount]{return returncount == sentcount;}); - - // Cancel any further results - lk.lock(); + { + SHARED_LOCK(net_mutex_,lk); for (auto p : peers_) { - auto mm = record.find(p); - if (mm != record.end()) { - p->cancelCall(mm->second); - } + if (!p->waitConnection()) continue; + ++sdata->sentcount; + p->asyncCall<std::vector<R>>(name, handler, args...); } } - - return results; + + std::unique_lock<std::mutex> llk(sdata->m); + sdata->cv.wait_for(llk, std::chrono::seconds(1), [sdata]{return sdata->returncount == sdata->sentcount; }); + return sdata->results; } template <typename R, typename... ARGS> diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index 4e99f67b5..368a5bbb2 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -420,7 +420,7 @@ void Peer::socketError() { // more socket errors... _badClose(); - LOG(ERROR) << "Socket: " << uri_ << " - error " << err; + if (err != 0) LOG(ERROR) << "Socket: " << uri_ << " - error " << err; } void Peer::error(int e) { -- GitLab