Skip to content
Snippets Groups Projects
Commit e018127b authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Multiple stream qualities per client

parent a5601bbc
No related branches found
No related tags found
No related merge requests found
...@@ -438,7 +438,7 @@ static bool findConfiguration(const string &file, const vector<string> &paths) { ...@@ -438,7 +438,7 @@ static bool findConfiguration(const string &file, const vector<string> &paths) {
} }
if (found) { if (found) {
_indexConfig(config); //_indexConfig(config);
return true; return true;
} else { } else {
return false; return false;
...@@ -593,6 +593,9 @@ Configurable *ftl::config::configure(int argc, char **argv, const std::string &r ...@@ -593,6 +593,9 @@ Configurable *ftl::config::configure(int argc, char **argv, const std::string &r
string root_str = (options.find("root") != options.end()) ? nlohmann::json::parse(options["root"]).get<string>() : root; string root_str = (options.find("root") != options.end()) ? nlohmann::json::parse(options["root"]).get<string>() : root;
if (options.find("id") != options.end()) config["$id"] = nlohmann::json::parse(options["id"]).get<string>();
_indexConfig(config);
Configurable *rootcfg = create<Configurable>(config); Configurable *rootcfg = create<Configurable>(config);
if (root_str.size() > 0) { if (root_str.size() > 0) {
LOG(INFO) << "Setting root to " << root_str; LOG(INFO) << "Setting root to " << root_str;
......
...@@ -294,11 +294,15 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID ...@@ -294,11 +294,15 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID
for (auto &client : s->clients) { for (auto &client : s->clients) {
// If already listening, just update chunk counters // If already listening, just update chunk counters
if (client.peerid == peer) { if (client.peerid == peer) {
// Allow for same client but different quality (beyond threshold)
if ((client.preset < kQualityThreshold && rate >= kQualityThreshold) ||
(client.preset >= kQualityThreshold && rate < kQualityThreshold)) continue;
client.txmax = N; client.txmax = N;
client.txcount = 0; client.txcount = 0;
// Possible switch from high quality to low quality encoding or vice versa // Possible switch from high quality to low quality encoding or vice versa
if (client.preset < kQualityThreshold && rate >= kQualityThreshold) { /*if (client.preset < kQualityThreshold && rate >= kQualityThreshold) {
s->hq_count--; s->hq_count--;
s->lq_count++; s->lq_count++;
if (s->lq_encoder_c1) s->lq_encoder_c1->reset(); if (s->lq_encoder_c1) s->lq_encoder_c1->reset();
...@@ -308,7 +312,8 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID ...@@ -308,7 +312,8 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID
s->lq_count--; s->lq_count--;
if (s->hq_encoder_c1) s->hq_encoder_c1->reset(); if (s->hq_encoder_c1) s->hq_encoder_c1->reset();
if (s->hq_encoder_c2) s->hq_encoder_c2->reset(); if (s->hq_encoder_c2) s->hq_encoder_c2->reset();
} break;
}*/
client.preset = rate; client.preset = rate;
return; return;
...@@ -512,51 +517,6 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) { ...@@ -512,51 +517,6 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) {
}); });
} }
} }
// Do we need to do low quality encoding?
/*if (src->lq_count > 0) {
if (!src->lq_encoder_c1) src->lq_encoder_c1 = ftl::codecs::allocateLQEncoder();
if (!src->lq_encoder_c2) src->lq_encoder_c2 = ftl::codecs::allocateLQEncoder();
// Do we have the resources to do a LQ encoding?
if (src->lq_encoder_c1 && src->lq_encoder_c2) {
const auto *enc1 = src->lq_encoder_c1;
const auto *enc2 = src->lq_encoder_c2;
// Do entire frame as single step
if (!enc1->useBlocks() || !enc2->useBlocks()) {
ftl::pool.push([this,&fs,j,src](int id) {
_encodeLQAndTransmit(src, fs.channel1[j], fs.channel2[j], -1);
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
if (jobs_ == 0) job_cv_.notify_one();
});
jobs_++;
// Or divide frame into blocks and encode each
} else {
// Create jobs for each chunk
for (int i=0; i<chunk_count_; ++i) {
// Add chunk job to thread pool
ftl::pool.push([this,&fs,j,i,src](int id) {
int chunk = i;
try {
_encodeLQAndTransmit(src, fs.channel1[j], fs.channel2[j], chunk);
} catch(...) {
LOG(ERROR) << "Encode Exception: " << chunk;
}
//src->jobs--;
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
if (jobs_ == 0) job_cv_.notify_one();
});
}
jobs_ += chunk_count_;
}
}
}*/
} }
/*std::unique_lock<std::mutex> lk(job_mtx_); /*std::unique_lock<std::mutex> lk(job_mtx_);
...@@ -619,219 +579,3 @@ void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::StreamPacke ...@@ -619,219 +579,3 @@ void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::StreamPacke
++c; ++c;
} }
} }
/*void Streamer::_encodeHQAndTransmit(StreamSource *src, const cv::Mat &c1, const cv::Mat &c2, int block) {
bool hasChan2 = (!c2.empty() && src->src->getChannel() != ftl::rgbd::kChanNone);
LOG(INFO) << "Encode HQ: " << block;
vector<unsigned char> c1buff;
vector<unsigned char> c2buff;
if (block == -1) {
src->hq_encoder_c1->encode(c1, c1buff, src->hq_bitrate, false);
if (hasChan2) src->hq_encoder_c2->encode(c2, c2buff, src->hq_bitrate, false);
} else {
//bool delta = (chunk+src->frame) % 8 > 0; // Do XOR or not
int chunk_width = c1.cols / chunk_dim_;
int chunk_height = c1.rows / chunk_dim_;
// Build chunk heads
int cx = (block % chunk_dim_) * chunk_width;
int cy = (block / chunk_dim_) * chunk_height;
cv::Rect roi(cx,cy,chunk_width,chunk_height);
//vector<unsigned char> rgb_buf;
cv::Mat chunkRGB = c1(roi);
src->hq_encoder_c1->encode(chunkRGB, c1buff, src->hq_bitrate, false);
if (hasChan2) {
cv::Mat chunkDepth = c2(roi);
src->hq_encoder_c2->encode(chunkDepth, c2buff, src->hq_bitrate, false);
}
}
// Lock to prevent clients being added / removed
SHARED_LOCK(src->mutex,lk);
auto c = src->clients.begin();
while (c != src->clients.end()) {
const int b = (*c).bitrate;
if (b >= kQualityThreshold) continue; // Not a HQ request
try {
// TODO:(Nick) Send pose
short pre_transmit_latency = short(ftl::timer::get_time() - frame_no_);
if (!net_->send((*c).peerid, (*c).uri, frame_no_, pre_transmit_latency, uint8_t(src->hq_bitrate), block, c1buff, c2buff)) {
// Send failed so mark as client stream completed
(*c).txcount = (*c).txmax;
} else {
++(*c).txcount;
//LOG(INFO) << "SENT CHUNK : " << frame_no_ << "-" << chunk;
}
} catch(...) {
(*c).txcount = (*c).txmax;
}
++c;
}
}
void Streamer::_encodeLQAndTransmit(StreamSource *src, const cv::Mat &c1, const cv::Mat &c2, int block) {
bool hasChan2 = (!c2.empty() && src->src->getChannel() != ftl::rgbd::kChanNone);
LOG(INFO) << "Encode LQ: " << block;
vector<unsigned char> c1buff;
vector<unsigned char> c2buff;
if (block == -1) {
src->lq_encoder_c1->encode(c1, c1buff, src->lq_bitrate, false);
if (hasChan2) src->lq_encoder_c2->encode(c2, c2buff, src->lq_bitrate, false);
} else {
//bool delta = (chunk+src->frame) % 8 > 0; // Do XOR or not
int chunk_width = c1.cols / chunk_dim_;
int chunk_height = c1.rows / chunk_dim_;
// Build chunk heads
int cx = (block % chunk_dim_) * chunk_width;
int cy = (block / chunk_dim_) * chunk_height;
cv::Rect roi(cx,cy,chunk_width,chunk_height);
//vector<unsigned char> rgb_buf;
cv::Mat chunkRGB = c1(roi);
//cv::resize(chunkRGB, downrgb, cv::Size(ABRController::getColourWidth(b) / chunk_dim_, ABRController::getColourHeight(b) / chunk_dim_));
src->lq_encoder_c1->encode(chunkRGB, c1buff, src->lq_bitrate, false);
if (hasChan2) {
cv::Mat chunkDepth = c2(roi);
//cv::resize(chunkDepth, tmp, cv::Size(ABRController::getDepthWidth(b) / chunk_dim_, ABRController::getDepthHeight(b) / chunk_dim_), 0, 0, cv::INTER_NEAREST);
src->lq_encoder_c2->encode(chunkDepth, c2buff, src->lq_bitrate, false);
}
}
// Lock to prevent clients being added / removed
SHARED_LOCK(src->mutex,lk);
auto c = src->clients.begin();
while (c != src->clients.end()) {
const int b = (*c).bitrate;
if (b < kQualityThreshold) continue; // Not an LQ request
try {
// TODO:(Nick) Send pose
short pre_transmit_latency = short(ftl::timer::get_time() - frame_no_);
if (!net_->send((*c).peerid, (*c).uri, frame_no_, pre_transmit_latency, uint8_t(src->hq_bitrate), block, c1buff, c2buff)) {
// Send failed so mark as client stream completed
(*c).txcount = (*c).txmax;
} else {
++(*c).txcount;
//LOG(INFO) << "SENT CHUNK : " << frame_no_ << "-" << chunk;
}
} catch(...) {
(*c).txcount = (*c).txmax;
}
++c;
}
}*/
/*void Streamer::_encodeImagesAndTransmit(StreamSource *src, const cv::Mat &rgb, const cv::Mat &depth, int chunk) {
bool hasChan2 = (!depth.empty() && src->src->getChannel() != ftl::rgbd::kChanNone);
//bool delta = (chunk+src->frame) % 8 > 0; // Do XOR or not
int chunk_width = rgb.cols / chunk_dim_;
int chunk_height = rgb.rows / chunk_dim_;
// Build chunk heads
int cx = (chunk % chunk_dim_) * chunk_width;
int cy = (chunk / chunk_dim_) * chunk_height;
cv::Rect roi(cx,cy,chunk_width,chunk_height);
//vector<unsigned char> rgb_buf;
cv::Mat chunkRGB = rgb(roi);
cv::Mat chunkDepth;
//cv::Mat chunkDepthPrev = src->prev_depth(roi);
cv::Mat d2, d3;
//vector<unsigned char> d_buf;
if (hasChan2) {
chunkDepth = depth(roi);
if (chunkDepth.type() == CV_32F) chunkDepth.convertTo(d2, CV_16UC1, 1000); // 16*10);
else d2 = chunkDepth;
//if (delta) d3 = (d2 * 2) - chunkDepthPrev;
//else d3 = d2;
//d2.copyTo(chunkDepthPrev);
}
// TODO: Verify these don't allocate memory if not needed.
// TODO: Reuse these buffers to reduce allocations.
vector<unsigned char> brgb[ftl::rgbd::detail::kMaxBitrateLevels];
vector<unsigned char> bdepth[ftl::rgbd::detail::kMaxBitrateLevels];
// Lock to prevent clients being added / removed
SHARED_LOCK(src->mutex,lk);
auto c = src->clients.begin();
while (c != src->clients.end()) {
const int b = (*c).bitrate;
if (brgb[b].empty()) {
// Max bitrate means no changes
if (b == 0) {
_encodeImageChannel1(chunkRGB, brgb[b], b);
if (hasChan2) _encodeImageChannel2(d2, bdepth[b], src->src->getChannel(), b);
// Otherwise must downscale and change compression params
} else {
cv::Mat downrgb, downdepth;
cv::resize(chunkRGB, downrgb, cv::Size(ABRController::getColourWidth(b) / chunk_dim_, ABRController::getColourHeight(b) / chunk_dim_));
if (hasChan2) cv::resize(d2, downdepth, cv::Size(ABRController::getDepthWidth(b) / chunk_dim_, ABRController::getDepthHeight(b) / chunk_dim_), 0, 0, cv::INTER_NEAREST);
_encodeImageChannel1(downrgb, brgb[b], b);
if (hasChan2) _encodeImageChannel2(downdepth, bdepth[b], src->src->getChannel(), b);
}
}
try {
// TODO:(Nick) Send pose
short pre_transmit_latency = short(ftl::timer::get_time() - frame_no_);
if (!net_->send((*c).peerid, (*c).uri, frame_no_, pre_transmit_latency, uint8_t(b), chunk, brgb[b], bdepth[b])) {
// Send failed so mark as client stream completed
(*c).txcount = (*c).txmax;
} else {
++(*c).txcount;
//LOG(INFO) << "SENT CHUNK : " << frame_no_ << "-" << chunk;
}
} catch(...) {
(*c).txcount = (*c).txmax;
}
++c;
}
}
void Streamer::_encodeImageChannel1(const cv::Mat &in, vector<unsigned char> &out, unsigned int b) {
vector<int> jpgparams = {cv::IMWRITE_JPEG_QUALITY, ABRController::getColourQuality(b)};
cv::imencode(".jpg", in, out, jpgparams);
}
bool Streamer::_encodeImageChannel2(const cv::Mat &in, vector<unsigned char> &out, ftl::codecs::Channel_t c, unsigned int b) {
if (c == ftl::rgbd::kChanNone) return false; // NOTE: Should not happen
if (isFloatChannel(c) && in.type() == CV_16U && in.channels() == 1) {
vector<int> params = {cv::IMWRITE_PNG_COMPRESSION, ABRController::getDepthQuality(b)};
if (!cv::imencode(".png", in, out, params)) {
LOG(ERROR) << "PNG Encoding error";
return false;
}
return true;
} else if (!isFloatChannel(c) && in.type() == CV_8UC3) {
vector<int> params = {cv::IMWRITE_JPEG_QUALITY, ABRController::getColourQuality(b)};
cv::imencode(".jpg", in, out, params);
return true;
} else {
LOG(ERROR) << "Bad channel configuration: channel=" << c << " imagetype=" << in.type();
}
return false;
}
Source *Streamer::get(const std::string &uri) {
SHARED_LOCK(mutex_,slk);
if (sources_.find(uri) != sources_.end()) return sources_[uri]->src;
else return nullptr;
}*/
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment