diff --git a/components/common/cpp/include/ftl/timer.hpp b/components/common/cpp/include/ftl/timer.hpp index bbfe33b7e42ea63bb2fbe4c107142222c4608f1d..dad98a704dd127c23b961bd7429da14253399db3 100644 --- a/components/common/cpp/include/ftl/timer.hpp +++ b/components/common/cpp/include/ftl/timer.hpp @@ -64,6 +64,8 @@ void setInterval(int ms); int getInterval(); +int64_t get_time(); + /** * Add the specified number of milliseconds to the clock when generating * timestamps. This is used to synchronise clocks on multiple machines as it diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp index f1dfbb4c7f574fc7ce20a2e0196e2234ac24ae3d..2c3e1fd636edd16772b3bf44ecb0e15fefa3b16b 100644 --- a/components/net/cpp/include/ftl/net/peer.hpp +++ b/components/net/cpp/include/ftl/net/peer.hpp @@ -330,11 +330,11 @@ R Peer::call(const std::string &name, ARGS... args) { }, std::forward<ARGS>(args)...); // While waiting, do some other thread jobs... - std::function<void(int)> j; + /*std::function<void(int)> j; while (!hasreturned && (bool)(j=ftl::pool.pop())) { LOG(INFO) << "Doing job whilst waiting..."; j(-1); - } + }*/ { // Block thread until async callback notifies us std::unique_lock<std::mutex> lk(m); diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index 6394e26e36aed185c3b57af30e55eb97c5ef5876..b4419b1f7fc713259b0d9f1a14f00ff12332dd6b 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -209,7 +209,7 @@ class Universe : public ftl::Configurable { private: bool active_; ftl::UUID this_peer; - SHARED_MUTEX net_mutex_; + mutable SHARED_MUTEX net_mutex_; RECURSIVE_MUTEX handler_mutex_; fd_set sfderror_; fd_set sfdread_; diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index a70e42141dfe096b1cb7f71cac27bbda2ed8671d..4d1d00499e6d42038a8a7190bba529a53f445cb3 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -220,6 +220,7 @@ void Universe::_cleanupPeers() { } Peer *Universe::getPeer(const UUID &id) const { + SHARED_LOCK(net_mutex_,lk); auto ix = peer_ids_.find(id); if (ix == peer_ids_.end()) return nullptr; else return ix->second; diff --git a/components/rgbd-sources/CMakeLists.txt b/components/rgbd-sources/CMakeLists.txt index e0a925616c522522342d0032b5cae7bc684698b9..9aa252db3765d673f160aec5781185df29415218 100644 --- a/components/rgbd-sources/CMakeLists.txt +++ b/components/rgbd-sources/CMakeLists.txt @@ -14,6 +14,7 @@ set(RGBDSRC # src/algorithms/opencv_sgbm.cpp # src/algorithms/opencv_bm.cpp src/cb_segmentation.cpp + src/abr.cpp src/offilter.cpp ) diff --git a/components/rgbd-sources/include/ftl/rgbd/detail/abr.hpp b/components/rgbd-sources/include/ftl/rgbd/detail/abr.hpp new file mode 100644 index 0000000000000000000000000000000000000000..b3d809784abdf9a3f1bdb362c2dcc3a88b1a9e3e --- /dev/null +++ b/components/rgbd-sources/include/ftl/rgbd/detail/abr.hpp @@ -0,0 +1,121 @@ +#ifndef _FTL_RGBD_ABR_HPP_ +#define _FTL_RGBD_ABR_HPP_ + +#include <ftl/rgbd/detail/netframe.hpp> +#include <cstdint> + +namespace ftl { +namespace rgbd { +namespace detail { + +static const float kAspectRatio = 1.777778f; + +enum codec_t { + kCodecJPG = 0, + kCodecPNG +}; + +struct BitrateSetting { + int colour_res; + int depth_res; + int colour_qual; + int depth_qual; + codec_t colour_codec; + codec_t depth_codec; + int block_count_x; + + /*int width; + int height; + int jpg_quality; + int png_compression; + codec_t colour_codec; + codec_t depth_codec; + int chunking;*/ +}; + +static const BitrateSetting bitrate_settings[] = { + 1080, 1080, 95, 1, kCodecJPG, kCodecPNG, 4, + 1080, 720, 95, 1, kCodecJPG, kCodecPNG, 4, + 720, 720, 95, 1, kCodecJPG, kCodecPNG, 4, + 720, 576, 95, 5, kCodecJPG, kCodecPNG, 4, + 576, 576, 95, 5, kCodecJPG, kCodecPNG, 4, + 576, 480, 95, 5, kCodecJPG, kCodecPNG, 2, + 480, 480, 95, 5, kCodecJPG, kCodecPNG, 2, + 480, 360, 95, 9, kCodecJPG, kCodecPNG, 2, + 360, 360, 95, 9, kCodecJPG, kCodecPNG, 2, + 360, 360, 50, 9, kCodecJPG, kCodecPNG, 2 +}; + +/*static const BitrateSetting bitrate_settings[] = { + 1920, 1080, 95, 1, kCodecJPG, kCodecPNG, 4, // ? + 1280, 720, 95, 1, kCodecJPG, kCodecPNG, 4, // ~96Mbps + 1024, 576, 95, 5, kCodecJPG, kCodecPNG, 3, // ~62Mbps + 854, 480, 95, 5, kCodecJPG, kCodecPNG, 3, // ~48Mbps + 640, 360, 95, 9, kCodecJPG, kCodecPNG, 2, // ~31Mbps + 640, 360, 75, 9, kCodecJPG, kCodecPNG, 2, // ~25Mbps + 640, 360, 65, 9, kCodecJPG, kCodecPNG, 2, // ~24Mbps + 640, 360, 50, 9, kCodecJPG, kCodecPNG, 2, // ~23Mbps + 320, 160, 95, 9, kCodecJPG, kCodecPNG, 2, // ~10Mbps + 320, 160, 75, 9, kCodecJPG, kCodecPNG, 2 // ~8Mbps +};*/ + +typedef unsigned int bitrate_t; + +static const bitrate_t kBitrateBest = 0; +static const bitrate_t kBitrateWorst = 9; + +/** + * Adaptive Bitrate Controller to monitor and decide on a client streams + * bitrate. The basics of our approach are that if transmission latency exceeds + * some proportion of the frame time then mark it as a slow frame. Similarly if + * transmission latency falls below a proportion of frame time then mark it as + * a fast frame. If the net frame status is slow (thresholded) then reduce + * bitrate, if the net status is fast then increase bitrate. + */ +class ABRController { + public: + ABRController(); + ~ABRController(); + + /** + * From a received frame, select a bitrate based upon actual and required + * bitrate as well as past frames. + */ + bitrate_t selectBitrate(const ftl::rgbd::detail::NetFrame &); + + /** + * Called to tell the controller the new bitrate is now in use by the stream + */ + void notifyChanged(); + + void setMaximumBitrate(bitrate_t); + void setMinimumBitrate(bitrate_t); + + static const ftl::rgbd::detail::BitrateSetting &getBitrateInfo(bitrate_t b); + static int getColourWidth(bitrate_t b); + static int getDepthWidth(bitrate_t b); + static int getColourHeight(bitrate_t b); + static int getDepthHeight(bitrate_t b); + static int getBlockCountX(bitrate_t b); + static int getBlockCountY(bitrate_t b); + static int getBlockCount(bitrate_t b); + static int getColourQuality(bitrate_t b); + static int getDepthQuality(bitrate_t b); + + private: + unsigned int down_log_; // Bit log of delayed frames + unsigned int up_log_; // Bit log of fast frames + int64_t last_br_change_; // Time of last adaptive change + float down_threshold_; // Proportion of min bitrate before reduction + float up_threshold_; // Proportion of min bitrate before increase + bitrate_t bitrate_; + bool enabled_; + bitrate_t max_; + bitrate_t min_; +}; + +} +} +} + +#endif // _FTL_RGBD_ABR_HPP_ diff --git a/components/rgbd-sources/include/ftl/rgbd/detail/netframe.hpp b/components/rgbd-sources/include/ftl/rgbd/detail/netframe.hpp new file mode 100644 index 0000000000000000000000000000000000000000..60620f0938d9e72fa13c24105ac1efe801c470a0 --- /dev/null +++ b/components/rgbd-sources/include/ftl/rgbd/detail/netframe.hpp @@ -0,0 +1,49 @@ +#ifndef _FTL_RGBD_NETFRAME_HPP_ +#define _FTL_RGBD_NETFRAME_HPP_ + +#include <cstdint> +#include <vector> +#include <ftl/rgbd/source.hpp> + +namespace ftl { +namespace rgbd { +namespace detail { + +/** + * Buffers for a single frame as it is being received over the network. + * Also maintains statistics about the frame transmission for later analysis. + */ +struct NetFrame { + cv::Mat channel1; + cv::Mat channel2; + volatile int64_t timestamp; + std::atomic<int> chunk_count; + std::atomic<int> tx_size; + int64_t tx_latency; + MUTEX mtx; +}; + +/** + * Manage multiple frames with their timestamp as an identifier. Once a frame + * is completed it should be freed immediately from the queue for reuse. It + * is not the job of this queue to buffer frames for longer periods, see Group + * for this functionality. This queue is only to manage chunk ordering problems. + */ +class NetFrameQueue { + public: + explicit NetFrameQueue(int size=2); + ~NetFrameQueue(); + + NetFrame &getFrame(int64_t ts, const cv::Size &, int c1type, int c2type); + void freeFrame(NetFrame &); + + private: + std::vector<NetFrame> frames_; + MUTEX mtx_; +}; + +} +} +} + +#endif // _FTL_RGBD_NETFRAME_HPP_ diff --git a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp index b387d1fe4df41d6859356bfa9f349ecea04a9ee0..a571e63a0ffb2477063597a76612004c75ae1269 100644 --- a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp @@ -26,6 +26,7 @@ struct StreamClient { ftl::UUID peerid; std::atomic<int> txcount; // Frames sent since last request int txmax; // Frames to send in request + int bitrate; }; static const unsigned int kGrabbed = 0x1; @@ -33,6 +34,7 @@ static const unsigned int kRGB = 0x2; static const unsigned int kDepth = 0x4; static const unsigned int kFrameDropLimit = 5; +static const unsigned int kMaxBitrateLevels = 10; struct StreamSource { ftl::rgbd::Source *src; @@ -42,7 +44,7 @@ struct StreamSource { cv::Mat depth; // Tx buffer cv::Mat prev_rgb; cv::Mat prev_depth; - std::list<detail::StreamClient> clients[10]; // One list per bitrate + std::list<detail::StreamClient> clients; SHARED_MUTEX mutex; unsigned long long frame; }; diff --git a/components/rgbd-sources/src/abr.cpp b/components/rgbd-sources/src/abr.cpp new file mode 100644 index 0000000000000000000000000000000000000000..5400fa5d60542ec075534661bb35d0107403b54f --- /dev/null +++ b/components/rgbd-sources/src/abr.cpp @@ -0,0 +1,118 @@ +#include <ftl/rgbd/detail/abr.hpp> +#include <ftl/timer.hpp> + +#include <bitset> + +using ftl::rgbd::detail::BitrateSetting; +using ftl::rgbd::detail::ABRController; +using ftl::rgbd::detail::bitrate_t; +using ftl::rgbd::detail::kBitrateWorst; +using ftl::rgbd::detail::kBitrateBest; +using ftl::rgbd::detail::bitrate_settings; +using ftl::rgbd::detail::NetFrame; + +ABRController::ABRController() { + bitrate_ = 0; + enabled_ = false; + max_ = kBitrateBest; + min_ = kBitrateWorst; +} + +ABRController::~ABRController() { + +} + +void ABRController::setMaximumBitrate(bitrate_t b) { + max_ = (b == -1) ? kBitrateBest : b; + if (bitrate_ < max_) bitrate_ = max_; +} + +void ABRController::setMinimumBitrate(bitrate_t b) { + min_ = (b == -1) ? kBitrateWorst : b; + if (bitrate_ > min_) bitrate_ = min_; +} + +void ABRController::notifyChanged() { + //enabled_ = true; +} + +bitrate_t ABRController::selectBitrate(const NetFrame &frame) { + if (!enabled_) return bitrate_; + + float actual_mbps = (float(frame.tx_size) * 8.0f * (1000.0f / float(frame.tx_latency))) / 1048576.0f; + float min_mbps = (float(frame.tx_size) * 8.0f * (1000.0f / float(ftl::timer::getInterval()))) / 1048576.0f; + LOG(INFO) << "Bitrate = " << actual_mbps << "Mbps, min required = " << min_mbps << "Mbps"; + float ratio = actual_mbps / min_mbps; + //LOG(INFO) << "Rate Ratio = " << frame.tx_latency; + + down_log_ = down_log_ << 1; + up_log_ = up_log_ << 1; + + if (ratio < 1.2f) { + down_log_ += 1; + } else if (ratio > 1.5f) { + up_log_ += 1; + } + + std::bitset<32> bd(down_log_); + std::bitset<32> bu(up_log_); + + if (bitrate_ < min_ && int(bd.count()) - int(bu.count()) > 5) { + enabled_ = false; + down_log_ = 0; + up_log_ = 0; + bitrate_++; + LOG(INFO) << "Bitrate down to: " << bitrate_; + } else if (bitrate_ > max_ && int(bu.count()) - int(bd.count()) > 15) { + enabled_ = false; + up_log_ = 0; + down_log_ = 0; + bitrate_--; + LOG(INFO) << "Bitrate up to: " << bitrate_; + } + + return bitrate_; +} + +const BitrateSetting &ABRController::getBitrateInfo(bitrate_t b) { + if (b > kBitrateWorst) return bitrate_settings[kBitrateWorst]; + if (b < kBitrateBest) return bitrate_settings[kBitrateBest]; + return bitrate_settings[b]; +}; + +int ABRController::getColourWidth(bitrate_t b) { + return std::ceil(bitrate_settings[b].colour_res * kAspectRatio); +} + +int ABRController::getDepthWidth(bitrate_t b) { + return std::ceil(bitrate_settings[b].depth_res * kAspectRatio); +} + +int ABRController::getColourHeight(bitrate_t b) { + return bitrate_settings[b].colour_res; +} + +int ABRController::getDepthHeight(bitrate_t b) { + return bitrate_settings[b].depth_res; +} + +int ABRController::getBlockCountX(bitrate_t b) { + return bitrate_settings[b].block_count_x; +} + +int ABRController::getBlockCountY(bitrate_t b) { + return bitrate_settings[b].block_count_x; +} + +int ABRController::getBlockCount(bitrate_t b) { + const int c = bitrate_settings[b].block_count_x; + return c*c; +} + +int ABRController::getColourQuality(bitrate_t b) { + return bitrate_settings[b].colour_qual; +} + +int ABRController::getDepthQuality(bitrate_t b) { + return bitrate_settings[b].depth_qual; +} diff --git a/components/rgbd-sources/src/bitrate_settings.hpp b/components/rgbd-sources/src/bitrate_settings.hpp index 61e3ec7e48447d0a72f7b67afe81437300fbf944..3dbd23bc10129398d878cae0501bbc73d22bb3a8 100644 --- a/components/rgbd-sources/src/bitrate_settings.hpp +++ b/components/rgbd-sources/src/bitrate_settings.hpp @@ -5,26 +5,6 @@ namespace ftl { namespace rgbd { namespace detail { -struct BitrateSetting { - int width; - int height; - int jpg_quality; - int png_compression; -}; - -static const BitrateSetting bitrate_settings[] = { - 1920, 1080, 95, 1, - 1280, 720, 95, 1, - 1280, 720, 95, 1, - 1280, 720, 75, 1, - 640, 360, 95, 1, - 640, 360, 75, 5, - 640, 360, 50, 5, - 320, 160, 95, 5, - 320, 160, 75, 5, - 320, 160, 50, 9 -}; - } } } diff --git a/components/rgbd-sources/src/net.cpp b/components/rgbd-sources/src/net.cpp index 3cab6c86d5e6cf45dc1a22a0643e2e58c4f3cd73..2e873d5cf4a803583c7e8088a2e89989086aeb77 100644 --- a/components/rgbd-sources/src/net.cpp +++ b/components/rgbd-sources/src/net.cpp @@ -3,6 +3,7 @@ #include <thread> #include <chrono> #include <tuple> +#include <bitset> #include "colour.hpp" @@ -43,6 +44,7 @@ NetFrame &NetFrameQueue::getFrame(int64_t ts, const cv::Size &s, int c1type, int if (f.timestamp == -1) { f.timestamp = ts; f.chunk_count = 0; + f.tx_size = 0; f.channel1.create(s, c1type); f.channel2.create(s, c2type); return f; @@ -108,7 +110,7 @@ bool NetSource::_getCalibration(Universe &net, const UUID &peer, const string &s } NetSource::NetSource(ftl::rgbd::Source *host) - : ftl::rgbd::detail::Source(host), active_(false), minB_(9), maxN_(1), queue_(3) { + : ftl::rgbd::detail::Source(host), active_(false), minB_(9), maxN_(1), adaptive_(0), queue_(3) { gamma_ = host->value("gamma", 1.0f); temperature_ = host->value("temperature", 6500); @@ -150,6 +152,9 @@ NetSource::NetSource(ftl::rgbd::Source *host) chunks_dim_ = host->value("chunking",4); chunk_count_ = chunks_dim_*chunks_dim_; + abr_.setMaximumBitrate(host->value("max_bitrate", -1)); + abr_.setMinimumBitrate(host->value("min_bitrate", -1)); + _updateURI(); h_ = host_->getNet()->onConnect([this](ftl::net::Peer *p) { @@ -167,15 +172,48 @@ NetSource::~NetSource() { host_->getNet()->removeCallback(h_); } -void NetSource::_recvChunk(int64_t ts, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) { +/*void NetSource::_checkAdaptive(int64_t ts) { + const int64_t current = ftl::timer::get_time(); + int64_t net_latency = current - ts; + + // Only change bit rate gradually + if (current - last_br_change_ > ftl::rgbd::detail::kAdaptationRate) { + // Was this frame late? + if (adaptive_ < ftl::rgbd::detail::kMaxBitrateLevels && net_latency > ftl::rgbd::detail::kLatencyThreshold) { + slow_log_ = (slow_log_ << 1) + 1; + std::bitset<32> bs(slow_log_); + + // Enough late frames to reduce bit rate + if (bs.count() > ftl::rgbd::detail::kSlowFramesThreshold) { + adaptive_++; + slow_log_ = 0; + last_br_change_ = current; + LOG(WARNING) << "Adjust bitrate to " << adaptive_; + } + // No late frames in recent history... + } else if (adaptive_ > 0 && slow_log_ == 0) { + // TODO: (Nick) Don't change bitrate up so quickly as down? + // Try a higher bitrate again? + adaptive_--; + } + } +}*/ + +void NetSource::_recvChunk(int64_t ts, short ttimeoff, int chunk, const vector<unsigned char> &jpg, const vector<unsigned char> &d) { // TODO: Don't allocate these each chunk cv::Mat tmp_rgb, tmp_depth; - //if (!active_ || ts == 0) return; + // Capture time here for better net latency estimate + int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count(); + + if (!active_) return; const ftl::rgbd::channel_t chan = host_->getChannel(); NetFrame &frame = queue_.getFrame(ts, cv::Size(params_.width, params_.height), CV_8UC3, (isFloatChannel(chan) ? CV_32FC1 : CV_8UC3)); + // Update frame statistics + frame.tx_size += jpg.size() + d.size(); + // Build chunk head int cx = (chunk % chunks_dim_) * chunk_width_; int cy = (chunk / chunks_dim_) * chunk_height_; @@ -183,13 +221,6 @@ void NetSource::_recvChunk(int64_t ts, int chunk, bool delta, const vector<unsig cv::Mat chunkRGB = frame.channel1(roi); cv::Mat chunkDepth = frame.channel2(roi); - auto start = std::chrono::high_resolution_clock::now(); - int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count(); - //LOG(INFO) << ts << " - Chunk Latency (" << chunk_count_ << ") = " << (now - ts) << " - " << ftl::pool.q_size(); - //if (now - ts > 160) { - // LOG(INFO) << "OLD PACKET: " << host_->getURI() << " (" << chunk << ") - " << ts << " (" << (now - ts) << ")"; - //} - // Decode in temporary buffers to prevent long locks cv::imdecode(jpg, cv::IMREAD_COLOR, &tmp_rgb); if (d.size() > 0) cv::imdecode(d, cv::IMREAD_UNCHANGED, &tmp_depth); @@ -213,10 +244,10 @@ void NetSource::_recvChunk(int64_t ts, int chunk, bool delta, const vector<unsig // Downsized so needs a scale up } else { cv::resize(tmp_rgb, chunkRGB, chunkRGB.size()); - tmp_depth.convertTo(tmp_depth, CV_32FC1, 1.0f/1000.0f); + //tmp_depth.convertTo(tmp_depth, CV_32FC1, 1.0f/1000.0f); if (!tmp_depth.empty() && tmp_depth.type() == CV_16U && chunkDepth.type() == CV_32F) { tmp_depth.convertTo(tmp_depth, CV_32FC1, 1.0f/1000.0f); //(16.0f*10.0f)); - cv::resize(tmp_depth, chunkDepth, chunkDepth.size()); + cv::resize(tmp_depth, chunkDepth, chunkDepth.size(), 0, 0, cv::INTER_NEAREST); } else if (!tmp_depth.empty() && tmp_depth.type() == CV_8UC3 && chunkDepth.type() == CV_8UC3) { cv::resize(tmp_depth, chunkDepth, chunkDepth.size()); } else { @@ -233,11 +264,23 @@ void NetSource::_recvChunk(int64_t ts, int chunk, bool delta, const vector<unsig if (frame.chunk_count > chunk_count_) LOG(FATAL) << "TOO MANY CHUNKS"; + // Capture tx time of first received chunk + if (frame.chunk_count == 1) { + UNIQUE_LOCK(frame.mtx, flk); + if (frame.chunk_count == 1) { + frame.tx_latency = int64_t(ttimeoff); + } + } + + // Last chunk now received if (frame.chunk_count == chunk_count_) { UNIQUE_LOCK(frame.mtx, flk); - timestamp_ = frame.timestamp; if (frame.timestamp >= 0 && frame.chunk_count == chunk_count_) { + timestamp_ = frame.timestamp; + frame.tx_latency = now-(ts+frame.tx_latency); + + adaptive_ = abr_.selectBitrate(frame); //LOG(INFO) << "Frame finished: " << frame.timestamp; auto cb = host_->callback(); if (cb) { @@ -304,8 +347,8 @@ void NetSource::_updateURI() { has_calibration_ = _getCalibration(*host_->getNet(), peer_, *uri, params_, ftl::rgbd::kChanLeft); - host_->getNet()->bind(*uri, [this](int64_t frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) { - _recvChunk(frame, chunk, delta, jpg, d); + host_->getNet()->bind(*uri, [this](int64_t frame, short ttimeoff, int chunk, const vector<unsigned char> &jpg, const vector<unsigned char> &d) { + _recvChunk(frame, ttimeoff, chunk, jpg, d); }); N_ = 0; @@ -333,7 +376,7 @@ bool NetSource::compute(int n, int b) { maxN_ = std::max(maxN_,(n == -1) ? ftl::rgbd::detail::kDefaultFrameCount : n); // Choose best requested quality - minB_ = std::min(minB_,(b == -1) ? 0 : b); + minB_ = std::min(minB_,(b == -1) ? int(adaptive_) : b); // Send k frames before end to prevent unwanted pause // Unless only a single frame is requested @@ -360,6 +403,8 @@ bool NetSource::compute(int n, int b) { active_ = false; } + abr_.notifyChanged(); + maxN_ = 1; // Reset to single frame minB_ = 9; // Reset to worst quality } diff --git a/components/rgbd-sources/src/net.hpp b/components/rgbd-sources/src/net.hpp index 23206896c844cf4a29aa844771d66347010305d6..b71b52150a847dbbed2f098e3d6c4a9f47e1a164 100644 --- a/components/rgbd-sources/src/net.hpp +++ b/components/rgbd-sources/src/net.hpp @@ -4,7 +4,9 @@ #include <ftl/net/universe.hpp> #include <ftl/rgbd/source.hpp> +#include <ftl/rgbd/detail/abr.hpp> #include <ftl/threads.hpp> +#include <ftl/rgbd/detail/netframe.hpp> #include <string> namespace ftl { @@ -12,40 +14,12 @@ namespace rgbd { namespace detail { static const int kDefaultFrameCount = 30; +static const int kLatencyThreshold = 5; // Milliseconds delay considered as late +static const int kSlowFramesThreshold = 5; // Number of late frames before change +static const int kAdaptationRate = 5000; // Milliseconds between bitrate changes /** - * Buffers for a single frame as it is being received over the network. - */ -struct NetFrame { - cv::Mat channel1; - cv::Mat channel2; - volatile int64_t timestamp; - std::atomic<int> chunk_count; - MUTEX mtx; -}; - -/** - * Manage multiple frames with their timestamp as an identifier. Once a frame - * is completed it should be freed from the queue for reuse. - */ -class NetFrameQueue { - public: - explicit NetFrameQueue(int size=2); - ~NetFrameQueue(); - - NetFrame &getFrame(int64_t ts, const cv::Size &, int c1type, int c2type); - void freeFrame(NetFrame &); - - private: - std::vector<NetFrame> frames_; - MUTEX mtx_; -}; - -/** - * RGBD source from either a stereo video file with left + right images, or - * direct from two camera devices. A variety of algorithms are included for - * calculating disparity, before converting to depth. Calibration of the images - * is also performed. + * A two channel network streamed source for RGB-Depth. */ class NetSource : public detail::Source { public: @@ -81,19 +55,21 @@ class NetSource : public detail::Source { int default_quality_; int chunk_count_; ftl::rgbd::channel_t prev_chan_; - //volatile int64_t current_frame_; - //std::atomic<int> chunk_count_; - // Double buffering - //cv::Mat d_depth_; - //cv::Mat d_rgb_; + ftl::rgbd::detail::ABRController abr_; + + // Adaptive bitrate functionality + ftl::rgbd::detail::bitrate_t adaptive_; // Current adapted bitrate + //unsigned int slow_log_; // Bit count of delayed frames + //int64_t last_br_change_; // Time of last adaptive change NetFrameQueue queue_; bool _getCalibration(ftl::net::Universe &net, const ftl::UUID &peer, const std::string &src, ftl::rgbd::Camera &p, ftl::rgbd::channel_t chan); void _recv(const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d); - void _recvChunk(int64_t frame, int chunk, bool delta, const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d); + void _recvChunk(int64_t frame, short ttimeoff, int chunk, const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d); void _updateURI(); + //void _checkAdaptive(int64_t); }; } diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index bf7304f72f0f7ace305d15da555f1813d1552d03..81731781c843da06d5b398345bde960dbd16d5e0 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -7,13 +7,14 @@ #include <tuple> #include <algorithm> -#include "bitrate_settings.hpp" +#include <ftl/rgbd/detail/abr.hpp> using ftl::rgbd::Streamer; using ftl::rgbd::Source; using ftl::rgbd::detail::StreamSource; using ftl::rgbd::detail::StreamClient; using ftl::rgbd::detail::bitrate_settings; +using ftl::rgbd::detail::ABRController; using ftl::net::Universe; using std::string; using std::list; @@ -207,21 +208,23 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID SHARED_LOCK(mutex_, slk); UNIQUE_LOCK(s->mutex, lk2); - for (auto &client : s->clients[rate]) { + for (auto &client : s->clients) { // If already listening, just update chunk counters if (client.peerid == peer) { client.txmax = N * chunk_count_; client.txcount = 0; + client.bitrate = rate; return; } } // Not an existing client so add one - StreamClient &c = s->clients[rate].emplace_back(); + StreamClient &c = s->clients.emplace_back(); c.peerid = peer; c.uri = dest; c.txcount = 0; c.txmax = N * chunk_count_; + c.bitrate = rate; ++s->clientCount; } @@ -259,17 +262,20 @@ void Streamer::_cleanUp() { StreamSource *src = s.second; UNIQUE_LOCK(src->mutex,lk); - for (unsigned int b=0; b<10; ++b) { - auto i = src->clients[b].begin(); - while (i != src->clients[b].end()) { - // Client request completed so remove from list - if ((*i).txcount >= (*i).txmax) { - LOG(INFO) << "Remove client: " << (*i).uri; - i = src->clients[b].erase(i); - --src->clientCount; - } else { - i++; + auto i = src->clients.begin(); + while (i != src->clients.end()) { + // Client request completed so remove from list + if ((*i).txcount >= (*i).txmax) { + // If peer was clock sync master, the remove that... + if ((*i).peerid == time_peer_) { + timer_job_.cancel(); + time_peer_ = ftl::UUID(0); } + LOG(INFO) << "Remove client: " << (*i).uri; + i = src->clients.erase(i); + --src->clientCount; + } else { + i++; } } } @@ -327,14 +333,18 @@ void Streamer::_transmit(ftl::rgbd::FrameSet &fs) { } // Go to sleep if no clients instead of spinning the cpu - if (totalclients == 0 || sources_.size() == 0) sleep_for(milliseconds(200)); - else _cleanUp(); + if (totalclients == 0 || sources_.size() == 0) { + // Make sure to unlock so clients can connect! + lk.unlock(); + slk.unlock(); + sleep_for(milliseconds(50)); + } else _cleanUp(); } void Streamer::_encodeAndTransmit(StreamSource *src, const cv::Mat &rgb, const cv::Mat &depth, int chunk) { bool hasChan2 = (!depth.empty() && src->src->getChannel() != ftl::rgbd::kChanNone); - bool delta = (chunk+src->frame) % 8 > 0; // Do XOR or not + //bool delta = (chunk+src->frame) % 8 > 0; // Do XOR or not int chunk_width = rgb.cols / chunk_dim_; int chunk_height = rgb.rows / chunk_dim_; @@ -342,13 +352,13 @@ void Streamer::_encodeAndTransmit(StreamSource *src, const cv::Mat &rgb, const c int cx = (chunk % chunk_dim_) * chunk_width; int cy = (chunk / chunk_dim_) * chunk_height; cv::Rect roi(cx,cy,chunk_width,chunk_height); - vector<unsigned char> rgb_buf; + //vector<unsigned char> rgb_buf; cv::Mat chunkRGB = rgb(roi); cv::Mat chunkDepth; //cv::Mat chunkDepthPrev = src->prev_depth(roi); cv::Mat d2, d3; - vector<unsigned char> d_buf; + //vector<unsigned char> d_buf; if (hasChan2) { chunkDepth = depth(roi); @@ -359,8 +369,52 @@ void Streamer::_encodeAndTransmit(StreamSource *src, const cv::Mat &rgb, const c //d2.copyTo(chunkDepthPrev); } + // TODO: Verify these don't allocate memory if not needed. + // TODO: Reuse these buffers to reduce allocations. + vector<unsigned char> brgb[ftl::rgbd::detail::kMaxBitrateLevels]; + vector<unsigned char> bdepth[ftl::rgbd::detail::kMaxBitrateLevels]; + + // Lock to prevent clients being added / removed + SHARED_LOCK(src->mutex,lk); + auto c = src->clients.begin(); + while (c != src->clients.end()) { + const int b = (*c).bitrate; + + if (brgb[b].empty()) { + // Max bitrate means no changes + if (b == 0) { + _encodeChannel1(chunkRGB, brgb[b], b); + if (hasChan2) _encodeChannel2(d2, bdepth[b], src->src->getChannel(), b); + + // Otherwise must downscale and change compression params + } else { + cv::Mat downrgb, downdepth; + cv::resize(chunkRGB, downrgb, cv::Size(ABRController::getColourWidth(b) / chunk_dim_, ABRController::getColourHeight(b) / chunk_dim_)); + if (hasChan2) cv::resize(d2, downdepth, cv::Size(ABRController::getDepthWidth(b) / chunk_dim_, ABRController::getDepthHeight(b) / chunk_dim_), 0, 0, cv::INTER_NEAREST); + + _encodeChannel1(downrgb, brgb[b], b); + if (hasChan2) _encodeChannel2(downdepth, bdepth[b], src->src->getChannel(), b); + } + } + + try { + // TODO:(Nick) Send pose + short pre_transmit_latency = short(ftl::timer::get_time() - frame_no_); + if (!net_->send((*c).peerid, (*c).uri, frame_no_, pre_transmit_latency, chunk, brgb[b], bdepth[b])) { + // Send failed so mark as client stream completed + (*c).txcount = (*c).txmax; + } else { + ++(*c).txcount; + //LOG(INFO) << "SENT CHUNK : " << frame_no_ << "-" << chunk; + } + } catch(...) { + (*c).txcount = (*c).txmax; + } + ++c; + } + // For each allowed bitrate setting (0 = max quality) - for (unsigned int b=0; b<10; ++b) { + /*for (unsigned int b=0; b<10; ++b) { { //SHARED_LOCK(src->mutex,lk); if (src->clients[b].size() == 0) continue; @@ -390,7 +444,8 @@ void Streamer::_encodeAndTransmit(StreamSource *src, const cv::Mat &rgb, const c while (c != src->clients[b].end()) { try { // TODO:(Nick) Send pose - if (!net_->send((*c).peerid, (*c).uri, frame_no_, chunk, delta, rgb_buf, d_buf)) { + short pre_transmit_latency = short(ftl::timer::get_time() - frame_no_); + if (!net_->send((*c).peerid, (*c).uri, frame_no_, pre_transmit_latency, chunk, rgb_buf, d_buf)) { // Send failed so mark as client stream completed (*c).txcount = (*c).txmax; } else { @@ -402,11 +457,11 @@ void Streamer::_encodeAndTransmit(StreamSource *src, const cv::Mat &rgb, const c } ++c; } - } + }*/ } void Streamer::_encodeChannel1(const cv::Mat &in, vector<unsigned char> &out, unsigned int b) { - vector<int> jpgparams = {cv::IMWRITE_JPEG_QUALITY, bitrate_settings[b].jpg_quality}; + vector<int> jpgparams = {cv::IMWRITE_JPEG_QUALITY, ABRController::getColourQuality(b)}; cv::imencode(".jpg", in, out, jpgparams); } @@ -414,11 +469,14 @@ bool Streamer::_encodeChannel2(const cv::Mat &in, vector<unsigned char> &out, ft if (c == ftl::rgbd::kChanNone) return false; // NOTE: Should not happen if (isFloatChannel(c) && in.type() == CV_16U && in.channels() == 1) { - vector<int> params = {cv::IMWRITE_PNG_COMPRESSION, bitrate_settings[b].png_compression}; - cv::imencode(".png", in, out, params); + vector<int> params = {cv::IMWRITE_PNG_COMPRESSION, ABRController::getDepthQuality(b)}; + if (!cv::imencode(".png", in, out, params)) { + LOG(ERROR) << "PNG Encoding error"; + return false; + } return true; } else if (!isFloatChannel(c) && in.type() == CV_8UC3) { - vector<int> params = {cv::IMWRITE_JPEG_QUALITY, bitrate_settings[b].jpg_quality}; + vector<int> params = {cv::IMWRITE_JPEG_QUALITY, ABRController::getColourQuality(b)}; cv::imencode(".jpg", in, out, params); return true; } else {