Skip to content
Snippets Groups Projects
Commit b0c39b18 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

WIP stream chunking

parent c932d7a3
No related branches found
No related tags found
1 merge request!40Implements #89 PNG chunking
Pipeline #11591 passed
......@@ -16,6 +16,8 @@
namespace ftl {
namespace rgbd {
static const int kChunkDim = 4;
namespace detail {
struct StreamClient {
......
......@@ -4,6 +4,8 @@
#include <chrono>
#include <shared_mutex>
#include <ftl/rgbd/streamer.hpp>
using ftl::rgbd::detail::NetSource;
using ftl::net::Universe;
using ftl::UUID;
......@@ -76,6 +78,31 @@ void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned ch
//lk.unlock();
}
void NetSource::_recvChunk(int frame, uchar chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
cv::Mat tmp_rgb, tmp_depth;
if (!active_) return;
// Decode in temporary buffers to prevent long locks
cv::imdecode(jpg, cv::IMREAD_COLOR, &tmp_rgb);
cv::imdecode(d, cv::IMREAD_UNCHANGED, &tmp_depth);
// Build chunk head
int cx = (chunk % chunks_dim_) * chunk_width_;
int cy = (chunk / chunks_dim_) * chunk_height_;
cv::Rect roi(cx,cy,chunk_width_,chunk_height_);
cv::Mat chunkRGB = rgb_(roi);
cv::Mat chunkDepth = depth_(roi);
// Lock host to prevent grab
UNIQUE_LOCK(host_->mutex(),lk);
tmp_rgb.copyTo(chunkRGB);
tmp_depth.convertTo(chunkDepth, CV_32FC1, 1.0f/(16.0f*100.0f));
N_--;
//lk.unlock();
}
void NetSource::setPose(const Eigen::Matrix4f &pose) {
if (!active_) return;
......@@ -110,8 +137,8 @@ void NetSource::_updateURI() {
has_calibration_ = _getCalibration(*host_->getNet(), peer_, *uri, params_);
host_->getNet()->bind(*uri, [this](const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
_recv(jpg, d);
host_->getNet()->bind(*uri, [this](int frame, uchar chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
_recvChunk(frame, chunk, delta, jpg, d);
});
N_ = 10;
......@@ -123,6 +150,13 @@ void NetSource::_updateURI() {
LOG(ERROR) << "Could not connect to stream " << *uri;
}
// Update chunk details
chunks_dim_ = ftl::rgbd::kChunkDim;
chunk_width_ = params_.width / chunks_dim_;
chunk_height_ = params_.height / chunks_dim_;
rgb_ = cv::Mat(cv::Size(params_.width, params_.height), CV_8UC3, cv::Scalar(0,0,0));
depth_ = cv::Mat(cv::Size(params_.width, params_.height), CV_32FC1, 0.0f);
uri_ = *uri;
active_ = true;
} else {
......
......@@ -37,9 +37,13 @@ class NetSource : public detail::Source {
std::string uri_;
ftl::net::callback_t h_;
std::mutex mutex_;
int chunks_dim_;
int chunk_width_;
int chunk_height_;
bool _getCalibration(ftl::net::Universe &net, const ftl::UUID &peer, const std::string &src, ftl::rgbd::Camera &p);
void _recv(const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d);
void _recvChunk(int frame, uchar chunk, bool delta, const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d);
void _updateURI();
};
......
......@@ -282,8 +282,42 @@ void Streamer::_schedule() {
job_cv_.notify_one();
});
// Create jobs for each chunk
for (int i=0; i<(kChunkDim*kChunkDim); i++) {
pool_.push([this,src](int id, int chunk) {
if (!src->rgb.empty() && !src->depth.empty()) {
int chunk_width = src->rgb.cols / kChunkDim;
int chunk_height = src->rgb.rows / kChunkDim;
// Build chunk head
int cx = (chunk % kChunkDim) * chunk_width;
int cy = (chunk / kChunkDim) * chunk_height;
cv::Rect roi(cx,cy,chunk_width,chunk_height);
vector<unsigned char> rgb_buf;
cv::Mat chunkRGB = src->rgb(roi);
cv::Mat chunkDepth = src->depth(roi);
cv::imencode(".jpg", chunkRGB, rgb_buf);
cv::Mat d2;
vector<unsigned char> d_buf;
chunkDepth.convertTo(d2, CV_16UC1, 16*100);
vector<int> pngparams = {cv::IMWRITE_PNG_COMPRESSION, 1}; // Default is 1 for fast, 9 = small but slow.
cv::imencode(".png", d2, d_buf, pngparams);
}
src->state |= ftl::rgbd::detail::kRGB;
_swap(src);
--jobs_;
job_cv_.notify_one();
}, i);
}
// Compress colour job
pool_.push([this,src](int id) {
/*pool_.push([this,src](int id) {
if (!src->rgb.empty()) {
auto start = std::chrono::high_resolution_clock::now();
......@@ -322,7 +356,7 @@ void Streamer::_schedule() {
_swap(src);
--jobs_;
job_cv_.notify_one();
});
});*/
// Transmit job
// For any single source and bitrate there is only one thread
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment