From bd9a7c6dd9ec2b8f2fb9f47effc0f57b148c2ccf Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nicolas.pope@utu.fi>
Date: Wed, 21 Aug 2019 21:10:32 +0300
Subject: [PATCH] Work towards #123 adaptive bitrate

---
 components/common/cpp/include/ftl/timer.hpp   |   2 +
 components/net/cpp/include/ftl/net/peer.hpp   |   4 +-
 .../net/cpp/include/ftl/net/universe.hpp      |   2 +-
 components/net/cpp/src/universe.cpp           |   1 +
 components/rgbd-sources/CMakeLists.txt        |   1 +
 .../include/ftl/rgbd/detail/abr.hpp           | 121 ++++++++++++++++++
 .../include/ftl/rgbd/detail/netframe.hpp      |  49 +++++++
 .../include/ftl/rgbd/streamer.hpp             |   4 +-
 components/rgbd-sources/src/abr.cpp           | 118 +++++++++++++++++
 .../rgbd-sources/src/bitrate_settings.hpp     |  20 ---
 components/rgbd-sources/src/net.cpp           |  77 ++++++++---
 components/rgbd-sources/src/net.hpp           |  52 ++------
 components/rgbd-sources/src/streamer.cpp      | 108 ++++++++++++----
 13 files changed, 456 insertions(+), 103 deletions(-)
 create mode 100644 components/rgbd-sources/include/ftl/rgbd/detail/abr.hpp
 create mode 100644 components/rgbd-sources/include/ftl/rgbd/detail/netframe.hpp
 create mode 100644 components/rgbd-sources/src/abr.cpp

diff --git a/components/common/cpp/include/ftl/timer.hpp b/components/common/cpp/include/ftl/timer.hpp
index bbfe33b7e..dad98a704 100644
--- a/components/common/cpp/include/ftl/timer.hpp
+++ b/components/common/cpp/include/ftl/timer.hpp
@@ -64,6 +64,8 @@ void setInterval(int ms);
 
 int getInterval();
 
