diff --git a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp index 065ee28c60cc9382c33e1136309275c64eed0f6d..63f5c06fa55890708a999168e87d2aed2204efa8 100644 --- a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp @@ -11,6 +11,7 @@ #include <vector> #include <map> #include <shared_mutex> +#include <atomic> namespace ftl { namespace rgbd { @@ -29,7 +30,7 @@ static const unsigned int kTransmitted = 0x2; struct StreamSource { ftl::rgbd::Source *src; - unsigned int state; // Busy or ready to swap? + std::atomic<unsigned int> state; // Busy or ready to swap? cv::Mat rgb; // Tx buffer cv::Mat depth; // Tx buffer std::vector<detail::StreamClient> clients[10]; // One list per bitrate @@ -104,7 +105,7 @@ class Streamer : public ftl::Configurable { bool late_; std::mutex job_mtx_; std::condition_variable job_cv_; - int jobs_; + std::atomic<int> jobs_; void _schedule(); void _swap(detail::StreamSource &); diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index 5b556a1fd0418009cfa53e849ca03f90c4548685..e9c0701c4e72f7d9f032f4e87173919cfa75d119 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -106,15 +106,23 @@ void Streamer::add(Source *src) { } void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID &peer, const string &dest) { - UNIQUE_LOCK(mutex_,slk); - if (sources_.find(source) == sources_.end()) return; + StreamSource *s = nullptr; - if (rate < 0 || rate >= 10) return; - if (N < 0 || N > ftl::rgbd::kMaxFrames) return; + { + UNIQUE_LOCK(mutex_,slk); + if (sources_.find(source) == sources_.end()) return; - DLOG(INFO) << "Adding Stream Peer: " << peer.to_string(); + if (rate < 0 || rate >= 10) return; + if (N < 0 || N > ftl::rgbd::kMaxFrames) return; - StreamSource *s = sources_[source]; + DLOG(INFO) << "Adding Stream Peer: " << peer.to_string(); + + s = sources_[source]; + } + + if (!s) return; + + UNIQUE_LOCK(s->mutex, lk2); for (int i=0; i<s->clients[rate].size(); i++) { if (s->clients[rate][i].peerid == peer) { StreamClient &c = s->clients[rate][i]; @@ -221,26 +229,29 @@ void Streamer::_schedule() { } // There will be two jobs for this source... - UNIQUE_LOCK(job_mtx_,lk); + //UNIQUE_LOCK(job_mtx_,lk); jobs_ += 2; - lk.unlock(); + //lk.unlock(); - // Grab job - pool_.push([this,uri](int id) { - StreamSource *src = sources_[uri]; + StreamSource *src = sources_[uri]; + if (src == nullptr || src->state != 0) continue; + // Grab job + pool_.push([this,src](int id) { + //StreamSource *src = sources_[uri]; src->src->grab(); // CHECK (Nick) Can state be an atomic instead? - UNIQUE_LOCK(src->mutex, lk); + //UNIQUE_LOCK(src->mutex, lk); src->state |= ftl::rgbd::detail::kGrabbed; _swap(*src); - lk.unlock(); + //lk.unlock(); // Mark job as finished - UNIQUE_LOCK(job_mtx_, ulk); - jobs_--; - ulk.unlock(); + //UNIQUE_LOCK(job_mtx_, ulk); + //jobs_--; + //ulk.unlock(); + --jobs_; job_cv_.notify_one(); }); @@ -249,8 +260,8 @@ void Streamer::_schedule() { // meaning that no lock is required here since outer shared_lock // prevents addition of new clients. // TODO, could do one for each bitrate... - pool_.push([this,uri](int id) { - StreamSource *src = sources_[uri]; + pool_.push([this,src](int id) { + //StreamSource *src = sources_[uri]; try { if (src && src->rgb.rows > 0 && src->depth.rows > 0 && src->clients[0].size() > 0) { @@ -270,22 +281,25 @@ void Streamer::_schedule() { //LOG(INFO) << "Data size: " << ((rgb_buf.size() + d_buf.size()) / 1024) << "kb"; - auto i = src->clients[0].begin(); - while (i != src->clients[0].end()) { - try { - // TODO(Nick) Send pose and timestamp - if (!net_->send((*i).peerid, (*i).uri, rgb_buf, d_buf)) { + { + UNIQUE_LOCK(src->mutex,lk); + auto i = src->clients[0].begin(); + while (i != src->clients[0].end()) { + try { + // TODO(Nick) Send pose and timestamp + if (!net_->send((*i).peerid, (*i).uri, rgb_buf, d_buf)) { + (*i).txcount = (*i).txmax; + } + } catch(...) { (*i).txcount = (*i).txmax; } - } catch(...) { - (*i).txcount = (*i).txmax; - } - (*i).txcount++; - if ((*i).txcount >= (*i).txmax) { - LOG(INFO) << "Remove client: " << (*i).uri; - i = src->clients[0].erase(i); - } else { - i++; + (*i).txcount++; + if ((*i).txcount >= (*i).txmax) { + LOG(INFO) << "Remove client: " << (*i).uri; + i = src->clients[0].erase(i); + } else { + i++; + } } } } @@ -298,16 +312,17 @@ void Streamer::_schedule() { LOG(INFO) << "Stream Elapsed: " << elapsed.count();*/ // CHECK (Nick) Could state be an atomic? - UNIQUE_LOCK(src->mutex,lk); + //UNIQUE_LOCK(src->mutex,lk); //LOG(INFO) << "Tx Frame: " << uri; src->state |= ftl::rgbd::detail::kTransmitted; _swap(*src); - lk.unlock(); + //lk.unlock(); // Mark job as finished - UNIQUE_LOCK(job_mtx_,ulk); - jobs_--; - ulk.unlock(); + //UNIQUE_LOCK(job_mtx_,ulk); + //jobs_--; + //ulk.unlock(); + --jobs_; job_cv_.notify_one(); }); }