From 1db888765f2538ee64187d06c1614f6c36728b11 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Thu, 30 Jul 2020 13:26:48 +0300
Subject: [PATCH] Add parallel and async triggers

---
 components/common/cpp/include/ftl/handle.hpp | 54 +++++++++++++++++---
 components/streams/src/feed.cpp              | 15 ++----
 2 files changed, 53 insertions(+), 16 deletions(-)

diff --git a/components/common/cpp/include/ftl/handle.hpp b/components/common/cpp/include/ftl/handle.hpp
index 9c712c039..1e7ca32cf 100644
--- a/components/common/cpp/include/ftl/handle.hpp
+++ b/components/common/cpp/include/ftl/handle.hpp
@@ -76,6 +76,11 @@ struct [[nodiscard]] Handle {
  */
 template <typename ...ARGS>
 struct Handler : BaseHandler {
+	Handler() {}
+	~Handler() {
+		// Ensure all thread pool jobs are done
+		while (jobs_ > 0) std::this_thread::sleep_for(std::chrono::milliseconds(10));
+	}
 
 	/**
 	 * Add a new callback function. It returns a `Handle` object that must
@@ -96,15 +101,47 @@ struct Handler : BaseHandler {
 	 */
 	void trigger(ARGS ...args) {
 		std::unique_lock<std::mutex> lk(mutex_);
-		//try {
+		for (auto i=callbacks_.begin(); i!=callbacks_.end(); ) {
+			bool keep = i->second(std::forward<ARGS>(args)...);
+			if (!keep) i = callbacks_.erase(i);
+			else ++i;
+		}
+	}
+
+	/**
+	 * Call all the callbacks in another thread. The callbacks are done in a
+	 * single thread, not in parallel.
+	 */
+	void triggerAsync(ARGS ...args) {
+		ftl::pool.push([this, args...](int id) {
+			std::unique_lock<std::mutex> lk(mutex_);
 			for (auto i=callbacks_.begin(); i!=callbacks_.end(); ) {
 				bool keep = i->second(std::forward<ARGS>(args)...);
 				if (!keep) i = callbacks_.erase(i);
 				else ++i;
 			}
-		//} catch (const std::exception &e) {
-		//	LOG(ERROR) << "Exception in callback: " << e.what();
-		//}
+		});
+	}
+
+	/**
+	 * Each callback is called in its own thread job. Note: the return value
+	 * of the callback is ignored in this case and does not allow callback
+	 * removal via the return value.
+	 */
+	void triggerParallel(ARGS ...args) {
+		std::unique_lock<std::mutex> lk(mutex_);
+		jobs_ += callbacks_.size();
+		for (auto i=callbacks_.begin(); i!=callbacks_.end(); ++i) {
+			ftl::pool.push([this, f = i->second, args...](int id) {
+				try {
+					f(std::forward<ARGS>(args)...);
+				} catch (const ftl::exception &e) {
+					--jobs_;
+					throw e;
+				}
+				--jobs_;
+			});
+		}
 	}
 
 	/**
@@ -112,12 +149,17 @@ struct Handler : BaseHandler {
 	 * `Handle` to be destroyed or cancelled.
 	 */
 	void remove(const Handle &h) override {
-		std::unique_lock<std::mutex> lk(mutex_);
-		callbacks_.erase(h.id());
+		{
+			std::unique_lock<std::mutex> lk(mutex_);
+			callbacks_.erase(h.id());
+		}
+		// Make sure any possible call to removed callback has finished.
+		while (jobs_ > 0) std::this_thread::sleep_for(std::chrono::milliseconds(10));
 	}
 
 	private:
 	std::unordered_map<int, std::function<bool(ARGS...)>> callbacks_;
+	std::atomic_int jobs_=0;
 };
 
 /**
diff --git a/components/streams/src/feed.cpp b/components/streams/src/feed.cpp
index 5f71e7fcf..aa7878a46 100644
--- a/components/streams/src/feed.cpp
+++ b/components/streams/src/feed.cpp
@@ -194,7 +194,7 @@ Feed::Feed(nlohmann::json &config, ftl::net::Universe*net) :
 
 				if (filter->sources().empty()) {
 					//filter->channels_available_ = fs->channels();
-					filter->handler_.trigger(fs);
+					filter->handler_.triggerParallel(fs);
 				}
 				else {
 					// TODO: process partial/complete sets here (drop), that is
@@ -206,7 +206,7 @@ Feed::Feed(nlohmann::json &config, ftl::net::Universe*net) :
 						//if (fs->hasFrame(src)) {
 						if (fs->frameset() == src) {
 							//filter->channels_available_ = fs->channels();
-							filter->handler_.trigger(fs);
+							filter->handler_.triggerParallel(fs);
 							break;
 						}
 					}
@@ -499,10 +499,8 @@ void Feed::_updateNetSources(ftl::net::Peer *p, const std::string &s, bool autoa
 		add(uri);
 	}
 
-	ftl::pool.push([this, s](int id) {
-		std::vector<std::string> srcs{s};
-		new_sources_cb_.trigger(srcs);
-	});
+	std::vector<std::string> srcs{s};
+	new_sources_cb_.triggerAsync(srcs);
 }
 
 void Feed::_updateNetSources(ftl::net::Peer *p, bool autoadd) {
@@ -527,10 +525,7 @@ void Feed::_updateNetSources(ftl::net::Peer *p, bool autoadd) {
 			}
 		}
 
-		ftl::pool.push([this, peerstreams](int id) {
-			new_sources_cb_.trigger(peerstreams);
-		});
-
+		new_sources_cb_.triggerAsync(peerstreams);
 	} catch (const ftl::exception &e) {
 
 	}
-- 
GitLab