+int64_t get_time();
+
 /**
  * Add the specified number of milliseconds to the clock when generating
  * timestamps. This is used to synchronise clocks on multiple machines as it
diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp
index f1dfbb4c7..2c3e1fd63 100644
--- a/components/net/cpp/include/ftl/net/peer.hpp
+++ b/components/net/cpp/include/ftl/net/peer.hpp
@@ -330,11 +330,11 @@ R Peer::call(const std::string &name, ARGS... args) {
 	}, std::forward<ARGS>(args)...);
 	
 	// While waiting, do some other thread jobs...
-	std::function<void(int)> j;
+	/*std::function<void(int)> j;
 	while (!hasreturned && (bool)(j=ftl::pool.pop())) {
 			LOG(INFO) << "Doing job whilst waiting...";
 			j(-1);
-	}
+	}*/
 
 	{  // Block thread until async callback notifies us
 		std::unique_lock<std::mutex> lk(m);
diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp
index 6394e26e3..b4419b1f7 100644
--- a/components/net/cpp/include/ftl/net/universe.hpp
+++ b/components/net/cpp/include/ftl/net/universe.hpp
@@ -209,7 +209,7 @@ class Universe : public ftl::Configurable {
 	private:
 	bool active_;
 	ftl::UUID this_peer;
-	SHARED_MUTEX net_mutex_;
+	mutable SHARED_MUTEX net_mutex_;
 	RECURSIVE_MUTEX handler_mutex_;
 	fd_set sfderror_;
 	fd_set sfdread_;
diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp
index a70e42141..4d1d00499 100644
--- a/components/net/cpp/src/universe.cpp
+++ b/components/net/cpp/src/universe.cpp
@@ -220,6 +220,7 @@ void Universe::_cleanupPeers() {
 }
 
 Peer *Universe::getPeer(const UUID &id) const {
+	SHARED_LOCK(net_mutex_,lk);
 	auto ix = peer_ids_.find(id);
 	if (ix == peer_ids_.end()) return nullptr;
 	else return ix->second;
diff --git a/components/rgbd-sources/CMakeLists.txt b/components/rgbd-sources/CMakeLists.txt
index e0a925616..9aa252db3 100644
--- a/components/rgbd-sources/CMakeLists.txt
+++ b/components/rgbd-sources/CMakeLists.txt
@@ -14,6 +14,7 @@ set(RGBDSRC
 #	src/algorithms/opencv_sgbm.cpp
 #	src/algorithms/opencv_bm.cpp
 	src/cb_segmentation.cpp
+	src/abr.cpp
 	src/offilter.cpp
 )
 
diff --git a/components/rgbd-sources/include/ftl/rgbd/detail/abr.hpp b/components/rgbd-sources/include/ftl/rgbd/detail/abr.hpp
new file mode 100644
index 000000000..b3d809784
--- /dev/null
+++ b/components/rgbd-sources/include/ftl/rgbd/detail/abr.hpp
@@ -0,0 +1,121 @@
+#ifndef _FTL_RGBD_ABR_HPP_
+#define _FTL_RGBD_ABR_HPP_
+
+#include <ftl/rgbd/detail/netframe.hpp>
+#include <cstdint>
+
+namespace ftl {
+namespace rgbd {
+namespace detail {
+
+static const float kAspectRatio = 1.777778f;
+
+enum codec_t {
+	kCodecJPG = 0,
+	kCodecPNG
+};
+
+struct BitrateSetting {
+	int colour_res;
+	int depth_res;
+	int colour_qual;
+	int depth_qual;
+	codec_t colour_codec;
+	codec_t depth_codec;
+	int block_count_x;
+
+	/*int width;
+	int height;
+	int jpg_quality;
+	int png_compression;
+	codec_t colour_codec;
+	codec_t depth_codec;
+	int chunking;*/
+};
+
+static const BitrateSetting bitrate_settings[] = {
+	1080, 1080, 95, 1, kCodecJPG, kCodecPNG, 4,
+	1080, 720, 95, 1, kCodecJPG, kCodecPNG, 4,
+	720, 720, 95, 1, kCodecJPG, kCodecPNG, 4,
+	720, 576, 95, 5, kCodecJPG, kCodecPNG, 4,
+	576, 576, 95, 5, kCodecJPG, kCodecPNG, 4,
+	576, 480, 95, 5, kCodecJPG, kCodecPNG, 2,
+	480, 480, 95, 5, kCodecJPG, kCodecPNG, 2,
+	480, 360, 95, 9, kCodecJPG, kCodecPNG, 2,
+	360, 360, 95, 9, kCodecJPG, kCodecPNG, 2,
+	360, 360, 50, 9, kCodecJPG, kCodecPNG, 2
+};
+
+/*static const BitrateSetting bitrate_settings[] = {
+	1920, 1080, 95, 1, kCodecJPG, kCodecPNG, 4,	// ?
+	1280, 720, 95, 1, kCodecJPG, kCodecPNG, 4,	// ~96Mbps
+	1024, 576, 95, 5, kCodecJPG, kCodecPNG, 3,	// ~62Mbps
+	854, 480, 95, 5, kCodecJPG, kCodecPNG, 3,	// ~48Mbps
+	640, 360, 95, 9, kCodecJPG, kCodecPNG, 2,	// ~31Mbps
+	640, 360, 75, 9, kCodecJPG, kCodecPNG, 2,	// ~25Mbps
+	640, 360, 65, 9, kCodecJPG, kCodecPNG, 2,	// ~24Mbps
+	640, 360, 50, 9, kCodecJPG, kCodecPNG, 2,	// ~23Mbps
+	320, 160, 95, 9, kCodecJPG, kCodecPNG, 2,	// ~10Mbps
+	320, 160, 75, 9, kCodecJPG, kCodecPNG, 2	// ~8Mbps
+};*/
+
+typedef unsigned int bitrate_t;
+
+static const bitrate_t kBitrateBest = 0;
+static const bitrate_t kBitrateWorst = 9;
+
+/**
+ * Adaptive Bitrate Controller to monitor and decide on a client streams
+ * bitrate. The basics of our approach are that if transmission latency exceeds
+ * some proportion of the frame time then mark it as a slow frame. Similarly if
+ * transmission latency falls below a proportion of frame time then mark it as
+ * a fast frame. If the net frame status is slow (thresholded) then reduce
+ * bitrate, if the net status is fast then increase bitrate.
+ */
+class ABRController {
+	public:
+	ABRController();
+	~ABRController();
+
+	/**
+	 * From a received frame, select a bitrate based upon actual and required
+	 * bitrate as well as past frames.
+	 */
+	bitrate_t selectBitrate(const ftl::rgbd::detail::NetFrame &);
+
+	/**
+	 * Called to tell the controller the new bitrate is now in use by the stream
+	 */
+	void notifyChanged();
+
+	void setMaximumBitrate(bitrate_t);
+	void setMinimumBitrate(bitrate_t);
+
+	static const ftl::rgbd::detail::BitrateSetting &getBitrateInfo(bitrate_t b);
+	static int getColourWidth(bitrate_t b);
+	static int getDepthWidth(bitrate_t b);
+	static int getColourHeight(bitrate_t b);
+	static int getDepthHeight(bitrate_t b);
+	static int getBlockCountX(bitrate_t b);
+	static int getBlockCountY(bitrate_t b);
+	static int getBlockCount(bitrate_t b);
+	static int getColourQuality(bitrate_t b);
+	static int getDepthQuality(bitrate_t b);
+
+	private:
+	unsigned int down_log_;		// Bit log of delayed frames
+	unsigned int up_log_;		// Bit log of fast frames
+	int64_t last_br_change_;	// Time of last adaptive change
+	float down_threshold_;		// Proportion of min bitrate before reduction
+	float up_threshold_;		// Proportion of min bitrate before increase
+	bitrate_t bitrate_;
+	bool enabled_;
+	bitrate_t max_;
+	bitrate_t min_;
+};
+
+}
+}
+}
+
+#endif  // _FTL_RGBD_ABR_HPP_
diff --git a/components/rgbd-sources/include/ftl/rgbd/detail/netframe.hpp b/components/rgbd-sources/include/ftl/rgbd/detail/netframe.hpp
new file mode 100644
index 000000000..60620f093
--- /dev/null
+++ b/components/rgbd-sources/include/ftl/rgbd/detail/netframe.hpp
@@ -0,0 +1,49 @@
+#ifndef _FTL_RGBD_NETFRAME_HPP_
+#define _FTL_RGBD_NETFRAME_HPP_
+
+#include <cstdint>
+#include <vector>
+#include <ftl/rgbd/source.hpp>
+
+namespace ftl {
+namespace rgbd {
+namespace detail {
+
+/**
+ * Buffers for a single frame as it is being received over the network.
+ * Also maintains statistics about the frame transmission for later analysis.
+ */
+struct NetFrame {
+	cv::Mat channel1;
+	cv::Mat channel2;
+	volatile int64_t timestamp;
+	std::atomic<int> chunk_count;
+	std::atomic<int> tx_size;
+	int64_t tx_latency;
+	MUTEX mtx;
+};
+
+/**
+ * Manage multiple frames with their timestamp as an identifier. Once a frame
+ * is completed it should be freed immediately from the queue for reuse. It
+ * is not the job of this queue to buffer frames for longer periods, see Group
+ * for this functionality. This queue is only to manage chunk ordering problems.
+ */
+class NetFrameQueue {
+	public:
+	explicit NetFrameQueue(int size=2);
+	~NetFrameQueue();
+
+	NetFrame &getFrame(int64_t ts, const cv::Size &, int c1type, int c2type);
+	void freeFrame(NetFrame &);
+
+	private:
+	std::vector<NetFrame> frames_;
+	MUTEX mtx_;
+};
+
+}
+}
+}
+
+#endif  // _FTL_RGBD_NETFRAME_HPP_
diff --git a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp
index b387d1fe4..a571e63a0 100644
--- a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp
+++ b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp
@@ -26,6 +26,7 @@ struct StreamClient {
 	ftl::UUID peerid;
 	std::atomic<int> txcount;	// Frames sent since last request
 	int txmax;					// Frames to send in request
+	int bitrate;
 };
 
 static const unsigned int kGrabbed = 0x1;
@@ -33,6 +34,7 @@ static const unsigned int kRGB = 0x2;
 static const unsigned int kDepth = 0x4;
 
 static const unsigned int kFrameDropLimit = 5;
+static const unsigned int kMaxBitrateLevels = 10;
 
 struct StreamSource {
 	ftl::rgbd::Source *src;
@@ -42,7 +44,7 @@ struct StreamSource {
 	cv::Mat depth;									// Tx buffer
 	cv::Mat prev_rgb;
 	cv::Mat prev_depth;
-	std::list<detail::StreamClient> clients[10];	// One list per bitrate
+	std::list<detail::StreamClient> clients;
 	SHARED_MUTEX mutex;
 	unsigned long long frame;
 };
diff --git a/components/rgbd-sources/src/abr.cpp b/components/rgbd-sources/src/abr.cpp
new file mode 100644
index 000000000..5400fa5d6
--- /dev/null
+++ b/components/rgbd-sources/src/abr.cpp
@@ -0,0 +1,118 @@
+#include <ftl/rgbd/detail/abr.hpp>
+#include <ftl/timer.hpp>
+
+#include <bitset>
+
+using ftl::rgbd::detail::BitrateSetting;
+using ftl::rgbd::detail::ABRController;
+using ftl::rgbd::detail::bitrate_t;
+using ftl::rgbd::detail::kBitrateWorst;
+using ftl::rgbd::detail::kBitrateBest;
+using ftl::rgbd::detail::bitrate_settings;
+using ftl::rgbd::detail::NetFrame;
+
+ABRController::ABRController() {
+    bitrate_ = 0;
+    enabled_ = false;
+    max_ = kBitrateBest;
+    min_ = kBitrateWorst;
+}
+
+ABRController::~ABRController() {
+
+}
+
+void ABRController::setMaximumBitrate(bitrate_t b) {
+    max_ = (b == -1) ? kBitrateBest : b;
+    if (bitrate_ < max_) bitrate_ = max_;
+}
+
+void ABRController::setMinimumBitrate(bitrate_t b) {
+    min_ = (b == -1) ? kBitrateWorst : b;
+    if (bitrate_ > min_) bitrate_ = min_;
+}
+
+void ABRController::notifyChanged() {
+    //enabled_ = true;
+}
+
+bitrate_t ABRController::selectBitrate(const NetFrame &frame) {
+    if (!enabled_) return bitrate_;
+
+    float actual_mbps = (float(frame.tx_size) * 8.0f * (1000.0f / float(frame.tx_latency))) / 1048576.0f;
+    float min_mbps = (float(frame.tx_size) * 8.0f * (1000.0f / float(ftl::timer::getInterval()))) / 1048576.0f;
+    LOG(INFO) << "Bitrate = " << actual_mbps << "Mbps, min required = " << min_mbps << "Mbps";
+    float ratio = actual_mbps / min_mbps;
+    //LOG(INFO) << "Rate Ratio = " << frame.tx_latency;
+
+    down_log_ = down_log_ << 1;
+    up_log_ = up_log_ << 1;
+
+    if (ratio < 1.2f) {
+        down_log_ += 1;
+    } else if (ratio > 1.5f) {
+        up_log_ += 1;
+    }
+
+    std::bitset<32> bd(down_log_);
+    std::bitset<32> bu(up_log_);
+
+    if (bitrate_ < min_ && int(bd.count()) - int(bu.count()) > 5) {
+        enabled_ = false;
+        down_log_ = 0;
+        up_log_ = 0;
+        bitrate_++;
+        LOG(INFO) << "Bitrate down to: " << bitrate_;
+    } else if (bitrate_ > max_ && int(bu.count()) - int(bd.count()) > 15) {
+        enabled_ = false;
+        up_log_ = 0;
+        down_log_ = 0;
+        bitrate_--;
+        LOG(INFO) << "Bitrate up to: " << bitrate_;
+    }
+
+    return bitrate_;
+}
+
+const BitrateSetting &ABRController::getBitrateInfo(bitrate_t b) {
+    if (b > kBitrateWorst) return bitrate_settings[kBitrateWorst];
+    if (b < kBitrateBest) return bitrate_settings[kBitrateBest];
+    return bitrate_settings[b];
+};
+
+int ABRController::getColourWidth(bitrate_t b) {
+    return std::ceil(bitrate_settings[b].colour_res * kAspectRatio);
+}
+
+int ABRController::getDepthWidth(bitrate_t b) {
+    return std::ceil(bitrate_settings[b].depth_res * kAspectRatio);
+}
+
+int ABRController::getColourHeight(bitrate_t b) {
+    return bitrate_settings[b].colour_res;
+}
+
+int ABRController::getDepthHeight(bitrate_t b) {
+    return bitrate_settings[b].depth_res;
+}
+
+int ABRController::getBlockCountX(bitrate_t b) {
+    return bitrate_settings[b].block_count_x;
+}
+
+int ABRController::getBlockCountY(bitrate_t b) {
+    return bitrate_settings[b].block_count_x;
+}
+
+int ABRController::getBlockCount(bitrate_t b) {
+    const int c = bitrate_settings[b].block_count_x;
+    return c*c;
+}
+
+int ABRController::getColourQuality(bitrate_t b) {
+    return bitrate_settings[b].colour_qual;
+}
+
+int ABRController::getDepthQuality(bitrate_t b) {
+    return bitrate_settings[b].depth_qual;
+}
diff --git a/components/rgbd-sources/src/bitrate_settings.hpp b/components/rgbd-sources/src/bitrate_settings.hpp
index 61e3ec7e4..3dbd23bc1 100644
--- a/components/rgbd-sources/src/bitrate_settings.hpp
+++ b/components/rgbd-sources/src/bitrate_settings.hpp
@@ -5,26 +5,6 @@ namespace ftl {
 namespace rgbd {
 namespace detail {
 
-struct BitrateSetting {
-	int width;
-	int height;
-	int jpg_quality;
-	int png_compression;
-};
-
-static const BitrateSetting bitrate_settings[] = {
-	1920, 1080, 95, 1,
-	1280, 720, 95, 1,
-	1280, 720, 95, 1,
-	1280, 720, 75, 1,
-	640, 360, 95, 1,
-	640, 360, 75, 5,
-	640, 360, 50, 5,
-	320, 160, 95, 5,
-	320, 160, 75, 5,
-	320, 160, 50, 9
-};
-
 }
 }
 }
diff --git a/components/rgbd-sources/src/net.cpp b/components/rgbd-sources/src/net.cpp
index 3cab6c86d..2e873d5cf 100644
--- a/components/rgbd-sources/src/net.cpp
+++ b/components/rgbd-sources/src/net.cpp
@@ -3,6 +3,7 @@
 #include <thread>
 #include <chrono>
 #include <tuple>
+#include <bitset>
 
 #include "colour.hpp"
 
@@ -43,6 +44,7 @@ NetFrame &NetFrameQueue::getFrame(int64_t ts, const cv::Size &s, int c1type, int
 		if (f.timestamp == -1) {
 			f.timestamp = ts;
 			f.chunk_count = 0;
+			f.tx_size = 0;
 			f.channel1.create(s, c1type);
 			f.channel2.create(s, c2type);
 			return f;
@@ -108,7 +110,7 @@ bool NetSource::_getCalibration(Universe &net, const UUID &peer, const string &s
 }
 
 NetSource::NetSource(ftl::rgbd::Source *host)
-		: ftl::rgbd::detail::Source(host), active_(false), minB_(9), maxN_(1), queue_(3) {
+		: ftl::rgbd::detail::Source(host), active_(false), minB_(9), maxN_(1), adaptive_(0), queue_(3) {
 
 	gamma_ = host->value("gamma", 1.0f);
 	temperature_ = host->value("temperature", 6500);
@@ -150,6 +152,9 @@ NetSource::NetSource(ftl::rgbd::Source *host)
 	chunks_dim_ = host->value("chunking",4);
 	chunk_count_ = chunks_dim_*chunks_dim_;
 
+	abr_.setMaximumBitrate(host->value("max_bitrate", -1));
+	abr_.setMinimumBitrate(host->value("min_bitrate", -1));
+
 	_updateURI();
 
 	h_ = host_->getNet()->onConnect([this](ftl::net::Peer *p) {
@@ -167,15 +172,48 @@ NetSource::~NetSource() {
 	host_->getNet()->removeCallback(h_);
 }
 
-void NetSource::_recvChunk(int64_t ts, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
+/*void NetSource::_checkAdaptive(int64_t ts) {
+	const int64_t current = ftl::timer::get_time();
+	int64_t net_latency = current - ts;
+
+	// Only change bit rate gradually
+	if (current - last_br_change_ > ftl::rgbd::detail::kAdaptationRate) {
+		// Was this frame late?
+		if (adaptive_ < ftl::rgbd::detail::kMaxBitrateLevels && net_latency > ftl::rgbd::detail::kLatencyThreshold) {
+			slow_log_ = (slow_log_ << 1) + 1;
+			std::bitset<32> bs(slow_log_);
+
+			// Enough late frames to reduce bit rate
+			if (bs.count() > ftl::rgbd::detail::kSlowFramesThreshold) {
+				adaptive_++;
+				slow_log_ = 0;
+				last_br_change_ = current;
+				LOG(WARNING) << "Adjust bitrate to " << adaptive_;
+			}
+		// No late frames in recent history...
+		} else if (adaptive_ > 0 && slow_log_ == 0) {
+			// TODO: (Nick) Don't change bitrate up so quickly as down?
+			// Try a higher bitrate again?
+			adaptive_--;
+		}
+	}
+}*/
+
+void NetSource::_recvChunk(int64_t ts, short ttimeoff, int chunk, const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
 	// TODO: Don't allocate these each chunk
 	cv::Mat tmp_rgb, tmp_depth;
 
-	//if (!active_ || ts == 0) return;
+	// Capture time here for better net latency estimate
+	int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count();
+
+	if (!active_) return;
 
 	const ftl::rgbd::channel_t chan = host_->getChannel();
 	NetFrame &frame = queue_.getFrame(ts, cv::Size(params_.width, params_.height), CV_8UC3, (isFloatChannel(chan) ? CV_32FC1 : CV_8UC3));
 
+	// Update frame statistics
+	frame.tx_size += jpg.size() + d.size();
+
 	// Build chunk head
 	int cx = (chunk % chunks_dim_) * chunk_width_;
 	int cy = (chunk / chunks_dim_) * chunk_height_;
@@ -183,13 +221,6 @@ void NetSource::_recvChunk(int64_t ts, int chunk, bool delta, const vector<unsig
 	cv::Mat chunkRGB = frame.channel1(roi);
 	cv::Mat chunkDepth = frame.channel2(roi);
 
-	auto start = std::chrono::high_resolution_clock::now();
-	int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count();
-	//LOG(INFO) << ts << " - Chunk Latency (" << chunk_count_ << ") = " << (now - ts) << " - " << ftl::pool.q_size();
-	//if (now - ts > 160) {
-	//	LOG(INFO) << "OLD PACKET: " << host_->getURI() << " (" << chunk << ") - " << ts << " (" << (now - ts) << ")";
-	//}
-
 	// Decode in temporary buffers to prevent long locks
 	cv::imdecode(jpg, cv::IMREAD_COLOR, &tmp_rgb);
 	if (d.size() > 0) cv::imdecode(d, cv::IMREAD_UNCHANGED, &tmp_depth);
@@ -213,10 +244,10 @@ void NetSource::_recvChunk(int64_t ts, int chunk, bool delta, const vector<unsig
 	// Downsized so needs a scale up
 	} else {
 		cv::resize(tmp_rgb, chunkRGB, chunkRGB.size());
-		tmp_depth.convertTo(tmp_depth, CV_32FC1, 1.0f/1000.0f);
+		//tmp_depth.convertTo(tmp_depth, CV_32FC1, 1.0f/1000.0f);
 		if (!tmp_depth.empty() && tmp_depth.type() == CV_16U && chunkDepth.type() == CV_32F) {
 			tmp_depth.convertTo(tmp_depth, CV_32FC1, 1.0f/1000.0f); //(16.0f*10.0f));
-			cv::resize(tmp_depth, chunkDepth, chunkDepth.size());
+			cv::resize(tmp_depth, chunkDepth, chunkDepth.size(), 0, 0, cv::INTER_NEAREST);
 		} else if (!tmp_depth.empty() && tmp_depth.type() == CV_8UC3 && chunkDepth.type() == CV_8UC3) {
 			cv::resize(tmp_depth, chunkDepth, chunkDepth.size());
 		} else {
@@ -233,11 +264,23 @@ void NetSource::_recvChunk(int64_t ts, int chunk, bool delta, const vector<unsig
 
 	if (frame.chunk_count > chunk_count_) LOG(FATAL) << "TOO MANY CHUNKS";
 
+	// Capture tx time of first received chunk
+	if (frame.chunk_count == 1) {
+		UNIQUE_LOCK(frame.mtx, flk);
+		if (frame.chunk_count == 1) {
+			frame.tx_latency = int64_t(ttimeoff);
+		}
+	}
+
+	// Last chunk now received
 	if (frame.chunk_count == chunk_count_) {
 		UNIQUE_LOCK(frame.mtx, flk);
-		timestamp_ = frame.timestamp;
 
 		if (frame.timestamp >= 0 && frame.chunk_count == chunk_count_) {
+			timestamp_ = frame.timestamp;
+			frame.tx_latency = now-(ts+frame.tx_latency);
+
+			adaptive_ = abr_.selectBitrate(frame);
 			//LOG(INFO) << "Frame finished: " << frame.timestamp;
 			auto cb = host_->callback();
 			if (cb) {
@@ -304,8 +347,8 @@ void NetSource::_updateURI() {
 
 		has_calibration_ = _getCalibration(*host_->getNet(), peer_, *uri, params_, ftl::rgbd::kChanLeft);
 
-		host_->getNet()->bind(*uri, [this](int64_t frame, int chunk, bool delta, const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
-			_recvChunk(frame, chunk, delta, jpg, d);
+		host_->getNet()->bind(*uri, [this](int64_t frame, short ttimeoff, int chunk, const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
+			_recvChunk(frame, ttimeoff, chunk, jpg, d);
 		});
 
 		N_ = 0;
@@ -333,7 +376,7 @@ bool NetSource::compute(int n, int b) {
 	maxN_ = std::max(maxN_,(n == -1) ? ftl::rgbd::detail::kDefaultFrameCount : n);
 
 	// Choose best requested quality
-	minB_ = std::min(minB_,(b == -1) ? 0 : b);
+	minB_ = std::min(minB_,(b == -1) ? int(adaptive_) : b);
 
 	// Send k frames before end to prevent unwanted pause
 	// Unless only a single frame is requested
@@ -360,6 +403,8 @@ bool NetSource::compute(int n, int b) {
 			active_ = false;
 		}
 
+		abr_.notifyChanged();
+
 		maxN_ = 1;  // Reset to single frame
 		minB_ = 9;  // Reset to worst quality
 	}
diff --git a/components/rgbd-sources/src/net.hpp b/components/rgbd-sources/src/net.hpp
index 23206896c..b71b52150 100644
--- a/components/rgbd-sources/src/net.hpp
+++ b/components/rgbd-sources/src/net.hpp
@@ -4,7 +4,9 @@
 
 #include <ftl/net/universe.hpp>
 #include <ftl/rgbd/source.hpp>
+#include <ftl/rgbd/detail/abr.hpp>
 #include <ftl/threads.hpp>
+#include <ftl/rgbd/detail/netframe.hpp>
 #include <string>
 
 namespace ftl {
@@ -12,40 +14,12 @@ namespace rgbd {
 namespace detail {
 
 static const int kDefaultFrameCount = 30;
+static const int kLatencyThreshold = 5;		// Milliseconds delay considered as late
+static const int kSlowFramesThreshold = 5;	// Number of late frames before change
+static const int kAdaptationRate = 5000;	// Milliseconds between bitrate changes
 
 /**
- * Buffers for a single frame as it is being received over the network.
- */
-struct NetFrame {
-	cv::Mat channel1;
-	cv::Mat channel2;
-	volatile int64_t timestamp;
-	std::atomic<int> chunk_count;
-	MUTEX mtx;
-};
-
-/**
- * Manage multiple frames with their timestamp as an identifier. Once a frame
- * is completed it should be freed from the queue for reuse.
- */
-class NetFrameQueue {
-	public:
-	explicit NetFrameQueue(int size=2);
-	~NetFrameQueue();
-
-	NetFrame &getFrame(int64_t ts, const cv::Size &, int c1type, int c2type);
-	void freeFrame(NetFrame &);
-
-	private:
-	std::vector<NetFrame> frames_;
-	MUTEX mtx_;
-};
-
-/**
- * RGBD source from either a stereo video file with left + right images, or
- * direct from two camera devices. A variety of algorithms are included for
- * calculating disparity, before converting to depth.  Calibration of the images
- * is also performed.
+ * A two channel network streamed source for RGB-Depth.
  */
 class NetSource : public detail::Source {
 	public:
@@ -81,19 +55,21 @@ class NetSource : public detail::Source {
 	int default_quality_;
 	int chunk_count_;
 	ftl::rgbd::channel_t prev_chan_;
-	//volatile int64_t current_frame_;
-	//std::atomic<int> chunk_count_;
 
-	// Double buffering
-	//cv::Mat d_depth_;
-	//cv::Mat d_rgb_;
+	ftl::rgbd::detail::ABRController abr_;
+
+	// Adaptive bitrate functionality
+	ftl::rgbd::detail::bitrate_t adaptive_;	 // Current adapted bitrate
+	//unsigned int slow_log_;		// Bit count of delayed frames
+	//int64_t last_br_change_;	// Time of last adaptive change
 
 	NetFrameQueue queue_;
 
 	bool _getCalibration(ftl::net::Universe &net, const ftl::UUID &peer, const std::string &src, ftl::rgbd::Camera &p, ftl::rgbd::channel_t chan);
 	void _recv(const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d);
-	void _recvChunk(int64_t frame, int chunk, bool delta, const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d);
+	void _recvChunk(int64_t frame, short ttimeoff, int chunk, const std::vector<unsigned char> &jpg, const std::vector<unsigned char> &d);
 	void _updateURI();
+	//void _checkAdaptive(int64_t);
 };
 
 }
diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp
index bf7304f72..81731781c 100644
--- a/components/rgbd-sources/src/streamer.cpp
+++ b/components/rgbd-sources/src/streamer.cpp
@@ -7,13 +7,14 @@
 #include <tuple>
 #include <algorithm>
 
-#include "bitrate_settings.hpp"
+#include <ftl/rgbd/detail/abr.hpp>
 
 using ftl::rgbd::Streamer;
 using ftl::rgbd::Source;
 using ftl::rgbd::detail::StreamSource;
 using ftl::rgbd::detail::StreamClient;
 using ftl::rgbd::detail::bitrate_settings;
+using ftl::rgbd::detail::ABRController;
 using ftl::net::Universe;
 using std::string;
 using std::list;
@@ -207,21 +208,23 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID
 
 	SHARED_LOCK(mutex_, slk);
 	UNIQUE_LOCK(s->mutex, lk2);
-	for (auto &client : s->clients[rate]) {
+	for (auto &client : s->clients) {
 		// If already listening, just update chunk counters
 		if (client.peerid == peer) {
 			client.txmax = N * chunk_count_;
 			client.txcount = 0;
+			client.bitrate = rate;
 			return;
 		}
 	}
 
 	// Not an existing client so add one
-	StreamClient &c = s->clients[rate].emplace_back();
+	StreamClient &c = s->clients.emplace_back();
 	c.peerid = peer;
 	c.uri = dest;
 	c.txcount = 0;
 	c.txmax = N * chunk_count_;
+	c.bitrate = rate;
 	++s->clientCount;
 }
 
@@ -259,17 +262,20 @@ void Streamer::_cleanUp() {
 		StreamSource *src = s.second;
 		UNIQUE_LOCK(src->mutex,lk);
 
-		for (unsigned int b=0; b<10; ++b) {
-			auto i = src->clients[b].begin();
-			while (i != src->clients[b].end()) {
-				// Client request completed so remove from list
-				if ((*i).txcount >= (*i).txmax) {
-					LOG(INFO) << "Remove client: " << (*i).uri;
-					i = src->clients[b].erase(i);
-					--src->clientCount;
-				} else {
-					i++;
+		auto i = src->clients.begin();
+		while (i != src->clients.end()) {
+			// Client request completed so remove from list
+			if ((*i).txcount >= (*i).txmax) {
+				// If peer was clock sync master, the remove that...
+				if ((*i).peerid == time_peer_) {
+					timer_job_.cancel();
+					time_peer_ = ftl::UUID(0);
 				}
+				LOG(INFO) << "Remove client: " << (*i).uri;
+				i = src->clients.erase(i);
+				--src->clientCount;
+			} else {
+				i++;
 			}
 		}
 	}
@@ -327,14 +333,18 @@ void Streamer::_transmit(ftl::rgbd::FrameSet &fs) {
 	}
 
 	// Go to sleep if no clients instead of spinning the cpu
-	if (totalclients == 0 || sources_.size() == 0) sleep_for(milliseconds(200));
-	else _cleanUp();
+	if (totalclients == 0 || sources_.size() == 0) {
+		// Make sure to unlock so clients can connect!
+		lk.unlock();
+		slk.unlock();
+		sleep_for(milliseconds(50));
+	} else _cleanUp();
 }
 
 void Streamer::_encodeAndTransmit(StreamSource *src, const cv::Mat &rgb, const cv::Mat &depth, int chunk) {
 	bool hasChan2 = (!depth.empty() && src->src->getChannel() != ftl::rgbd::kChanNone);
 
-	bool delta = (chunk+src->frame) % 8 > 0;  // Do XOR or not
+	//bool delta = (chunk+src->frame) % 8 > 0;  // Do XOR or not
 	int chunk_width = rgb.cols / chunk_dim_;
 	int chunk_height = rgb.rows / chunk_dim_;
 
@@ -342,13 +352,13 @@ void Streamer::_encodeAndTransmit(StreamSource *src, const cv::Mat &rgb, const c
 	int cx = (chunk % chunk_dim_) * chunk_width;
 	int cy = (chunk / chunk_dim_) * chunk_height;
 	cv::Rect roi(cx,cy,chunk_width,chunk_height);
-	vector<unsigned char> rgb_buf;
+	//vector<unsigned char> rgb_buf;
 	cv::Mat chunkRGB = rgb(roi);
 	cv::Mat chunkDepth;
 	//cv::Mat chunkDepthPrev = src->prev_depth(roi);
 
 	cv::Mat d2, d3;
-	vector<unsigned char> d_buf;
+	//vector<unsigned char> d_buf;
 
 	if (hasChan2) {
 		chunkDepth = depth(roi);
@@ -359,8 +369,52 @@ void Streamer::_encodeAndTransmit(StreamSource *src, const cv::Mat &rgb, const c
 		//d2.copyTo(chunkDepthPrev);
 	}
 
+	// TODO: Verify these don't allocate memory if not needed.
+	// TODO: Reuse these buffers to reduce allocations.
+	vector<unsigned char> brgb[ftl::rgbd::detail::kMaxBitrateLevels];
+	vector<unsigned char> bdepth[ftl::rgbd::detail::kMaxBitrateLevels];
+
+	// Lock to prevent clients being added / removed
+	SHARED_LOCK(src->mutex,lk);
+	auto c = src->clients.begin();
+	while (c != src->clients.end()) {
+		const int b = (*c).bitrate;
+
+		if (brgb[b].empty()) {
+			// Max bitrate means no changes
+			if (b == 0) {
+				_encodeChannel1(chunkRGB, brgb[b], b);
+				if (hasChan2) _encodeChannel2(d2, bdepth[b], src->src->getChannel(), b);
+
+			// Otherwise must downscale and change compression params
+			} else {
+				cv::Mat downrgb, downdepth;
+				cv::resize(chunkRGB, downrgb, cv::Size(ABRController::getColourWidth(b) / chunk_dim_, ABRController::getColourHeight(b) / chunk_dim_));
+				if (hasChan2) cv::resize(d2, downdepth, cv::Size(ABRController::getDepthWidth(b) / chunk_dim_, ABRController::getDepthHeight(b) / chunk_dim_), 0, 0, cv::INTER_NEAREST);
+
+				_encodeChannel1(downrgb, brgb[b], b);
+				if (hasChan2) _encodeChannel2(downdepth, bdepth[b], src->src->getChannel(), b);
+			}
+		}
+
+		try {
+			// TODO:(Nick) Send pose
+			short pre_transmit_latency = short(ftl::timer::get_time() - frame_no_);
+			if (!net_->send((*c).peerid, (*c).uri, frame_no_, pre_transmit_latency, chunk, brgb[b], bdepth[b])) {
+				// Send failed so mark as client stream completed
+				(*c).txcount = (*c).txmax;
+			} else {
+				++(*c).txcount;
+				//LOG(INFO) << "SENT CHUNK : " << frame_no_ << "-" << chunk;
+			}
+		} catch(...) {
+			(*c).txcount = (*c).txmax;
+		}
+		++c;
+	}
+
 	// For each allowed bitrate setting (0 = max quality)
-	for (unsigned int b=0; b<10; ++b) {
+	/*for (unsigned int b=0; b<10; ++b) {
 		{
 			//SHARED_LOCK(src->mutex,lk);
 			if (src->clients[b].size() == 0) continue;
@@ -390,7 +444,8 @@ void Streamer::_encodeAndTransmit(StreamSource *src, const cv::Mat &rgb, const c
 		while (c != src->clients[b].end()) {
 			try {
 				// TODO:(Nick) Send pose
-				if (!net_->send((*c).peerid, (*c).uri, frame_no_, chunk, delta, rgb_buf, d_buf)) {
+				short pre_transmit_latency = short(ftl::timer::get_time() - frame_no_);
+				if (!net_->send((*c).peerid, (*c).uri, frame_no_, pre_transmit_latency, chunk, rgb_buf, d_buf)) {
 					// Send failed so mark as client stream completed
 					(*c).txcount = (*c).txmax;
 				} else {
@@ -402,11 +457,11 @@ void Streamer::_encodeAndTransmit(StreamSource *src, const cv::Mat &rgb, const c
 			}
 			++c;
 		}
-	}
+	}*/
 }
 
 void Streamer::_encodeChannel1(const cv::Mat &in, vector<unsigned char> &out, unsigned int b) {
-	vector<int> jpgparams = {cv::IMWRITE_JPEG_QUALITY, bitrate_settings[b].jpg_quality};
+	vector<int> jpgparams = {cv::IMWRITE_JPEG_QUALITY, ABRController::getColourQuality(b)};
 	cv::imencode(".jpg", in, out, jpgparams);
 }
 
@@ -414,11 +469,14 @@ bool Streamer::_encodeChannel2(const cv::Mat &in, vector<unsigned char> &out, ft
 	if (c == ftl::rgbd::kChanNone) return false;  // NOTE: Should not happen
 
 	if (isFloatChannel(c) && in.type() == CV_16U && in.channels() == 1) {
-		vector<int> params = {cv::IMWRITE_PNG_COMPRESSION, bitrate_settings[b].png_compression};
-		cv::imencode(".png", in, out, params);
+		vector<int> params = {cv::IMWRITE_PNG_COMPRESSION, ABRController::getDepthQuality(b)};
+		if (!cv::imencode(".png", in, out, params)) {
+			LOG(ERROR) << "PNG Encoding error";
+			return false;
+		}
 		return true;
 	} else if (!isFloatChannel(c) && in.type() == CV_8UC3) {
-		vector<int> params = {cv::IMWRITE_JPEG_QUALITY, bitrate_settings[b].jpg_quality};
+		vector<int> params = {cv::IMWRITE_JPEG_QUALITY, ABRController::getColourQuality(b)};
 		cv::imencode(".jpg", in, out, params);
 		return true;
 	} else {
-- 
GitLab