From d3ce99dbb57ff4b2de471ca8cdb2009dd78f8197 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Mon, 27 Jul 2020 14:24:52 +0300
Subject: [PATCH] Attempt to improve rpc stability

---
 components/net/cpp/include/ftl/net/peer.hpp   |  2 +-
 .../net/cpp/include/ftl/net/universe.hpp      | 21 ++++++++++++-------
 components/net/cpp/src/peer.cpp               |  2 +-
 components/streams/src/builder.cpp            |  2 +-
 components/streams/src/feed.cpp               |  8 ++++++-
 5 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp
index 44dee3439..35cbe5bb2 100644
--- a/components/net/cpp/include/ftl/net/peer.hpp
+++ b/components/net/cpp/include/ftl/net/peer.hpp
@@ -273,7 +273,7 @@ class Peer {
 	//std::vector<std::function<void(Peer &)>> close_handlers_;
 	std::map<int, std::unique_ptr<virtual_caller>> callbacks_;
 	
-	static volatile int rpcid__;				// Return ID for RPC calls
+	static std::atomic_int rpcid__;				// Return ID for RPC calls
 };
 
 // --- Inline Template Implementations -----------------------------------------
diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp
index 193d29ace..c7bef74e5 100644
--- a/components/net/cpp/include/ftl/net/universe.hpp
+++ b/components/net/cpp/include/ftl/net/universe.hpp
@@ -311,18 +311,21 @@ void Universe::broadcast(const std::string &name, ARGS... args) {
 
 template <typename R, typename... ARGS>
 std::optional<R> Universe::findOne(const std::string &name, ARGS... args) {
-	bool hasreturned = false;
+	std::atomic_flag hasreturned;
 	std::mutex m;
 	std::condition_variable cv;
 	std::atomic<int> count = 0;
 	std::optional<R> result;
 
+	hasreturned.clear();
+
 	auto handler = [&](const std::optional<R> &r) {
-		count--;
 		std::unique_lock<std::mutex> lk(m);
-		if (hasreturned || !r) return;
-		hasreturned = true;
-		result = r;
+		count--;
+		// Keep only the first non-empty response; count is decremented for every call.
+		if (r && !hasreturned.test_and_set()) {
+			result = r;
+		}
 		lk.unlock();
 		cv.notify_one();
 	};
@@ -340,12 +343,16 @@ std::optional<R> Universe::findOne(const std::string &name, ARGS... args) {
 	{	// Block thread until async callback notifies us
 		std::unique_lock<std::mutex> llk(m);
 		// FIXME: what happens if one clients does not return (count != 0)?
-		cv.wait_for(llk, std::chrono::seconds(1), [&hasreturned, &count] {
-			return hasreturned && count == 0;
+		cv.wait_for(llk, std::chrono::seconds(1), [&count] {
+			return count == 0; // wait until every handler has run, not just until the first result
 		});
 
 		// Cancel any further results
 		lk.lock();
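+		if (count > 0) {  // NOTE(review): this throw skips the cancel loop below while handlers still reference these locals -- confirm that is safe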
+			throw FTL_Error("Find one failed with timeout");
+		}
+
 		for (auto p : peers_) {
 			auto mm = record.find(p);
 			if (mm != record.end()) {
diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp
index 0e87eaf92..4e99f67b5 100644
--- a/components/net/cpp/src/peer.cpp
+++ b/components/net/cpp/src/peer.cpp
@@ -59,7 +59,7 @@ using std::vector;
     return ss.str();
 }*/
 
-volatile int Peer::rpcid__ = 0;
+std::atomic_int Peer::rpcid__ = 0;
 
 // Global peer UUID
 ftl::UUID ftl::net::this_peer;
diff --git a/components/streams/src/builder.cpp b/components/streams/src/builder.cpp
index 232495685..11e077b64 100644
--- a/components/streams/src/builder.cpp
+++ b/components/streams/src/builder.cpp
@@ -329,7 +329,7 @@ void ForeignBuilder::_schedule() {
 				cb_.trigger(fs);
 			} catch(const ftl::exception &e) {
 				LOG(ERROR) << "Exception in frameset builder: " << e.what();
-				LOG(ERROR) << "Trace = " << e.trace();
+				//LOG(ERROR) << "Trace = " << e.trace();
 			} catch(std::exception &e) {
 				LOG(ERROR) << "Exception in frameset builder: " << e.what();
 			}
diff --git a/components/streams/src/feed.cpp b/components/streams/src/feed.cpp
index b686b8bdd..a2a78fd6a 100644
--- a/components/streams/src/feed.cpp
+++ b/components/streams/src/feed.cpp
@@ -1105,7 +1105,13 @@ void Feed::_beginRecord(Filter *f) {
 	handle_record2_ = f->onWithHandle([this, f](const ftl::data::FrameSetPtr &fs) {
 		record_stream_->select(fs->frameset(), f->channels(), true);
 		stream_->select(fs->frameset(), f->channels(), true);
-		fs->flush();  // Force now to reduce latency
+		ftl::pool.push([fs](int id) {
+			try {
+				fs->flush();  // Force a flush to reduce latency; runs inside the pool job, not inline in the callback
+			} catch (const ftl::exception &e) {
+				LOG(ERROR) << "Exception when sending: " << e.what();
+			}
+		});
 		return true;
 	});
 }
-- 
GitLab