diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index 96626022c1e2bee1372870f2c2004a5c9af51a52..a4b02f34554898ced490c22f5f6eb5f365080c41 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -294,11 +294,15 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID for (auto &client : s->clients) { // If already listening, just update chunk counters if (client.peerid == peer) { + // Allow for same client but different quality (beyond threshold) + if ((client.preset < kQualityThreshold && rate >= kQualityThreshold) || + (client.preset >= kQualityThreshold && rate < kQualityThreshold)) continue; + client.txmax = N; client.txcount = 0; // Possible switch from high quality to low quality encoding or vice versa - if (client.preset < kQualityThreshold && rate >= kQualityThreshold) { + /*if (client.preset < kQualityThreshold && rate >= kQualityThreshold) { s->hq_count--; s->lq_count++; if (s->lq_encoder_c1) s->lq_encoder_c1->reset(); @@ -308,7 +312,8 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID s->lq_count--; if (s->hq_encoder_c1) s->hq_encoder_c1->reset(); if (s->hq_encoder_c2) s->hq_encoder_c2->reset(); - } + break; + }*/ client.preset = rate; return; @@ -512,51 +517,6 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) { }); } } - - // Do we need to do low quality encoding? - /*if (src->lq_count > 0) { - if (!src->lq_encoder_c1) src->lq_encoder_c1 = ftl::codecs::allocateLQEncoder(); - if (!src->lq_encoder_c2) src->lq_encoder_c2 = ftl::codecs::allocateLQEncoder(); - - // Do we have the resources to do a LQ encoding? - if (src->lq_encoder_c1 && src->lq_encoder_c2) { - const auto *enc1 = src->lq_encoder_c1; - const auto *enc2 = src->lq_encoder_c2; - - // Do entire frame as single step - if (!enc1->useBlocks() || !enc2->useBlocks()) { - ftl::pool.push([this,&fs,j,src](int id) { - _encodeLQAndTransmit(src, fs.channel1[j], fs.channel2[j], -1); - std::unique_lock<std::mutex> lk(job_mtx_); - --jobs_; - if (jobs_ == 0) job_cv_.notify_one(); - }); - - jobs_++; - // Or divide frame into blocks and encode each - } else { - // Create jobs for each chunk - for (int i=0; i<chunk_count_; ++i) { - // Add chunk job to thread pool - ftl::pool.push([this,&fs,j,i,src](int id) { - int chunk = i; - try { - _encodeLQAndTransmit(src, fs.channel1[j], fs.channel2[j], chunk); - } catch(...) { - LOG(ERROR) << "Encode Exception: " << chunk; - } - - //src->jobs--; - std::unique_lock<std::mutex> lk(job_mtx_); - --jobs_; - if (jobs_ == 0) job_cv_.notify_one(); - }); - } - - jobs_ += chunk_count_; - } - } - }*/ } /*std::unique_lock<std::mutex> lk(job_mtx_); @@ -619,219 +579,3 @@ void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::StreamPacke ++c; } } - -/*void Streamer::_encodeHQAndTransmit(StreamSource *src, const cv::Mat &c1, const cv::Mat &c2, int block) { - bool hasChan2 = (!c2.empty() && src->src->getChannel() != ftl::rgbd::kChanNone); - - LOG(INFO) << "Encode HQ: " << block; - - vector<unsigned char> c1buff; - vector<unsigned char> c2buff; - - if (block == -1) { - src->hq_encoder_c1->encode(c1, c1buff, src->hq_bitrate, false); - if (hasChan2) src->hq_encoder_c2->encode(c2, c2buff, src->hq_bitrate, false); - } else { - //bool delta = (chunk+src->frame) % 8 > 0; // Do XOR or not - int chunk_width = c1.cols / chunk_dim_; - int chunk_height = c1.rows / chunk_dim_; - - // Build chunk heads - int cx = (block % chunk_dim_) * chunk_width; - int cy = (block / chunk_dim_) * chunk_height; - cv::Rect roi(cx,cy,chunk_width,chunk_height); - //vector<unsigned char> rgb_buf; - cv::Mat chunkRGB = c1(roi); - src->hq_encoder_c1->encode(chunkRGB, c1buff, src->hq_bitrate, false); - - if (hasChan2) { - cv::Mat chunkDepth = c2(roi); - src->hq_encoder_c2->encode(chunkDepth, c2buff, src->hq_bitrate, false); - } - } - - // Lock to prevent clients being added / removed - SHARED_LOCK(src->mutex,lk); - auto c = src->clients.begin(); - while (c != src->clients.end()) { - const int b = (*c).bitrate; - if (b >= kQualityThreshold) continue; // Not a HQ request - - try { - // TODO:(Nick) Send pose - short pre_transmit_latency = short(ftl::timer::get_time() - frame_no_); - if (!net_->send((*c).peerid, (*c).uri, frame_no_, pre_transmit_latency, uint8_t(src->hq_bitrate), block, c1buff, c2buff)) { - // Send failed so mark as client stream completed - (*c).txcount = (*c).txmax; - } else { - ++(*c).txcount; - //LOG(INFO) << "SENT CHUNK : " << frame_no_ << "-" << chunk; - } - } catch(...) { - (*c).txcount = (*c).txmax; - } - ++c; - } -} - -void Streamer::_encodeLQAndTransmit(StreamSource *src, const cv::Mat &c1, const cv::Mat &c2, int block) { - bool hasChan2 = (!c2.empty() && src->src->getChannel() != ftl::rgbd::kChanNone); - - LOG(INFO) << "Encode LQ: " << block; - - vector<unsigned char> c1buff; - vector<unsigned char> c2buff; - - if (block == -1) { - src->lq_encoder_c1->encode(c1, c1buff, src->lq_bitrate, false); - if (hasChan2) src->lq_encoder_c2->encode(c2, c2buff, src->lq_bitrate, false); - } else { - //bool delta = (chunk+src->frame) % 8 > 0; // Do XOR or not - int chunk_width = c1.cols / chunk_dim_; - int chunk_height = c1.rows / chunk_dim_; - - // Build chunk heads - int cx = (block % chunk_dim_) * chunk_width; - int cy = (block / chunk_dim_) * chunk_height; - cv::Rect roi(cx,cy,chunk_width,chunk_height); - //vector<unsigned char> rgb_buf; - cv::Mat chunkRGB = c1(roi); - //cv::resize(chunkRGB, downrgb, cv::Size(ABRController::getColourWidth(b) / chunk_dim_, ABRController::getColourHeight(b) / chunk_dim_)); - - src->lq_encoder_c1->encode(chunkRGB, c1buff, src->lq_bitrate, false); - - if (hasChan2) { - cv::Mat chunkDepth = c2(roi); - //cv::resize(chunkDepth, tmp, cv::Size(ABRController::getDepthWidth(b) / chunk_dim_, ABRController::getDepthHeight(b) / chunk_dim_), 0, 0, cv::INTER_NEAREST); - src->lq_encoder_c2->encode(chunkDepth, c2buff, src->lq_bitrate, false); - } - } - - // Lock to prevent clients being added / removed - SHARED_LOCK(src->mutex,lk); - auto c = src->clients.begin(); - while (c != src->clients.end()) { - const int b = (*c).bitrate; - if (b < kQualityThreshold) continue; // Not an LQ request - - try { - // TODO:(Nick) Send pose - short pre_transmit_latency = short(ftl::timer::get_time() - frame_no_); - if (!net_->send((*c).peerid, (*c).uri, frame_no_, pre_transmit_latency, uint8_t(src->hq_bitrate), block, c1buff, c2buff)) { - // Send failed so mark as client stream completed - (*c).txcount = (*c).txmax; - } else { - ++(*c).txcount; - //LOG(INFO) << "SENT CHUNK : " << frame_no_ << "-" << chunk; - } - } catch(...) { - (*c).txcount = (*c).txmax; - } - ++c; - } -}*/ - -/*void Streamer::_encodeImagesAndTransmit(StreamSource *src, const cv::Mat &rgb, const cv::Mat &depth, int chunk) { - bool hasChan2 = (!depth.empty() && src->src->getChannel() != ftl::rgbd::kChanNone); - - //bool delta = (chunk+src->frame) % 8 > 0; // Do XOR or not - int chunk_width = rgb.cols / chunk_dim_; - int chunk_height = rgb.rows / chunk_dim_; - - // Build chunk heads - int cx = (chunk % chunk_dim_) * chunk_width; - int cy = (chunk / chunk_dim_) * chunk_height; - cv::Rect roi(cx,cy,chunk_width,chunk_height); - //vector<unsigned char> rgb_buf; - cv::Mat chunkRGB = rgb(roi); - cv::Mat chunkDepth; - //cv::Mat chunkDepthPrev = src->prev_depth(roi); - - cv::Mat d2, d3; - //vector<unsigned char> d_buf; - - if (hasChan2) { - chunkDepth = depth(roi); - if (chunkDepth.type() == CV_32F) chunkDepth.convertTo(d2, CV_16UC1, 1000); // 16*10); - else d2 = chunkDepth; - //if (delta) d3 = (d2 * 2) - chunkDepthPrev; - //else d3 = d2; - //d2.copyTo(chunkDepthPrev); - } - - // TODO: Verify these don't allocate memory if not needed. - // TODO: Reuse these buffers to reduce allocations. - vector<unsigned char> brgb[ftl::rgbd::detail::kMaxBitrateLevels]; - vector<unsigned char> bdepth[ftl::rgbd::detail::kMaxBitrateLevels]; - - // Lock to prevent clients being added / removed - SHARED_LOCK(src->mutex,lk); - auto c = src->clients.begin(); - while (c != src->clients.end()) { - const int b = (*c).bitrate; - - if (brgb[b].empty()) { - // Max bitrate means no changes - if (b == 0) { - _encodeImageChannel1(chunkRGB, brgb[b], b); - if (hasChan2) _encodeImageChannel2(d2, bdepth[b], src->src->getChannel(), b); - - // Otherwise must downscale and change compression params - } else { - cv::Mat downrgb, downdepth; - cv::resize(chunkRGB, downrgb, cv::Size(ABRController::getColourWidth(b) / chunk_dim_, ABRController::getColourHeight(b) / chunk_dim_)); - if (hasChan2) cv::resize(d2, downdepth, cv::Size(ABRController::getDepthWidth(b) / chunk_dim_, ABRController::getDepthHeight(b) / chunk_dim_), 0, 0, cv::INTER_NEAREST); - - _encodeImageChannel1(downrgb, brgb[b], b); - if (hasChan2) _encodeImageChannel2(downdepth, bdepth[b], src->src->getChannel(), b); - } - } - - try { - // TODO:(Nick) Send pose - short pre_transmit_latency = short(ftl::timer::get_time() - frame_no_); - if (!net_->send((*c).peerid, (*c).uri, frame_no_, pre_transmit_latency, uint8_t(b), chunk, brgb[b], bdepth[b])) { - // Send failed so mark as client stream completed - (*c).txcount = (*c).txmax; - } else { - ++(*c).txcount; - //LOG(INFO) << "SENT CHUNK : " << frame_no_ << "-" << chunk; - } - } catch(...) { - (*c).txcount = (*c).txmax; - } - ++c; - } -} - -void Streamer::_encodeImageChannel1(const cv::Mat &in, vector<unsigned char> &out, unsigned int b) { - vector<int> jpgparams = {cv::IMWRITE_JPEG_QUALITY, ABRController::getColourQuality(b)}; - cv::imencode(".jpg", in, out, jpgparams); -} - -bool Streamer::_encodeImageChannel2(const cv::Mat &in, vector<unsigned char> &out, ftl::codecs::Channel_t c, unsigned int b) { - if (c == ftl::rgbd::kChanNone) return false; // NOTE: Should not happen - - if (isFloatChannel(c) && in.type() == CV_16U && in.channels() == 1) { - vector<int> params = {cv::IMWRITE_PNG_COMPRESSION, ABRController::getDepthQuality(b)}; - if (!cv::imencode(".png", in, out, params)) { - LOG(ERROR) << "PNG Encoding error"; - return false; - } - return true; - } else if (!isFloatChannel(c) && in.type() == CV_8UC3) { - vector<int> params = {cv::IMWRITE_JPEG_QUALITY, ABRController::getColourQuality(b)}; - cv::imencode(".jpg", in, out, params); - return true; - } else { - LOG(ERROR) << "Bad channel configuration: channel=" << c << " imagetype=" << in.type(); - } - - return false; -} - -Source *Streamer::get(const std::string &uri) { - SHARED_LOCK(mutex_,slk); - if (sources_.find(uri) != sources_.end()) return sources_[uri]->src; - else return nullptr; -}*/