From 57e7558908b0668ecd99962b02f653beeb8ed437 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nicolas.pope@utu.fi>
Date: Fri, 14 Jun 2019 13:40:23 +0300
Subject: [PATCH] Partial improvement to sync

---
 applications/reconstruct/src/main.cpp    |  6 +++-
 components/net/cpp/src/peer.cpp          |  2 ++
 components/net/cpp/src/universe.cpp      |  3 ++
 components/rgbd-sources/src/net.cpp      | 13 ++++-----
 components/rgbd-sources/src/streamer.cpp | 35 ++++++++----------------
 5 files changed, 27 insertions(+), 32 deletions(-)

diff --git a/applications/reconstruct/src/main.cpp b/applications/reconstruct/src/main.cpp
index 003e8cc48..7885c167e 100644
--- a/applications/reconstruct/src/main.cpp
+++ b/applications/reconstruct/src/main.cpp
@@ -194,6 +194,11 @@ static void run(ftl::Configurable *root) {
 			//net.broadcast("grab");  // To sync cameras
 			scene->nextFrame();
 		
+			// TODO(Nick) Improve sync further...
+			for (size_t i = 0; i < inputs.size(); i++) {
+				if (inputs[i].source->isReady()) inputs[i].source->grab();
+			}
+
 			for (size_t i = 0; i < inputs.size(); i++) {
 				if (!inputs[i].source->isReady()) {
 					inputs[i].params.m_imageWidth = 0;
@@ -220,7 +225,6 @@ static void run(ftl::Configurable *root) {
 				// Get the RGB-Depth frame from input
 				Source *input = inputs[i].source;
 				Mat rgb, depth;
-				input->grab();
 				input->getFrames(rgb,depth);
 				
 				active += 1;
diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp
index 52e082373..1541b56fd 100644
--- a/components/net/cpp/src/peer.cpp
+++ b/components/net/cpp/src/peer.cpp
@@ -389,6 +389,8 @@ void Peer::error(int e) {
 }
 
 void Peer::data() {
+	// TODO(Nick) Should not enter here twice if recv call has yet to be
+	// processed.
 	//if (!is_waiting_) return;
 	//is_waiting_ = false;
 	std::unique_lock<std::recursive_mutex> lk(recv_mtx_);
diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp
index b0a72d1f3..af510a710 100644
--- a/components/net/cpp/src/universe.cpp
+++ b/components/net/cpp/src/universe.cpp
@@ -307,6 +307,9 @@ void Universe::_run() {
 		block.tv_usec = 10000;
 		selres = select(n+1, &sfdread_, 0, &sfderror_, &block);
 
+		// NOTE Nick: Is it possible that not all the recvs have been called before I
+		// again reach a select call!? What are the consequences of this? A double recv attempt?
+
 		//Some kind of error occured, it is usually possible to recover from this.
 		if (selres < 0) {
 			switch (errno) {
diff --git a/components/rgbd-sources/src/net.cpp b/components/rgbd-sources/src/net.cpp
index a52bfea4e..7c56e5636 100644
--- a/components/rgbd-sources/src/net.cpp
+++ b/components/rgbd-sources/src/net.cpp
@@ -70,12 +70,6 @@ void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned ch
 	depth_.convertTo(depth_, CV_32FC1, 1.0f/(16.0f*100.0f));
 
 	N_--;
-	if (N_ == 0) {
-		N_ += 10;
-		if (!host_->getNet()->send(peer_, "get_stream", *host_->get<string>("uri"), 10, 0, host_->getNet()->id(), *host_->get<string>("uri"))) {
-			active_ = false;
-		}
-	}
 }
 
 void NetSource::setPose(const Eigen::Matrix4f &pose) {
@@ -134,7 +128,12 @@ void NetSource::_updateURI() {
 }
 
 bool NetSource::grab() {
-	// net_.broadcast("grab");
+	if (N_ == 0) {
+		N_ += 10;
+		if (!host_->getNet()->send(peer_, "get_stream", *host_->get<string>("uri"), 10, 0, host_->getNet()->id(), *host_->get<string>("uri"))) {
+			active_ = false;
+		}
+	}
 	return true;
 }
 
diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp
index b3b093e49..00a370fc6 100644
--- a/components/rgbd-sources/src/streamer.cpp
+++ b/components/rgbd-sources/src/streamer.cpp
@@ -139,7 +139,7 @@ void Streamer::stop() {
 }
 
 void Streamer::poll() {
-	double wait = 1.0f / 25.0f;
+	double wait = 1.0f / 25.0f;  // TODO(Nick) Should be in config
 	auto start = std::chrono::high_resolution_clock::now();
 	// Create frame jobs at correct FPS interval
 	_schedule();
@@ -194,22 +194,10 @@ void Streamer::_schedule() {
 	for (auto s : sources_) {
 		string uri = s.first;
 
-		//shared_lock<shared_mutex> slk(s.second->mutex);
-		// CHECK Should never be true now
-		/*if (s.second->state != 0) {
-			if (!late_) LOG(WARNING) << "Stream not ready to schedule on time: " << uri;
-			late_ = true;
-			continue;
-		} else {
-			late_ = false;
-		}*/
-
 		// No point in doing work if no clients
 		if (s.second->clients[0].size() == 0) {
-			//LOG(ERROR) << "Stream has no clients: " << uri;
 			continue;
 		}
-		//slk.unlock();
 
 		// There will be two jobs for this source...
 		unique_lock<mutex> lk(job_mtx);
@@ -220,15 +208,7 @@ void Streamer::_schedule() {
 		pool_.push([this,uri,&jobs,&job_mtx,&job_cv](int id) {
 			StreamSource *src = sources_[uri];
 
-			//auto start = std::chrono::high_resolution_clock::now();
-			//try {
-				src->src->grab();
-			//} catch(...) {
-			//	LOG(ERROR) << "Grab Exception for: " << uri;
-			//}
-			/*std::chrono::duration<double> elapsed =
-					std::chrono::high_resolution_clock::now() - start;
-			LOG(INFO) << "GRAB Elapsed: " << elapsed.count();*/
+			src->src->grab();
 
 			// CHECK (Nick) Can state be an atomic instead?
 			unique_lock<shared_mutex> lk(src->mutex);
@@ -259,7 +239,15 @@ void Streamer::_schedule() {
 				cv::Mat d2;
 				src->depth.convertTo(d2, CV_16UC1, 16*100);
 				vector<unsigned char> d_buf;
-				cv::imencode(".png", d2, d_buf);
+
+				// Setting 1 = fast but large
+				// Setting 9 = small but slow
+				// Anything up to 8 causes minimal if any impact on frame rate
+				// on my (Nicks) laptop, but 9 halves the frame rate.
+				vector<int> pngparams = {cv::IMWRITE_PNG_COMPRESSION, 5}; // Default is 1 for fast, 9 = small but slow.
+				cv::imencode(".png", d2, d_buf, pngparams);
+
+				//LOG(INFO) << "Data size: " << ((rgb_buf.size() + d_buf.size()) / 1024) << "kb";
 
 				auto i = src->clients[0].begin();
 				while (i != src->clients[0].end()) {
@@ -274,7 +262,6 @@ void Streamer::_schedule() {
 					(*i).txcount++;
 					if ((*i).txcount >= (*i).txmax) {
 						LOG(INFO) << "Remove client: " << (*i).uri;
-						//unique_lock<shared_mutex> lk(src->mutex);
 						i = src->clients[0].erase(i);
 					} else {
 						i++;
-- 
GitLab