diff --git a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp index 208fef758e1c268d42c7f8a49bc0eb43c8b35b85..0fc234815c5c256e07db8b3d72e2f99f10d24cdd 100644 --- a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp @@ -16,6 +16,8 @@ namespace ftl { namespace rgbd { +static const int kChunkDim = 4; + namespace detail { struct StreamClient { diff --git a/components/rgbd-sources/src/net.cpp b/components/rgbd-sources/src/net.cpp index ef3fdd5d270288cd6c73422d0651cb40bced7b82..4d3b60f19c9df9b8fdf8bc0fd704e545d19cb072 100644 --- a/components/rgbd-sources/src/net.cpp +++ b/components/rgbd-sources/src/net.cpp @@ -4,6 +4,8 @@ #include <chrono> #include <shared_mutex> +#include <ftl/rgbd/streamer.hpp> + using ftl::rgbd::detail::NetSource; using ftl::net::Universe; using ftl::UUID; @@ -76,6 +78,31 @@ void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned ch //lk.unlock(); } +void NetSource::_recvChunk(int frame, uchar chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) { + cv::Mat tmp_rgb, tmp_depth; + + if (!active_) return; + + // Decode in temporary buffers to prevent long locks + cv::imdecode(jpg, cv::IMREAD_COLOR, &tmp_rgb); + cv::imdecode(d, cv::IMREAD_UNCHANGED, &tmp_depth); + + // Build chunk head + int cx = (chunk % chunks_dim_) * chunk_width_; + int cy = (chunk / chunks_dim_) * chunk_height_; + + cv::Rect roi(cx,cy,chunk_width_,chunk_height_); + cv::Mat chunkRGB = rgb_(roi); + cv::Mat chunkDepth = depth_(roi); + + // Lock host to prevent grab + UNIQUE_LOCK(host_->mutex(),lk); + tmp_rgb.copyTo(chunkRGB); + tmp_depth.convertTo(chunkDepth, CV_32FC1, 1.0f/(16.0f*100.0f)); + N_--; + //lk.unlock(); +} + void NetSource::setPose(const Eigen::Matrix4f &pose) { if (!active_) return; @@ -110,8 +137,8 @@ void NetSource::_updateURI() { has_calibration_ = _getCalibration(*host_->getNet(), peer_, *uri, params_); - host_->getNet()->bind(*uri, [this](const vector<unsigned char> &jpg, const vector<unsigned char> &d) { - _recv(jpg, d); + host_->getNet()->bind(*uri, [this](int frame, uchar chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) { + _recvChunk(frame, chunk, delta, jpg, d); }); N_ = 10; @@ -123,6 +150,13 @@ void NetSource::_updateURI() { LOG(ERROR) << "Could not connect to stream " << *uri; } + // Update chunk details + chunks_dim_ = ftl::rgbd::kChunkDim; + chunk_width_ = params_.width / chunks_dim_; + chunk_height_ = params_.height / chunks_dim_; + rgb_ = cv::Mat(cv::Size(params_.width, params_.height), CV_8UC3, cv::Scalar(0,0,0)); + depth_ = cv::Mat(cv::Size(params_.width, params_.height), CV_32FC1, 0.0f); + uri_ = *uri; active_ = true; } else { diff --git a/components/rgbd-sources/src/net.hpp b/components/rgbd-sources/src/net.hpp index de4a3b00b78bc8322a82f4ec598263d35202d69d..ed4d7bc1f71e77a67acd16963f2bb380f2a0f5b9 100644 --- a/components/rgbd-sources/src/net.hpp +++ b/components/rgbd-sources/src/net.hpp @@ -37,9 +37,13 @@ class NetSource : public detail::Source { std::string uri_; ftl::net::callback_t h_; std::mutex mutex_; + int chunks_dim_; + int chunk_width_; + int chunk_height_; bool _getCalibration(ftl::net::Universe &net, const ftl::UUID &peer, const std::string &src, ftl::rgbd::Camera &p); void _recv(const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d); + void _recvChunk(int frame, uchar chunk, bool delta, const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d); void _updateURI(); }; diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index b3ad77e6c1feff8da5c99d5ad2baaf16e812a109..01ba44aa803226eaee11a18fd145cca7474d0121 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -282,8 +282,42 @@ void Streamer::_schedule() { job_cv_.notify_one(); }); + // Create jobs for each chunk + for (int i=0; i<(kChunkDim*kChunkDim); i++) { + pool_.push([this,src](int id, int chunk) { + if (!src->rgb.empty() && !src->depth.empty()) { + int chunk_width = src->rgb.cols / kChunkDim; + int chunk_height = src->rgb.rows / kChunkDim; + + // Build chunk head + int cx = (chunk % kChunkDim) * chunk_width; + int cy = (chunk / kChunkDim) * chunk_height; + + cv::Rect roi(cx,cy,chunk_width,chunk_height); + vector<unsigned char> rgb_buf; + cv::Mat chunkRGB = src->rgb(roi); + cv::Mat chunkDepth = src->depth(roi); + + cv::imencode(".jpg", chunkRGB, rgb_buf); + + cv::Mat d2; + vector<unsigned char> d_buf; + chunkDepth.convertTo(d2, CV_16UC1, 16*100); + vector<int> pngparams = {cv::IMWRITE_PNG_COMPRESSION, 1}; // Default is 1 for fast, 9 = small but slow. + cv::imencode(".png", d2, d_buf, pngparams); + + + } + + src->state |= ftl::rgbd::detail::kRGB; + _swap(src); + --jobs_; + job_cv_.notify_one(); + }, i); + } + // Compress colour job - pool_.push([this,src](int id) { + /*pool_.push([this,src](int id) { if (!src->rgb.empty()) { auto start = std::chrono::high_resolution_clock::now(); @@ -322,7 +356,7 @@ void Streamer::_schedule() { _swap(src); --jobs_; job_cv_.notify_one(); - }); + });*/ // Transmit job // For any single source and bitrate there is only one thread