diff --git a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp index 0fc234815c5c256e07db8b3d72e2f99f10d24cdd..85ae3f6dde17b1c207d7464d8c8bd1479efcf925 100644 --- a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp @@ -33,11 +33,9 @@ static const unsigned int kDepth = 0x4; struct StreamSource { ftl::rgbd::Source *src; - std::atomic<unsigned int> state; // Busy or ready to swap? + std::atomic<unsigned int> jobs; // Busy or ready to swap? cv::Mat rgb; // Tx buffer cv::Mat depth; // Tx buffer - std::vector<unsigned char> rgb_buf; - std::vector<unsigned char> d_buf; std::vector<detail::StreamClient> clients[10]; // One list per bitrate std::shared_mutex mutex; }; diff --git a/components/rgbd-sources/src/net.cpp b/components/rgbd-sources/src/net.cpp index 4d3b60f19c9df9b8fdf8bc0fd704e545d19cb072..bca35479febd8a1113de97b13f02dc739332183f 100644 --- a/components/rgbd-sources/src/net.cpp +++ b/components/rgbd-sources/src/net.cpp @@ -78,11 +78,14 @@ void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned ch //lk.unlock(); } -void NetSource::_recvChunk(int frame, uchar chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) { +void NetSource::_recvChunk(int frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) { cv::Mat tmp_rgb, tmp_depth; if (!active_) return; + LOG(INFO) << "Received chunk " << (int)chunk; + + try { // Decode in temporary buffers to prevent long locks cv::imdecode(jpg, cv::IMREAD_COLOR, &tmp_rgb); cv::imdecode(d, cv::IMREAD_UNCHANGED, &tmp_depth); @@ -99,8 +102,12 @@ void NetSource::_recvChunk(int frame, uchar chunk, bool delta, const vector<unsi UNIQUE_LOCK(host_->mutex(),lk); tmp_rgb.copyTo(chunkRGB); tmp_depth.convertTo(chunkDepth, CV_32FC1, 1.0f/(16.0f*100.0f)); - N_--; + if (chunk == 0) N_--; //lk.unlock(); + } catch(...) { + LOG(ERROR) << "Decode exception"; + return; + } } void NetSource::setPose(const Eigen::Matrix4f &pose) { @@ -137,7 +144,7 @@ void NetSource::_updateURI() { has_calibration_ = _getCalibration(*host_->getNet(), peer_, *uri, params_); - host_->getNet()->bind(*uri, [this](int frame, uchar chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) { + host_->getNet()->bind(*uri, [this](int frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) { _recvChunk(frame, chunk, delta, jpg, d); }); diff --git a/components/rgbd-sources/src/net.hpp b/components/rgbd-sources/src/net.hpp index ed4d7bc1f71e77a67acd16963f2bb380f2a0f5b9..5ad0e08d7d71eba003edd69a5a04bebe09225d50 100644 --- a/components/rgbd-sources/src/net.hpp +++ b/components/rgbd-sources/src/net.hpp @@ -43,7 +43,7 @@ class NetSource : public detail::Source { bool _getCalibration(ftl::net::Universe &net, const ftl::UUID &peer, const std::string &src, ftl::rgbd::Camera &p); void _recv(const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d); - void _recvChunk(int frame, uchar chunk, bool delta, const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d); + void _recvChunk(int frame, int chunk, bool delta, const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d); void _updateURI(); }; diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index 01ba44aa803226eaee11a18fd145cca7474d0121..49b5f3077a541278ed809f9e8464ce37622cd097 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -101,7 +101,7 @@ void Streamer::add(Source *src) { StreamSource *s = new StreamSource; s->src = src; - s->state = 0; + s->jobs = 0; sources_[src->getID()] = s; } @@ -198,31 +198,22 @@ void Streamer::run(bool block) { // Must be called in source locked state or src.state must be atomic void Streamer::_swap(StreamSource *src) { - if (src->state == (ftl::rgbd::detail::kGrabbed | ftl::rgbd::detail::kRGB | ftl::rgbd::detail::kDepth)) { + if (src->jobs == 0) { UNIQUE_LOCK(src->mutex,lk); - if (src->rgb_buf.size() > 0 && src->d_buf.size() > 0) { - auto i = src->clients[0].begin(); - while (i != src->clients[0].end()) { - try { - // TODO(Nick) Send pose and timestamp - if (!net_->send((*i).peerid, (*i).uri, src->rgb_buf, src->d_buf)) { - (*i).txcount = (*i).txmax; - } - } catch(...) { - (*i).txcount = (*i).txmax; - } - (*i).txcount++; - if ((*i).txcount >= (*i).txmax) { - LOG(INFO) << "Remove client: " << (*i).uri; - i = src->clients[0].erase(i); - } else { - i++; - } + auto i = src->clients[0].begin(); + while (i != src->clients[0].end()) { + (*i).txcount++; + if ((*i).txcount >= (*i).txmax) { + LOG(INFO) << "Remove client: " << (*i).uri; + i = src->clients[0].erase(i); + } else { + i++; } } + src->src->getFrames(src->rgb, src->depth); - src->state = 0; + src->jobs = 0; } } @@ -257,11 +248,12 @@ void Streamer::_schedule() { // There will be two jobs for this source... //UNIQUE_LOCK(job_mtx_,lk); - jobs_ += 3; + jobs_ += 1 + kChunkDim*kChunkDim; //lk.unlock(); StreamSource *src = sources_[uri]; - if (src == nullptr || src->state != 0) continue; + if (src == nullptr || src->jobs != 0) continue; + src->jobs = 1 + kChunkDim*kChunkDim; // Grab job pool_.push([this,src](int id) { @@ -274,7 +266,8 @@ void Streamer::_schedule() { // CHECK (Nick) Can state be an atomic instead? //UNIQUE_LOCK(src->mutex, lk); - src->state |= ftl::rgbd::detail::kGrabbed; + src->jobs--; + //src->state |= ftl::rgbd::detail::kGrabbed; _swap(src); // Mark job as finished @@ -306,10 +299,24 @@ void Streamer::_schedule() { vector<int> pngparams = {cv::IMWRITE_PNG_COMPRESSION, 1}; // Default is 1 for fast, 9 = small but slow. cv::imencode(".png", d2, d_buf, pngparams); - + LOG(INFO) << "Sending chunk " << chunk; + + UNIQUE_LOCK(src->mutex,lk); + auto i = src->clients[0].begin(); + while (i != src->clients[0].end()) { + try { + // TODO(Nick) Send pose and timestamp + if (!net_->send((*i).peerid, (*i).uri, 0, chunk, false, rgb_buf, d_buf)) { + (*i).txcount = (*i).txmax; + } + } catch(...) { + (*i).txcount = (*i).txmax; + } + } } - src->state |= ftl::rgbd::detail::kRGB; + //src->state |= ftl::rgbd::detail::kRGB; + src->jobs--; _swap(src); --jobs_; job_cv_.notify_one();