Skip to content
Snippets Groups Projects
Commit d3e198ee authored by Nicolas Pope's avatar Nicolas Pope
Browse files

WIP receive can't receive fast enough

parent b0c39b18
No related branches found
No related tags found
1 merge request!40Implements #89 PNG chunking
......@@ -33,11 +33,9 @@ static const unsigned int kDepth = 0x4;
struct StreamSource {
ftl::rgbd::Source *src;
std::atomic<unsigned int> state; // Busy or ready to swap?
std::atomic<unsigned int> jobs; // Busy or ready to swap?
cv::Mat rgb; // Tx buffer
cv::Mat depth; // Tx buffer
std::vector<unsigned char> rgb_buf;
std::vector<unsigned char> d_buf;
std::vector<detail::StreamClient> clients[10]; // One list per bitrate
std::shared_mutex mutex;
};
......
......@@ -78,11 +78,14 @@ void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned ch
//lk.unlock();
}
void NetSource::_recvChunk(int frame, uchar chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
void NetSource::_recvChunk(int frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
cv::Mat tmp_rgb, tmp_depth;
if (!active_) return;
LOG(INFO) << "Received chunk " << (int)chunk;
try {
// Decode in temporary buffers to prevent long locks
cv::imdecode(jpg, cv::IMREAD_COLOR, &tmp_rgb);
cv::imdecode(d, cv::IMREAD_UNCHANGED, &tmp_depth);
......@@ -99,8 +102,12 @@ void NetSource::_recvChunk(int frame, uchar chunk, bool delta, const vector<unsi
UNIQUE_LOCK(host_->mutex(),lk);
tmp_rgb.copyTo(chunkRGB);
tmp_depth.convertTo(chunkDepth, CV_32FC1, 1.0f/(16.0f*100.0f));
N_--;
if (chunk == 0) N_--;
//lk.unlock();
} catch(...) {
LOG(ERROR) << "Decode exception";
return;
}
}
void NetSource::setPose(const Eigen::Matrix4f &pose) {
......@@ -137,7 +144,7 @@ void NetSource::_updateURI() {
has_calibration_ = _getCalibration(*host_->getNet(), peer_, *uri, params_);
host_->getNet()->bind(*uri, [this](int frame, uchar chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
host_->getNet()->bind(*uri, [this](int frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
_recvChunk(frame, chunk, delta, jpg, d);
});
......
......@@ -43,7 +43,7 @@ class NetSource : public detail::Source {
bool _getCalibration(ftl::net::Universe &net, const ftl::UUID &peer, const std::string &src, ftl::rgbd::Camera &p);
void _recv(const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d);
void _recvChunk(int frame, uchar chunk, bool delta, const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d);
void _recvChunk(int frame, int chunk, bool delta, const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d);
void _updateURI();
};
......
......@@ -101,7 +101,7 @@ void Streamer::add(Source *src) {
StreamSource *s = new StreamSource;
s->src = src;
s->state = 0;
s->jobs = 0;
sources_[src->getID()] = s;
}
......@@ -198,20 +198,11 @@ void Streamer::run(bool block) {
// Must be called in source locked state or src.state must be atomic
void Streamer::_swap(StreamSource *src) {
if (src->state == (ftl::rgbd::detail::kGrabbed | ftl::rgbd::detail::kRGB | ftl::rgbd::detail::kDepth)) {
if (src->jobs == 0) {
UNIQUE_LOCK(src->mutex,lk);
if (src->rgb_buf.size() > 0 && src->d_buf.size() > 0) {
auto i = src->clients[0].begin();
while (i != src->clients[0].end()) {
try {
// TODO(Nick) Send pose and timestamp
if (!net_->send((*i).peerid, (*i).uri, src->rgb_buf, src->d_buf)) {
(*i).txcount = (*i).txmax;
}
} catch(...) {
(*i).txcount = (*i).txmax;
}
(*i).txcount++;
if ((*i).txcount >= (*i).txmax) {
LOG(INFO) << "Remove client: " << (*i).uri;
......@@ -220,9 +211,9 @@ void Streamer::_swap(StreamSource *src) {
i++;
}
}
}
src->src->getFrames(src->rgb, src->depth);
src->state = 0;
src->jobs = 0;
}
}
......@@ -257,11 +248,12 @@ void Streamer::_schedule() {
// There will be two jobs for this source...
//UNIQUE_LOCK(job_mtx_,lk);
jobs_ += 3;
jobs_ += 1 + kChunkDim*kChunkDim;
//lk.unlock();
StreamSource *src = sources_[uri];
if (src == nullptr || src->state != 0) continue;
if (src == nullptr || src->jobs != 0) continue;
src->jobs = 1 + kChunkDim*kChunkDim;
// Grab job
pool_.push([this,src](int id) {
......@@ -274,7 +266,8 @@ void Streamer::_schedule() {
// CHECK (Nick) Can state be an atomic instead?
//UNIQUE_LOCK(src->mutex, lk);
src->state |= ftl::rgbd::detail::kGrabbed;
src->jobs--;
//src->state |= ftl::rgbd::detail::kGrabbed;
_swap(src);
// Mark job as finished
......@@ -306,10 +299,24 @@ void Streamer::_schedule() {
vector<int> pngparams = {cv::IMWRITE_PNG_COMPRESSION, 1}; // Default is 1 for fast, 9 = small but slow.
cv::imencode(".png", d2, d_buf, pngparams);
LOG(INFO) << "Sending chunk " << chunk;
UNIQUE_LOCK(src->mutex,lk);
auto i = src->clients[0].begin();
while (i != src->clients[0].end()) {
try {
// TODO(Nick) Send pose and timestamp
if (!net_->send((*i).peerid, (*i).uri, 0, chunk, false, rgb_buf, d_buf)) {
(*i).txcount = (*i).txmax;
}
} catch(...) {
(*i).txcount = (*i).txmax;
}
}
}
src->state |= ftl::rgbd::detail::kRGB;
//src->state |= ftl::rgbd::detail::kRGB;
src->jobs--;
_swap(src);
--jobs_;
job_cv_.notify_one();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment