Skip to content
Snippets Groups Projects
Commit b63f48f0 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Resolves #62 by using single thread pool

parent c932d7a3
No related branches found
No related tags found
No related merge requests found
......@@ -116,6 +116,8 @@ static void run(ftl::Configurable *root) {
stream->stop();
net->shutdown();
ftl::pool.stop();
delete stream;
delete display;
//delete source; // TODO(Nick) Add ftl::destroy
......
......@@ -3,6 +3,9 @@
#include <mutex>
#include <shared_mutex>
#include <ctpl_stl.h>
#define POOL_SIZE 10
#if defined _DEBUG && DEBUG_MUTEX
#include <loguru.hpp>
......@@ -24,7 +27,7 @@
#endif // _DEBUG && DEBUG_MUTEX
namespace ftl {
extern ctpl::thread_pool pool;
}
#endif // _FTL_THREADS_HPP_
......@@ -18,6 +18,7 @@
#include <ftl/configuration.hpp>
#include <ftl/configurable.hpp>
#include <ftl/uri.hpp>
#include <ftl/threads.hpp>
#include <fstream>
#include <string>
......@@ -34,6 +35,8 @@ using ftl::is_file;
using ftl::is_directory;
using ftl::Configurable;
ctpl::thread_pool ftl::pool(POOL_SIZE);
// Store loaded configuration
namespace ftl {
namespace config {
......
......@@ -60,7 +60,7 @@ int Peer::rpcid__ = 0;
// Global peer UUID
ftl::UUID ftl::net::this_peer;
static ctpl::thread_pool pool(5);
//static ctpl::thread_pool pool(5);
// TODO(nick) Move to tcp_internal.cpp
static SOCKET tcpConnect(URI &uri) {
......@@ -179,7 +179,7 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals
if (version != ftl::net::kVersion) LOG(WARNING) << "Net protocol using different versions!";
// Ensure handlers called later or in new thread
pool.push([this](int id) {
ftl::pool.push([this](int id) {
universe_->_notifyConnect(this);
});
}
......@@ -253,7 +253,7 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true),
send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer);
// Ensure handlers called later or in new thread
pool.push([this](int id) {
ftl::pool.push([this](int id) {
universe_->_notifyConnect(this);
});
}
......@@ -421,7 +421,7 @@ void Peer::data() {
// No thread currently processing messages so start one
if (is_waiting_) {
pool.push([](int id, Peer *p) {
ftl::pool.push([](int id, Peer *p) {
p->_data();
//p->is_waiting_ = true;
}, this);
......
#ifndef _FTL_RGBD_STREAMER_HPP_
#define _FTL_RGBD_STREAMER_HPP_
#include <ctpl_stl.h>
#include <loguru.hpp>
#include <ftl/configuration.hpp>
#include <ftl/configurable.hpp>
#include <ftl/rgbd/source.hpp>
#include <ftl/net/universe.hpp>
#include <ftl/threads.hpp>
#include <string>
#include <vector>
#include <map>
......@@ -101,7 +101,7 @@ class Streamer : public ftl::Configurable {
private:
std::map<std::string, detail::StreamSource*> sources_;
ctpl::thread_pool pool_;
//ctpl::thread_pool pool_;
std::shared_mutex mutex_;
bool active_;
ftl::net::Universe *net_;
......
......@@ -21,10 +21,9 @@ using std::shared_lock;
using std::this_thread::sleep_for;
using std::chrono::milliseconds;
#define THREAD_POOL_SIZE 6
Streamer::Streamer(nlohmann::json &config, Universe *net)
: ftl::Configurable(config), pool_(THREAD_POOL_SIZE), late_(false), jobs_(0) {
: ftl::Configurable(config), late_(false), jobs_(0) {
active_ = false;
net_ = net;
......@@ -89,7 +88,7 @@ Streamer::~Streamer() {
net_->unbind("get_stream");
net_->unbind("sync_streams");
net_->unbind("ping_streamer");
pool_.stop();
//pool_.stop();
}
void Streamer::add(Source *src) {
......@@ -155,7 +154,7 @@ void Streamer::remove(const std::string &) {
void Streamer::stop() {
active_ = false;
pool_.stop();
//pool_.stop();
}
void Streamer::poll() {
......@@ -188,7 +187,7 @@ void Streamer::run(bool block) {
}
} else {
// Create thread job for frame ticking
pool_.push([this](int id) {
ftl::pool.push([this](int id) {
while (ftl::running && active_) {
poll();
}
......@@ -229,7 +228,7 @@ void Streamer::_swap(StreamSource *src) {
void Streamer::wait() {
// Do some jobs in this thread, might as well...
std::function<void(int)> j;
while ((bool)(j=pool_.pop())) {
while ((bool)(j=ftl::pool.pop())) {
j(-1);
}
......@@ -264,7 +263,7 @@ void Streamer::_schedule() {
if (src == nullptr || src->state != 0) continue;
// Grab job
pool_.push([this,src](int id) {
ftl::pool.push([this,src](int id) {
//StreamSource *src = sources_[uri];
try {
src->src->grab();
......@@ -283,7 +282,7 @@ void Streamer::_schedule() {
});
// Compress colour job
pool_.push([this,src](int id) {
ftl::pool.push([this,src](int id) {
if (!src->rgb.empty()) {
auto start = std::chrono::high_resolution_clock::now();
......@@ -298,7 +297,7 @@ void Streamer::_schedule() {
});
// Compress depth job
pool_.push([this,src](int id) {
ftl::pool.push([this,src](int id) {
auto start = std::chrono::high_resolution_clock::now();
if (!src->depth.empty()) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment