diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp index 4beea6fdb4ff4c16c9f5acef9b7fd62c4cfd845c..141ae56131ec13e1e1cd48608c6f32f62e3b9f06 100644 --- a/applications/vision/src/main.cpp +++ b/applications/vision/src/main.cpp @@ -116,6 +116,8 @@ static void run(ftl::Configurable *root) { stream->stop(); net->shutdown(); + ftl::pool.stop(); + delete stream; delete display; //delete source; // TODO(Nick) Add ftl::destroy diff --git a/components/common/cpp/include/ftl/threads.hpp b/components/common/cpp/include/ftl/threads.hpp index 199b355b448032895f86c7ba12f44371fd2a2337..3091ac64f894b2e8a3ec059c615cfba563e43bee 100644 --- a/components/common/cpp/include/ftl/threads.hpp +++ b/components/common/cpp/include/ftl/threads.hpp @@ -3,6 +3,9 @@ #include <mutex> #include <shared_mutex> +#include <ctpl_stl.h> + +#define POOL_SIZE 10 #if defined _DEBUG && DEBUG_MUTEX #include <loguru.hpp> @@ -24,7 +27,7 @@ #endif // _DEBUG && DEBUG_MUTEX namespace ftl { - + extern ctpl::thread_pool pool; } #endif // _FTL_THREADS_HPP_ diff --git a/components/common/cpp/src/configuration.cpp b/components/common/cpp/src/configuration.cpp index b8633f0de843ca55a77143b8759aeb39124101fe..a10ba86e571c0844c931caa643aa668d0d9c2443 100644 --- a/components/common/cpp/src/configuration.cpp +++ b/components/common/cpp/src/configuration.cpp @@ -18,6 +18,7 @@ #include <ftl/configuration.hpp> #include <ftl/configurable.hpp> #include <ftl/uri.hpp> +#include <ftl/threads.hpp> #include <fstream> #include <string> @@ -34,6 +35,8 @@ using ftl::is_file; using ftl::is_directory; using ftl::Configurable; +ctpl::thread_pool ftl::pool(POOL_SIZE); + // Store loaded configuration namespace ftl { namespace config { diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index f3f875ae8e648c176159417adf9711318ae3cb7d..853b91d26c213a74fc83a5f99706e7b6282dec76 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -60,7 +60,7 @@ int Peer::rpcid__ = 0; // Global peer UUID ftl::UUID ftl::net::this_peer; -static ctpl::thread_pool pool(5); +//static ctpl::thread_pool pool(5); // TODO(nick) Move to tcp_internal.cpp static SOCKET tcpConnect(URI &uri) { @@ -179,7 +179,7 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals if (version != ftl::net::kVersion) LOG(WARNING) << "Net protocol using different versions!"; // Ensure handlers called later or in new thread - pool.push([this](int id) { + ftl::pool.push([this](int id) { universe_->_notifyConnect(this); }); } @@ -253,7 +253,7 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer); // Ensure handlers called later or in new thread - pool.push([this](int id) { + ftl::pool.push([this](int id) { universe_->_notifyConnect(this); }); } @@ -421,7 +421,7 @@ void Peer::data() { // No thread currently processing messages so start one if (is_waiting_) { - pool.push([](int id, Peer *p) { + ftl::pool.push([](int id, Peer *p) { p->_data(); //p->is_waiting_ = true; }, this); diff --git a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp index 208fef758e1c268d42c7f8a49bc0eb43c8b35b85..0286512fc9eb40dfb81663f2141d828281855107 100644 --- a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp @@ -1,12 +1,12 @@ #ifndef _FTL_RGBD_STREAMER_HPP_ #define _FTL_RGBD_STREAMER_HPP_ -#include <ctpl_stl.h> #include <loguru.hpp> #include <ftl/configuration.hpp> #include <ftl/configurable.hpp> #include <ftl/rgbd/source.hpp> #include <ftl/net/universe.hpp> +#include <ftl/threads.hpp> #include <string> #include <vector> #include <map> @@ -101,7 +101,7 @@ class Streamer : public ftl::Configurable { private: std::map<std::string, detail::StreamSource*> sources_; - ctpl::thread_pool pool_; + //ctpl::thread_pool pool_; std::shared_mutex mutex_; bool active_; ftl::net::Universe *net_; diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index b3ad77e6c1feff8da5c99d5ad2baaf16e812a109..251f77f82f28fff82fe1588c8d367b4031de197c 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -21,10 +21,9 @@ using std::shared_lock; using std::this_thread::sleep_for; using std::chrono::milliseconds; -#define THREAD_POOL_SIZE 6 Streamer::Streamer(nlohmann::json &config, Universe *net) - : ftl::Configurable(config), pool_(THREAD_POOL_SIZE), late_(false), jobs_(0) { + : ftl::Configurable(config), late_(false), jobs_(0) { active_ = false; net_ = net; @@ -89,7 +88,7 @@ Streamer::~Streamer() { net_->unbind("get_stream"); net_->unbind("sync_streams"); net_->unbind("ping_streamer"); - pool_.stop(); + //pool_.stop(); } void Streamer::add(Source *src) { @@ -155,7 +154,7 @@ void Streamer::remove(const std::string &) { void Streamer::stop() { active_ = false; - pool_.stop(); + //pool_.stop(); } void Streamer::poll() { @@ -188,7 +187,7 @@ void Streamer::run(bool block) { } } else { // Create thread job for frame ticking - pool_.push([this](int id) { + ftl::pool.push([this](int id) { while (ftl::running && active_) { poll(); } @@ -229,7 +228,7 @@ void Streamer::_swap(StreamSource *src) { void Streamer::wait() { // Do some jobs in this thread, might as well... std::function<void(int)> j; - while ((bool)(j=pool_.pop())) { + while ((bool)(j=ftl::pool.pop())) { j(-1); } @@ -264,7 +263,7 @@ void Streamer::_schedule() { if (src == nullptr || src->state != 0) continue; // Grab job - pool_.push([this,src](int id) { + ftl::pool.push([this,src](int id) { //StreamSource *src = sources_[uri]; try { src->src->grab(); @@ -283,7 +282,7 @@ void Streamer::_schedule() { }); // Compress colour job - pool_.push([this,src](int id) { + ftl::pool.push([this,src](int id) { if (!src->rgb.empty()) { auto start = std::chrono::high_resolution_clock::now(); @@ -298,7 +297,7 @@ void Streamer::_schedule() { }); // Compress depth job - pool_.push([this,src](int id) { + ftl::pool.push([this,src](int id) { auto start = std::chrono::high_resolution_clock::now(); if (!src->depth.empty()) {