Skip to content
Snippets Groups Projects
Commit 940be18d authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Allow multi qualities per client

parent a5601bbc
No related branches found
No related tags found
1 merge request!197Multiple stream qualities per client
...@@ -294,11 +294,15 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID ...@@ -294,11 +294,15 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID
for (auto &client : s->clients) { for (auto &client : s->clients) {
// If already listening, just update chunk counters // If already listening, just update chunk counters
if (client.peerid == peer) { if (client.peerid == peer) {
// Allow for same client but different quality (beyond threshold)
if ((client.preset < kQualityThreshold && rate >= kQualityThreshold) ||
(client.preset >= kQualityThreshold && rate < kQualityThreshold)) continue;
client.txmax = N; client.txmax = N;
client.txcount = 0; client.txcount = 0;
// Possible switch from high quality to low quality encoding or vice versa // Possible switch from high quality to low quality encoding or vice versa
if (client.preset < kQualityThreshold && rate >= kQualityThreshold) { /*if (client.preset < kQualityThreshold && rate >= kQualityThreshold) {
s->hq_count--; s->hq_count--;
s->lq_count++; s->lq_count++;
if (s->lq_encoder_c1) s->lq_encoder_c1->reset(); if (s->lq_encoder_c1) s->lq_encoder_c1->reset();
...@@ -308,7 +312,8 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID ...@@ -308,7 +312,8 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID
s->lq_count--; s->lq_count--;
if (s->hq_encoder_c1) s->hq_encoder_c1->reset(); if (s->hq_encoder_c1) s->hq_encoder_c1->reset();
if (s->hq_encoder_c2) s->hq_encoder_c2->reset(); if (s->hq_encoder_c2) s->hq_encoder_c2->reset();
} break;
}*/
client.preset = rate; client.preset = rate;
return; return;
...@@ -512,51 +517,6 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) { ...@@ -512,51 +517,6 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) {
}); });
} }
} }
// Do we need to do low quality encoding?
/*if (src->lq_count > 0) {
if (!src->lq_encoder_c1) src->lq_encoder_c1 = ftl::codecs::allocateLQEncoder();
if (!src->lq_encoder_c2) src->lq_encoder_c2 = ftl::codecs::allocateLQEncoder();
// Do we have the resources to do a LQ encoding?
if (src->lq_encoder_c1 && src->lq_encoder_c2) {
const auto *enc1 = src->lq_encoder_c1;
const auto *enc2 = src->lq_encoder_c2;
// Do entire frame as single step
if (!enc1->useBlocks() || !enc2->useBlocks()) {
ftl::pool.push([this,&fs,j,src](int id) {
_encodeLQAndTransmit(src, fs.channel1[j], fs.channel2[j], -1);
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
if (jobs_ == 0) job_cv_.notify_one();
});
jobs_++;
// Or divide frame into blocks and encode each
} else {
// Create jobs for each chunk
for (int i=0; i<chunk_count_; ++i) {
// Add chunk job to thread pool
ftl::pool.push([this,&fs,j,i,src](int id) {
int chunk = i;
try {
_encodeLQAndTransmit(src, fs.channel1[j], fs.channel2[j], chunk);
} catch(...) {
LOG(ERROR) << "Encode Exception: " << chunk;
}
//src->jobs--;
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
if (jobs_ == 0) job_cv_.notify_one();
});
}
jobs_ += chunk_count_;
}
}
}*/
} }
/*std::unique_lock<std::mutex> lk(job_mtx_); /*std::unique_lock<std::mutex> lk(job_mtx_);
...@@ -619,219 +579,3 @@ void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::StreamPacke ...@@ -619,219 +579,3 @@ void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::StreamPacke
++c; ++c;
} }
} }
/*void Streamer::_encodeHQAndTransmit(StreamSource *src, const cv::Mat &c1, const cv::Mat &c2, int block) {
bool hasChan2 = (!c2.empty() && src->src->getChannel() != ftl::rgbd::kChanNone);
LOG(INFO) << "Encode HQ: " << block;
vector<unsigned char> c1buff;
vector<unsigned char> c2buff;
if (block == -1) {
src->hq_encoder_c1->encode(c1, c1buff, src->hq_bitrate, false);
if (hasChan2) src->hq_encoder_c2->encode(c2, c2buff, src->hq_bitrate, false);
} else {
//bool delta = (chunk+src->frame) % 8 > 0; // Do XOR or not
int chunk_width = c1.cols / chunk_dim_;
int chunk_height = c1.rows / chunk_dim_;
// Build chunk heads
int cx = (block % chunk_dim_) * chunk_width;
int cy = (block / chunk_dim_) * chunk_height;
cv::Rect roi(cx,cy,chunk_width,chunk_height);
//vector<unsigned char> rgb_buf;
cv::Mat chunkRGB = c1(roi);
src->hq_encoder_c1->encode(chunkRGB, c1buff, src->hq_bitrate, false);
if (hasChan2) {
cv::Mat chunkDepth = c2(roi);
src->hq_encoder_c2->encode(chunkDepth, c2buff, src->hq_bitrate, false);
}
}
// Lock to prevent clients being added / removed
SHARED_LOCK(src->mutex,lk);
auto c = src->clients.begin();
while (c != src->clients.end()) {
const int b = (*c).bitrate;
if (b >= kQualityThreshold) continue; // Not a HQ request
try {
// TODO:(Nick) Send pose
short pre_transmit_latency = short(ftl::timer::get_time() - frame_no_);
if (!net_->send((*c).peerid, (*c).uri, frame_no_, pre_transmit_latency, uint8_t(src->hq_bitrate), block, c1buff, c2buff)) {
// Send failed so mark as client stream completed
(*c).txcount = (*c).txmax;
} else {
++(*c).txcount;
//LOG(INFO) << "SENT CHUNK : " << frame_no_ << "-" << chunk;
}
} catch(...) {
(*c).txcount = (*c).txmax;
}
++c;
}
}
void Streamer::_encodeLQAndTransmit(StreamSource *src, const cv::Mat &c1, const cv::Mat &c2, int block) {
bool hasChan2 = (!c2.empty() && src->src->getChannel() != ftl::rgbd::kChanNone);
LOG(INFO) << "Encode LQ: " << block;
vector<unsigned char> c1buff;
vector<unsigned char> c2buff;
if (block == -1) {
src->lq_encoder_c1->encode(c1, c1buff, src->lq_bitrate, false);
if (hasChan2) src->lq_encoder_c2->encode(c2, c2buff, src->lq_bitrate, false);
} else {
//bool delta = (chunk+src->frame) % 8 > 0; // Do XOR or not
int chunk_width = c1.cols / chunk_dim_;
int chunk_height = c1.rows / chunk_dim_;
// Build chunk heads
int cx = (block % chunk_dim_) * chunk_width;
int cy = (block / chunk_dim_) * chunk_height;
cv::Rect roi(cx,cy,chunk_width,chunk_height);
//vector<unsigned char> rgb_buf;
cv::Mat chunkRGB = c1(roi);
//cv::resize(chunkRGB, downrgb, cv::Size(ABRController::getColourWidth(b) / chunk_dim_, ABRController::getColourHeight(b) / chunk_dim_));
src->lq_encoder_c1->encode(chunkRGB, c1buff, src->lq_bitrate, false);
if (hasChan2) {
cv::Mat chunkDepth = c2(roi);
//cv::resize(chunkDepth, tmp, cv::Size(ABRController::getDepthWidth(b) / chunk_dim_, ABRController::getDepthHeight(b) / chunk_dim_), 0, 0, cv::INTER_NEAREST);
src->lq_encoder_c2->encode(chunkDepth, c2buff, src->lq_bitrate, false);
}
}
// Lock to prevent clients being added / removed
SHARED_LOCK(src->mutex,lk);
auto c = src->clients.begin();
while (c != src->clients.end()) {
const int b = (*c).bitrate;
if (b < kQualityThreshold) continue; // Not an LQ request
try {
// TODO:(Nick) Send pose
short pre_transmit_latency = short(ftl::timer::get_time() - frame_no_);
if (!net_->send((*c).peerid, (*c).uri, frame_no_, pre_transmit_latency, uint8_t(src->hq_bitrate), block, c1buff, c2buff)) {
// Send failed so mark as client stream completed
(*c).txcount = (*c).txmax;
} else {
++(*c).txcount;
//LOG(INFO) << "SENT CHUNK : " << frame_no_ << "-" << chunk;
}
} catch(...) {
(*c).txcount = (*c).txmax;
}
++c;
}
}*/
/*void Streamer::_encodeImagesAndTransmit(StreamSource *src, const cv::Mat &rgb, const cv::Mat &depth, int chunk) {
bool hasChan2 = (!depth.empty() && src->src->getChannel() != ftl::rgbd::kChanNone);
//bool delta = (chunk+src->frame) % 8 > 0; // Do XOR or not
int chunk_width = rgb.cols / chunk_dim_;
int chunk_height = rgb.rows / chunk_dim_;
// Build chunk heads
int cx = (chunk % chunk_dim_) * chunk_width;
int cy = (chunk / chunk_dim_) * chunk_height;
cv::Rect roi(cx,cy,chunk_width,chunk_height);
//vector<unsigned char> rgb_buf;
cv::Mat chunkRGB = rgb(roi);
cv::Mat chunkDepth;
//cv::Mat chunkDepthPrev = src->prev_depth(roi);
cv::Mat d2, d3;
//vector<unsigned char> d_buf;
if (hasChan2) {
chunkDepth = depth(roi);
if (chunkDepth.type() == CV_32F) chunkDepth.convertTo(d2, CV_16UC1, 1000); // 16*10);
else d2 = chunkDepth;
//if (delta) d3 = (d2 * 2) - chunkDepthPrev;
//else d3 = d2;
//d2.copyTo(chunkDepthPrev);
}
// TODO: Verify these don't allocate memory if not needed.
// TODO: Reuse these buffers to reduce allocations.
vector<unsigned char> brgb[ftl::rgbd::detail::kMaxBitrateLevels];
vector<unsigned char> bdepth[ftl::rgbd::detail::kMaxBitrateLevels];
// Lock to prevent clients being added / removed
SHARED_LOCK(src->mutex,lk);
auto c = src->clients.begin();
while (c != src->clients.end()) {
const int b = (*c).bitrate;
if (brgb[b].empty()) {
// Max bitrate means no changes
if (b == 0) {
_encodeImageChannel1(chunkRGB, brgb[b], b);
if (hasChan2) _encodeImageChannel2(d2, bdepth[b], src->src->getChannel(), b);
// Otherwise must downscale and change compression params
} else {
cv::Mat downrgb, downdepth;
cv::resize(chunkRGB, downrgb, cv::Size(ABRController::getColourWidth(b) / chunk_dim_, ABRController::getColourHeight(b) / chunk_dim_));
if (hasChan2) cv::resize(d2, downdepth, cv::Size(ABRController::getDepthWidth(b) / chunk_dim_, ABRController::getDepthHeight(b) / chunk_dim_), 0, 0, cv::INTER_NEAREST);
_encodeImageChannel1(downrgb, brgb[b], b);
if (hasChan2) _encodeImageChannel2(downdepth, bdepth[b], src->src->getChannel(), b);
}
}
try {
// TODO:(Nick) Send pose
short pre_transmit_latency = short(ftl::timer::get_time() - frame_no_);
if (!net_->send((*c).peerid, (*c).uri, frame_no_, pre_transmit_latency, uint8_t(b), chunk, brgb[b], bdepth[b])) {
// Send failed so mark as client stream completed
(*c).txcount = (*c).txmax;
} else {
++(*c).txcount;
//LOG(INFO) << "SENT CHUNK : " << frame_no_ << "-" << chunk;
}
} catch(...) {
(*c).txcount = (*c).txmax;
}
++c;
}
}
void Streamer::_encodeImageChannel1(const cv::Mat &in, vector<unsigned char> &out, unsigned int b) {
vector<int> jpgparams = {cv::IMWRITE_JPEG_QUALITY, ABRController::getColourQuality(b)};
cv::imencode(".jpg", in, out, jpgparams);
}
bool Streamer::_encodeImageChannel2(const cv::Mat &in, vector<unsigned char> &out, ftl::codecs::Channel_t c, unsigned int b) {
if (c == ftl::rgbd::kChanNone) return false; // NOTE: Should not happen
if (isFloatChannel(c) && in.type() == CV_16U && in.channels() == 1) {
vector<int> params = {cv::IMWRITE_PNG_COMPRESSION, ABRController::getDepthQuality(b)};
if (!cv::imencode(".png", in, out, params)) {
LOG(ERROR) << "PNG Encoding error";
return false;
}
return true;
} else if (!isFloatChannel(c) && in.type() == CV_8UC3) {
vector<int> params = {cv::IMWRITE_JPEG_QUALITY, ABRController::getColourQuality(b)};
cv::imencode(".jpg", in, out, params);
return true;
} else {
LOG(ERROR) << "Bad channel configuration: channel=" << c << " imagetype=" << in.type();
}
return false;
}
Source *Streamer::get(const std::string &uri) {
SHARED_LOCK(mutex_,slk);
if (sources_.find(uri) != sources_.end()) return sources_[uri]->src;
else return nullptr;
}*/
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment