From b63f48f07115c62347e2a9658df13dbc83c51e41 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nicolas.pope@utu.fi>
Date: Sun, 16 Jun 2019 14:04:59 +0300
Subject: [PATCH] Resolves #62 by using single thread pool

---
 applications/vision/src/main.cpp                |  2 ++
 components/common/cpp/include/ftl/threads.hpp   |  5 ++++-
 components/common/cpp/src/configuration.cpp     |  3 +++
 components/net/cpp/src/peer.cpp                 |  8 ++++----
 .../rgbd-sources/include/ftl/rgbd/streamer.hpp  |  4 ++--
 components/rgbd-sources/src/streamer.cpp        | 17 ++++++++---------
 6 files changed, 23 insertions(+), 16 deletions(-)

diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp
index 4beea6fdb..141ae5613 100644
--- a/applications/vision/src/main.cpp
+++ b/applications/vision/src/main.cpp
@@ -116,6 +116,8 @@ static void run(ftl::Configurable *root) {
 	stream->stop();
 	net->shutdown();
 
+	ftl::pool.stop();
+
 	delete stream;
 	delete display;
 	//delete source;  // TODO(Nick) Add ftl::destroy
diff --git a/components/common/cpp/include/ftl/threads.hpp b/components/common/cpp/include/ftl/threads.hpp
index 199b355b4..3091ac64f 100644
--- a/components/common/cpp/include/ftl/threads.hpp
+++ b/components/common/cpp/include/ftl/threads.hpp
@@ -3,6 +3,9 @@
 
 #include <mutex>
 #include <shared_mutex>
+#include <ctpl_stl.h>
+
+#define POOL_SIZE 10
 
 #if defined _DEBUG && DEBUG_MUTEX
 #include <loguru.hpp>
@@ -24,7 +27,7 @@
 #endif  // _DEBUG && DEBUG_MUTEX
 
 namespace ftl {
-
+	extern ctpl::thread_pool pool;
 }
 
 #endif  // _FTL_THREADS_HPP_
diff --git a/components/common/cpp/src/configuration.cpp b/components/common/cpp/src/configuration.cpp
index b8633f0de..a10ba86e5 100644
--- a/components/common/cpp/src/configuration.cpp
+++ b/components/common/cpp/src/configuration.cpp
@@ -18,6 +18,7 @@
 #include <ftl/configuration.hpp>
 #include <ftl/configurable.hpp>
 #include <ftl/uri.hpp>
+#include <ftl/threads.hpp>
 
 #include <fstream>
 #include <string>
@@ -34,6 +35,8 @@ using ftl::is_file;
 using ftl::is_directory;
 using ftl::Configurable;
 
+ctpl::thread_pool ftl::pool(POOL_SIZE);
+
 // Store loaded configuration
 namespace ftl {
 namespace config {
diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp
index f3f875ae8..853b91d26 100644
--- a/components/net/cpp/src/peer.cpp
+++ b/components/net/cpp/src/peer.cpp
@@ -60,7 +60,7 @@ int Peer::rpcid__ = 0;
 // Global peer UUID
 ftl::UUID ftl::net::this_peer;
 
-static ctpl::thread_pool pool(5);
+//static ctpl::thread_pool pool(5);
 
 // TODO(nick) Move to tcp_internal.cpp
 static SOCKET tcpConnect(URI &uri) {
@@ -179,7 +179,7 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals
 				if (version != ftl::net::kVersion) LOG(WARNING) << "Net protocol using different versions!";
 				
 				// Ensure handlers called later or in new thread
-				pool.push([this](int id) {
+				ftl::pool.push([this](int id) {
 					universe_->_notifyConnect(this);
 				});
 			}
@@ -253,7 +253,7 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true),
 				send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer);
 				
 				// Ensure handlers called later or in new thread
-				pool.push([this](int id) {
+				ftl::pool.push([this](int id) {
 					universe_->_notifyConnect(this);
 				});
 			}
@@ -421,7 +421,7 @@ void Peer::data() {
 
 	// No thread currently processing messages so start one
 	if (is_waiting_) {
-		pool.push([](int id, Peer *p) {
+		ftl::pool.push([](int id, Peer *p) {
 			p->_data();
 			//p->is_waiting_ = true;
 		}, this);
diff --git a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp
index 208fef758..0286512fc 100644
--- a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp
+++ b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp
@@ -1,12 +1,12 @@
 #ifndef _FTL_RGBD_STREAMER_HPP_
 #define _FTL_RGBD_STREAMER_HPP_
 
-#include <ctpl_stl.h>
 #include <loguru.hpp>
 #include <ftl/configuration.hpp>
 #include <ftl/configurable.hpp>
 #include <ftl/rgbd/source.hpp>
 #include <ftl/net/universe.hpp>
+#include <ftl/threads.hpp>
 #include <string>
 #include <vector>
 #include <map>
@@ -101,7 +101,7 @@ class Streamer : public ftl::Configurable {
 
 	private:
 	std::map<std::string, detail::StreamSource*> sources_;
-	ctpl::thread_pool pool_;
+	//ctpl::thread_pool pool_;
 	std::shared_mutex mutex_;
 	bool active_;
 	ftl::net::Universe *net_;
diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp
index b3ad77e6c..251f77f82 100644
--- a/components/rgbd-sources/src/streamer.cpp
+++ b/components/rgbd-sources/src/streamer.cpp
@@ -21,10 +21,9 @@ using std::shared_lock;
 using std::this_thread::sleep_for;
 using std::chrono::milliseconds;
 
-#define THREAD_POOL_SIZE 6
 
 Streamer::Streamer(nlohmann::json &config, Universe *net)
-		: ftl::Configurable(config), pool_(THREAD_POOL_SIZE), late_(false), jobs_(0) {
+		: ftl::Configurable(config), late_(false), jobs_(0) {
 
 	active_ = false;
 	net_ = net;
@@ -89,7 +88,7 @@ Streamer::~Streamer() {
 	net_->unbind("get_stream");
 	net_->unbind("sync_streams");
 	net_->unbind("ping_streamer");
-	pool_.stop();
+	//pool_.stop();
 }
 
 void Streamer::add(Source *src) {
@@ -155,7 +154,7 @@ void Streamer::remove(const std::string &) {
 
 void Streamer::stop() {
 	active_ = false;
-	pool_.stop();
+	//pool_.stop();
 }
 
 void Streamer::poll() {
@@ -188,7 +187,7 @@ void Streamer::run(bool block) {
 		}
 	} else {
 		// Create thread job for frame ticking
-		pool_.push([this](int id) {
+		ftl::pool.push([this](int id) {
 			while (ftl::running && active_) {
 				poll();
 			}
@@ -229,7 +228,7 @@ void Streamer::_swap(StreamSource *src) {
 void Streamer::wait() {
 	// Do some jobs in this thread, might as well...
 	std::function<void(int)> j;
-	while ((bool)(j=pool_.pop())) {
+	while ((bool)(j=ftl::pool.pop())) {
 		j(-1);
 	}
 
@@ -264,7 +263,7 @@ void Streamer::_schedule() {
 		if (src == nullptr || src->state != 0) continue;
 
 		// Grab job
-		pool_.push([this,src](int id) {
+		ftl::pool.push([this,src](int id) {
 			//StreamSource *src = sources_[uri];
 			try {
 				src->src->grab();
@@ -283,7 +282,7 @@ void Streamer::_schedule() {
 		});
 
 		// Compress colour job
-		pool_.push([this,src](int id) {
+		ftl::pool.push([this,src](int id) {
 			if (!src->rgb.empty()) {
 				auto start = std::chrono::high_resolution_clock::now();
 
@@ -298,7 +297,7 @@ void Streamer::_schedule() {
 		});
 
 		// Compress depth job
-		pool_.push([this,src](int id) {
+		ftl::pool.push([this,src](int id) {
 			auto start = std::chrono::high_resolution_clock::now();
 
 			if (!src->depth.empty()) {
-- 
GitLab