From b57a5234b529e602babf88659e2d4e8fd6678ab2 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Sat, 15 Jun 2019 11:08:49 +0300
Subject: [PATCH] Split jpg and png compression

---
 components/net/cpp/include/ftl/net/peer.hpp   |   3 +-
 .../include/ftl/rgbd/streamer.hpp             |   7 +-
 components/rgbd-sources/src/streamer.cpp      | 112 ++++++++++++------
 3 files changed, 82 insertions(+), 40 deletions(-)

diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp
index 35b7bfac3..605c0fb0d 100644
--- a/components/net/cpp/include/ftl/net/peer.hpp
+++ b/components/net/cpp/include/ftl/net/peer.hpp
@@ -273,7 +273,8 @@ int Peer::send(const std::string &s, ARGS... args) {
 	auto args_obj = std::make_tuple(args...);
 	auto call_obj = std::make_tuple(0,s,args_obj);
 	msgpack::pack(send_buf_, call_obj);
-	return _send();
+	int rc = _send();
+	return rc;
 }
 
 template <typename F>
diff --git a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp
index 63f5c06fa..208fef758 100644
--- a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp
+++ b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp
@@ -26,13 +26,16 @@ struct StreamClient {
 };
 
 static const unsigned int kGrabbed = 0x1;
-static const unsigned int kTransmitted = 0x2; 
+static const unsigned int kRGB = 0x2;
+static const unsigned int kDepth = 0x4; 
 
 struct StreamSource {
 	ftl::rgbd::Source *src;
 	std::atomic<unsigned int> state;				// Busy or ready to swap?
 	cv::Mat rgb;									// Tx buffer
 	cv::Mat depth;									// Tx buffer
+	std::vector<unsigned char> rgb_buf;
+	std::vector<unsigned char> d_buf;
 	std::vector<detail::StreamClient> clients[10];	// One list per bitrate
 	std::shared_mutex mutex;
 };
@@ -108,7 +111,7 @@ class Streamer : public ftl::Configurable {
 	std::atomic<int> jobs_;
 
 	void _schedule();
-	void _swap(detail::StreamSource &);
+	void _swap(detail::StreamSource *);
 	void _addClient(const std::string &source, int N, int rate, const ftl::UUID &peer, const std::string &dest);
 };
 
diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp
index 2e084975d..651434ee0 100644
--- a/components/rgbd-sources/src/streamer.cpp
+++ b/components/rgbd-sources/src/streamer.cpp
@@ -170,6 +170,7 @@ void Streamer::poll() {
 	if (elapsed.count() >= wait) {
 		LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count());
 	} else {
+		LOG(INFO) << "Frame rate @ " << (1.0f / elapsed.count());
 		// Otherwise, wait until next frame should start.
 		// CHECK(Nick) Is this accurate enough? Almost certainly not
 		// TODO(Nick) Synchronise by time corrections and use of fixed time points
@@ -196,10 +197,32 @@ void Streamer::run(bool block) {
 }
 
 // Must be called in source locked state or src.state must be atomic
-void Streamer::_swap(StreamSource &src) {
-	if (src.state == (ftl::rgbd::detail::kGrabbed | ftl::rgbd::detail::kTransmitted)) {
-		src.src->getFrames(src.rgb, src.depth);
-		src.state = 0;
+void Streamer::_swap(StreamSource *src) {
+	if (src->state == (ftl::rgbd::detail::kGrabbed | ftl::rgbd::detail::kRGB | ftl::rgbd::detail::kDepth)) {
+		UNIQUE_LOCK(src->mutex,lk);
+
+		if (src->rgb_buf.size() > 0 && src->d_buf.size() > 0) {
+			auto i = src->clients[0].begin();
+			while (i != src->clients[0].end()) {
+				try {
+					// TODO(Nick) Send pose and timestamp
+					if (!net_->send((*i).peerid, (*i).uri, src->rgb_buf, src->d_buf)) {
+						(*i).txcount = (*i).txmax;
+					}
+				} catch(...) {
+					(*i).txcount = (*i).txmax;
+				}
+				(*i).txcount++;
+				if ((*i).txcount >= (*i).txmax) {
+					LOG(INFO) << "Remove client: " << (*i).uri;
+					i = src->clients[0].erase(i);
+				} else {
+					i++;
+				}
+			}
+		}
+		src->src->getFrames(src->rgb, src->depth);
+		src->state = 0;
 	}
 }
 
@@ -234,7 +257,7 @@ void Streamer::_schedule() {
 
 		// There will be two jobs for this source...
 		//UNIQUE_LOCK(job_mtx_,lk);
-		jobs_ += 2;
+		jobs_ += 3;
 		//lk.unlock();
 
 		StreamSource *src = sources_[uri];
@@ -248,13 +271,45 @@ void Streamer::_schedule() {
 			// CHECK (Nick) Can state be an atomic instead?
 			//UNIQUE_LOCK(src->mutex, lk);
 			src->state |= ftl::rgbd::detail::kGrabbed;
-			_swap(*src);
-			//lk.unlock();
+			_swap(src);
 
 			// Mark job as finished
-			//UNIQUE_LOCK(job_mtx_, ulk);
-			//jobs_--;
-			//ulk.unlock();
+			--jobs_;
+			job_cv_.notify_one();
+		});
+
+		// Compress colour job
+		pool_.push([this,src](int id) {
+			if (!src->rgb.empty()) {
+				auto start = std::chrono::high_resolution_clock::now();
+
+				//vector<unsigned char> src->rgb_buf;
+				cv::imencode(".jpg", src->rgb, src->rgb_buf);
+			}
+
+			src->state |= ftl::rgbd::detail::kRGB;
+			_swap(src);
+			--jobs_;
+			job_cv_.notify_one();
+		});
+
+		// Compress depth job
+		pool_.push([this,src](int id) {
+			if (!src->depth.empty()) {
+				cv::Mat d2;
+				src->depth.convertTo(d2, CV_16UC1, 16*100);
+				//vector<unsigned char> d_buf;
+
+				// Setting 1 = fast but large
+				// Setting 9 = small but slow
+				// Anything up to 8 causes minimal if any impact on frame rate
+				// on my (Nicks) laptop, but 9 halves the frame rate.
+				vector<int> pngparams = {cv::IMWRITE_PNG_COMPRESSION, 1}; // Default is 1 for fast, 9 = small but slow.
+				cv::imencode(".png", d2, src->d_buf, pngparams);
+			}
+
+			src->state |= ftl::rgbd::detail::kDepth;
+			_swap(src);
 			--jobs_;
 			job_cv_.notify_one();
 		});
@@ -264,13 +319,19 @@ void Streamer::_schedule() {
 		// meaning that no lock is required here since outer shared_lock
 		// prevents addition of new clients.
 		// TODO, could do one for each bitrate...
-		pool_.push([this,src](int id) {
+		/* pool_.push([this,src](int id) {
 			//StreamSource *src = sources_[uri];
 
 			try {
 			if (src && src->rgb.rows > 0 && src->depth.rows > 0 && src->clients[0].size() > 0) {
+				auto start = std::chrono::high_resolution_clock::now();
+
 				vector<unsigned char> rgb_buf;
 				cv::imencode(".jpg", src->rgb, rgb_buf);
+
+				std::chrono::duration<double> elapsed =
+					std::chrono::high_resolution_clock::now() - start;
+				LOG(INFO) << "JPG in " << elapsed.count() << "s";
 				
 				cv::Mat d2;
 				src->depth.convertTo(d2, CV_16UC1, 16*100);
@@ -285,36 +346,12 @@ void Streamer::_schedule() {
 
 				//LOG(INFO) << "Data size: " << ((rgb_buf.size() + d_buf.size()) / 1024) << "kb";
 
-				{
-					UNIQUE_LOCK(src->mutex,lk);
-					auto i = src->clients[0].begin();
-					while (i != src->clients[0].end()) {
-						try {
-							// TODO(Nick) Send pose and timestamp
-							if (!net_->send((*i).peerid, (*i).uri, rgb_buf, d_buf)) {
-								(*i).txcount = (*i).txmax;
-							}
-						} catch(...) {
-							(*i).txcount = (*i).txmax;
-						}
-						(*i).txcount++;
-						if ((*i).txcount >= (*i).txmax) {
-							LOG(INFO) << "Remove client: " << (*i).uri;
-							i = src->clients[0].erase(i);
-						} else {
-							i++;
-						}
-					}
-				}
+				
 			}
 			} catch(...) {
 				LOG(ERROR) << "Error in transmission loop";
 			}
 
-			/*std::chrono::duration<double> elapsed =
-					std::chrono::high_resolution_clock::now() - start;
-			LOG(INFO) << "Stream Elapsed: " << elapsed.count();*/
-
 			// CHECK (Nick) Could state be an atomic?
 			//UNIQUE_LOCK(src->mutex,lk);
 			//LOG(INFO) << "Tx Frame: " << uri;
@@ -326,9 +363,10 @@ void Streamer::_schedule() {
 			//UNIQUE_LOCK(job_mtx_,ulk);
 			//jobs_--;
 			//ulk.unlock();
+
 			--jobs_;
 			job_cv_.notify_one();
-		});
+		});*/
 	}
 }
 
-- 
GitLab