diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index b45163006d9755c3c3a713a23b1b9ccdf704cb8c..d0e10034993b2de34757f6fea64b5abbb1c8456b 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -214,10 +214,10 @@ class Universe : public ftl::Configurable { std::map<ftl::UUID, ftl::net::Peer*> peer_ids_; ftl::UUID id_; ftl::net::Dispatcher disp_; - std::thread thread_; std::list<ReconnectInfo> reconnects_; size_t phase_; std::list<ftl::net::Peer*> garbage_; + std::thread thread_; struct ConnHandler { callback_t id; diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index 402c5bed22cc394fd08251f2d2625983b89ed48e..c2b94acc1aeead0369601c7a65bf576c97d88832 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -24,12 +24,12 @@ using ftl::net::callback_t; callback_t ftl::net::Universe::cbid__ = 0; -Universe::Universe() : Configurable(), active_(true), this_peer(ftl::net::this_peer), thread_(Universe::__start, this), phase_(0) { +Universe::Universe() : Configurable(), active_(true), this_peer(ftl::net::this_peer), phase_(0), thread_(Universe::__start, this) { _installBindings(); } Universe::Universe(nlohmann::json &config) : - Configurable(config), active_(true), this_peer(ftl::net::this_peer), thread_(Universe::__start, this), phase_(0) { + Configurable(config), active_(true), this_peer(ftl::net::this_peer), phase_(0), thread_(Universe::__start, this) { _installBindings(); } diff --git a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp index d2beacb2105c38dfa7d3c8317f2e6512cb07c570..82dba747857eb0c6fc7ab62e52990f3e2a30ae67 100644 --- a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp @@ -35,7 +35,7 @@ static const unsigned int kFrameDropLimit = 5; struct StreamSource { ftl::rgbd::Source *src; - std::atomic<unsigned int> jobs; // Busy or ready to swap? + std::atomic<int> jobs; // Busy or ready to swap? std::atomic<unsigned int> clientCount; cv::Mat rgb; // Tx buffer cv::Mat depth; // Tx buffer diff --git a/components/rgbd-sources/src/net.cpp b/components/rgbd-sources/src/net.cpp index 12a76cc40d38f876f11cbc4919cf7bcb13a4bee5..68f46fc2b6879f437d6cbf76b56145af334e9e7a 100644 --- a/components/rgbd-sources/src/net.cpp +++ b/components/rgbd-sources/src/net.cpp @@ -150,8 +150,6 @@ void NetSource::_recvChunk(int64_t frame, int chunk, bool delta, const vector<un // Lock to allow buffer swap UNIQUE_LOCK(mutex_,lk2); - chunk_count_ = 0; - // Swap the double buffers cv::Mat tmp; tmp = rgb_; @@ -161,6 +159,7 @@ void NetSource::_recvChunk(int64_t frame, int chunk, bool delta, const vector<un depth_ = d_depth_; d_depth_ = tmp; + chunk_count_ = 0; timestamp_ = current_frame_; current_frame_ = frame; } diff --git a/components/rgbd-sources/src/source.cpp b/components/rgbd-sources/src/source.cpp index 9aee6dfbdbbcbec518ed3c3da879c7aa346c3096..c5efd26fc51068b74cfdc03e84cc3800a5931ea8 100644 --- a/components/rgbd-sources/src/source.cpp +++ b/components/rgbd-sources/src/source.cpp @@ -165,10 +165,10 @@ ftl::rgbd::detail::Source *Source::_createDeviceImpl(const ftl::URI &uri) { void Source::getFrames(cv::Mat &rgb, cv::Mat &depth) { SHARED_LOCK(mutex_,lk); - //rgb_.copyTo(rgb); - //depth_.copyTo(depth); - rgb = rgb_; - depth = depth_; + rgb_.copyTo(rgb); + depth_.copyTo(depth); + //rgb = rgb_; + //depth = depth_; /*cv::Mat tmp; tmp = rgb; @@ -254,8 +254,11 @@ bool Source::compute(int N, int B) { depth_ = impl_->depth_; impl_->depth_ = tmp;*/ + // TODO:(Nick) Reduce buffer copies impl_->rgb_.copyTo(rgb_); impl_->depth_.copyTo(depth_); + //rgb_ = impl_->rgb_; + //depth_ = impl_->depth_; return true; } return false; diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index ab03f2a6527163d3413e7a1d2ada8aed9803f07f..21821c908964b5f0889330fa931888c9bc10bcf1 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -263,30 +263,31 @@ void Streamer::run(bool block) { void Streamer::_swap(StreamSource *src) { if (src->jobs == 0) { UNIQUE_LOCK(src->mutex,lk); - - for (unsigned int b=0; b<10; ++b) { - auto i = src->clients[b].begin(); - while (i != src->clients[b].end()) { - // Client request completed so remove from list - if ((*i).txcount >= (*i).txmax) { - LOG(INFO) << "Remove client: " << (*i).uri; - i = src->clients[b].erase(i); - --src->clientCount; - } else { - i++; + if (src->jobs == 0) { + for (unsigned int b=0; b<10; ++b) { + auto i = src->clients[b].begin(); + while (i != src->clients[b].end()) { + // Client request completed so remove from list + if ((*i).txcount >= (*i).txmax) { + LOG(INFO) << "Remove client: " << (*i).uri; + i = src->clients[b].erase(i); + --src->clientCount; + } else { + i++; + } } } - } - src->src->getFrames(src->rgb, src->depth); - src->src->swap(); + src->src->swap(); + src->src->getFrames(src->rgb, src->depth); - //if (!src->rgb.empty() && src->prev_depth.empty()) { - //src->prev_depth = cv::Mat(src->rgb.size(), CV_16UC1, cv::Scalar(0)); - //LOG(INFO) << "Creating prevdepth: " << src->rgb.cols << "," << src->rgb.rows; - //} - src->jobs = 0; - src->frame++; + //if (!src->rgb.empty() && src->prev_depth.empty()) { + //src->prev_depth = cv::Mat(src->rgb.size(), CV_16UC1, cv::Scalar(0)); + //LOG(INFO) << "Creating prevdepth: " << src->rgb.cols << "," << src->rgb.rows; + //} + src->jobs = -1; + src->frame++; + } } } @@ -310,19 +311,13 @@ void Streamer::wait() { } void Streamer::_schedule(StreamSource *src) { - // There will be two jobs for this source... - //UNIQUE_LOCK(job_mtx_,lk); - jobs_ += 2 + kChunkCount; - //lk.unlock(); + if (src == nullptr || src->jobs > 0) return; - //StreamSource *src = sources_[uri]; - if (src == nullptr || src->jobs != 0) return; + jobs_ += 2 + kChunkCount; src->jobs = 2 + kChunkCount; // Grab / capture job ftl::pool.push([this,src](int id) { - //auto start = std::chrono::high_resolution_clock::now(); - auto start = std::chrono::high_resolution_clock::now(); int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count()+clock_adjust_; int64_t target = now / mspf_; @@ -374,7 +369,7 @@ void Streamer::_schedule(StreamSource *src) { // Mark job as finished std::unique_lock<std::mutex> lk(job_mtx_); --jobs_; - job_cv_.notify_one(); + if (jobs_ == 0) job_cv_.notify_one(); }); // Compute job @@ -395,7 +390,7 @@ void Streamer::_schedule(StreamSource *src) { // Mark job as finished std::unique_lock<std::mutex> lk(job_mtx_); --jobs_; - job_cv_.notify_one(); + if (jobs_ == 0) job_cv_.notify_one(); }); // Create jobs for each chunk @@ -415,7 +410,7 @@ void Streamer::_schedule(StreamSource *src) { _swap(src); std::unique_lock<std::mutex> lk(job_mtx_); --jobs_; - job_cv_.notify_one(); + if (jobs_ == 0) job_cv_.notify_one(); }); } }