diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp index 44dee3439f856c299e35a102bb714af0916d3207..35cbe5bb2b749c13287327af02764b522e48cca0 100644 --- a/components/net/cpp/include/ftl/net/peer.hpp +++ b/components/net/cpp/include/ftl/net/peer.hpp @@ -273,7 +273,7 @@ class Peer { //std::vector<std::function<void(Peer &)>> close_handlers_; std::map<int, std::unique_ptr<virtual_caller>> callbacks_; - static volatile int rpcid__; // Return ID for RPC calls + static std::atomic_int rpcid__; // Return ID for RPC calls }; // --- Inline Template Implementations ----------------------------------------- diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index 193d29ace1148489d34dd24aa3b325b38508766c..c7bef74e572837da789a6e37b95cd5252a53f7b6 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -311,18 +311,21 @@ void Universe::broadcast(const std::string &name, ARGS... args) { template <typename R, typename... ARGS> std::optional<R> Universe::findOne(const std::string &name, ARGS... args) { - bool hasreturned = false; + std::atomic_flag hasreturned; std::mutex m; std::condition_variable cv; std::atomic<int> count = 0; std::optional<R> result; + hasreturned.clear(); + auto handler = [&](const std::optional<R> &r) { - count--; std::unique_lock<std::mutex> lk(m); - if (hasreturned || !r) return; - hasreturned = true; - result = r; + count--; + //if (hasreturned || !r) return; + if (r && !hasreturned.test_and_set()) { + result = r; + } lk.unlock(); cv.notify_one(); }; @@ -340,12 +343,16 @@ std::optional<R> Universe::findOne(const std::string &name, ARGS... args) { { // Block thread until async callback notifies us std::unique_lock<std::mutex> llk(m); // FIXME: what happens if one clients does not return (count != 0)? - cv.wait_for(llk, std::chrono::seconds(1), [&hasreturned, &count] { - return hasreturned && count == 0; + cv.wait_for(llk, std::chrono::seconds(1), [&count] { + return count == 0; //hasreturned || count == 0; }); // Cancel any further results lk.lock(); + if (count > 0) { + throw FTL_Error("Find one failed with timeout"); + } + for (auto p : peers_) { auto mm = record.find(p); if (mm != record.end()) { diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index 0e87eaf9206dff4970faea2f3384dd884ec386e8..4e99f67b5b1123bf1a3c0c9eb5d5da71d7ec325c 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -59,7 +59,7 @@ using std::vector; return ss.str(); }*/ -volatile int Peer::rpcid__ = 0; +std::atomic_int Peer::rpcid__ = 0; // Global peer UUID ftl::UUID ftl::net::this_peer; diff --git a/components/streams/src/builder.cpp b/components/streams/src/builder.cpp index 23249568542140e034a377b1f7eb6d73aa83f74d..11e077b647691bc2b7640e92bb5cb058f13e1ea3 100644 --- a/components/streams/src/builder.cpp +++ b/components/streams/src/builder.cpp @@ -329,7 +329,7 @@ void ForeignBuilder::_schedule() { cb_.trigger(fs); } catch(const ftl::exception &e) { LOG(ERROR) << "Exception in frameset builder: " << e.what(); - LOG(ERROR) << "Trace = " << e.trace(); + //LOG(ERROR) << "Trace = " << e.trace(); } catch(std::exception &e) { LOG(ERROR) << "Exception in frameset builder: " << e.what(); } diff --git a/components/streams/src/feed.cpp b/components/streams/src/feed.cpp index b686b8bdd82ddfcc43438e1ca9ad8dcdc864f761..a2a78fd6aa34c2c568e95dc6224968e92626bd02 100644 --- a/components/streams/src/feed.cpp +++ b/components/streams/src/feed.cpp @@ -1105,7 +1105,13 @@ void Feed::_beginRecord(Filter *f) { handle_record2_ = f->onWithHandle([this, f](const ftl::data::FrameSetPtr &fs) { record_stream_->select(fs->frameset(), f->channels(), true); stream_->select(fs->frameset(), f->channels(), true); - fs->flush(); // Force now to reduce latency + ftl::pool.push([fs](int id) { + try { + fs->flush(); // Force now to reduce latency + } catch (const ftl::exception &e) { + LOG(ERROR) << "Exception when sending: " << e.what(); + } + }); return true; }); }