From 170db52a7a815bb50df5bc73f6049a9ca7e1e4de Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Thu, 6 Jun 2019 11:36:36 +0300
Subject: [PATCH] Should improve #55 stability and fps stability

---
 applications/vision/src/main.cpp              |  6 +-
 .../common/cpp/include/ftl/configuration.hpp  |  3 +
 components/common/cpp/src/configuration.cpp   |  5 +-
 components/control/cpp/src/slave.cpp          |  2 +-
 .../net/cpp/include/ftl/net/universe.hpp      |  6 +-
 components/net/cpp/src/universe.cpp           |  8 ++-
 components/rgbd-sources/src/rgbd_streamer.cpp | 59 ++++++++++++++++---
 7 files changed, 71 insertions(+), 18 deletions(-)

diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp
index bb77b0810..64c2cfbb7 100644
--- a/applications/vision/src/main.cpp
+++ b/applications/vision/src/main.cpp
@@ -96,16 +96,16 @@ static void run(ftl::Configurable *root) {
 	stream->add(source);
 	stream->run();
 
-	while (display->active()) {
+	while (ftl::running && display->active()) {
 		cv::Mat rgb, depth;
 		source->getRGBD(rgb, depth);
 		if (!rgb.empty()) display->render(rgb, depth, source->getParameters());
 		display->wait(10);
 	}
 
+	LOG(INFO) << "Stopping...";
 	stream->stop();
 
-	LOG(INFO) << "Finished.";
 	delete stream;
 	delete display;
 	delete source;
@@ -125,5 +125,7 @@ int main(int argc, char **argv) {
 	//} else {
 	//	ftl::middlebury::test(config);
 	//}
+
+	return ftl::exit_code;
 }
 
diff --git a/components/common/cpp/include/ftl/configuration.hpp b/components/common/cpp/include/ftl/configuration.hpp
index 6a91c9b05..531852ca5 100644
--- a/components/common/cpp/include/ftl/configuration.hpp
+++ b/components/common/cpp/include/ftl/configuration.hpp
@@ -12,6 +12,9 @@
 
 namespace ftl {
 
+extern bool running;
+extern int exit_code;
+
 class Configurable;
 
 bool is_directory(const std::string &path);
diff --git a/components/common/cpp/src/configuration.cpp b/components/common/cpp/src/configuration.cpp
index c4369bd00..11385dee3 100644
--- a/components/common/cpp/src/configuration.cpp
+++ b/components/common/cpp/src/configuration.cpp
@@ -47,6 +47,9 @@ using ftl::config::config;
 
 static Configurable *rootCFG = nullptr;
 
+bool ftl::running = true;
+int ftl::exit_code = 0;
+
 bool ftl::is_directory(const std::string &path) {
 #ifdef WIN32
 	DWORD attrib = GetFileAttributesA(path.c_str());
@@ -413,7 +416,7 @@ static void signalIntHandler( int signum ) {
    // cleanup and close up stuff here  
    // terminate program  
 
-   exit(0);
+   ftl::running = false;
 }
 
 Configurable *ftl::config::configure(int argc, char **argv, const std::string &root) {
diff --git a/components/control/cpp/src/slave.cpp b/components/control/cpp/src/slave.cpp
index 700c1da56..fb30b8d2b 100644
--- a/components/control/cpp/src/slave.cpp
+++ b/components/control/cpp/src/slave.cpp
@@ -7,7 +7,7 @@ using ftl::ctrl::Slave;
 
 static void netLog(void* user_data, const loguru::Message& message) {
 	Universe *net = (Universe*)user_data;
-	net->publish("log", message.preamble, message.message);
+	//net->publish("log", message.preamble, message.message);
 }
 
 Slave::Slave(Universe *net, ftl::Configurable *root) {
diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp
index d849f9b0e..21a5da2ea 100644
--- a/components/net/cpp/include/ftl/net/universe.hpp
+++ b/components/net/cpp/include/ftl/net/universe.hpp
@@ -108,7 +108,7 @@ class Universe : public ftl::Configurable {
 	R call(const UUID &pid, const std::string &name, ARGS... args);
 	
 	template <typename... ARGS>
-	void send(const UUID &pid, const std::string &name, ARGS... args);
+	bool send(const UUID &pid, const std::string &name, ARGS... args);
 
 	template <typename R, typename... ARGS>
 	std::optional<R> findOne(const std::string &name, ARGS... args);
@@ -316,13 +316,13 @@ R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) {
 }
 
 template <typename... ARGS>
-void Universe::send(const ftl::UUID &pid, const std::string &name, ARGS... args) {
+bool Universe::send(const ftl::UUID &pid, const std::string &name, ARGS... args) {
 	Peer *p = getPeer(pid);
 	if (p == nullptr) {
 		LOG(ERROR) << "Attempting to call an unknown peer : " << pid.to_string();
 		throw -1;
 	}
-	p->send(name, args...);
+	return p->send(name, args...) > 0;
 }
 
 template <typename... ARGS>
diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp
index 2b28a36b0..04404e8da 100644
--- a/components/net/cpp/src/universe.cpp
+++ b/components/net/cpp/src/universe.cpp
@@ -253,6 +253,7 @@ void Universe::_run() {
 		int n = _setDescriptors();
 		int selres = 1;
 
+		// It is an error to use "select" with no sockets ... so just sleep
 		if (n == 0) {
 			std::this_thread::sleep_for(std::chrono::milliseconds(300));
 			continue;
@@ -294,6 +295,8 @@ void Universe::_run() {
 						_installBindings(p);
 						p->onConnect([this](Peer &p) {
 							peer_ids_[p.id()] = &p;
+							// Note, called in another thread so above lock
+							// does not apply.
 							_notifyConnect(&p);
 						});
 					}
@@ -314,6 +317,7 @@ void Universe::_run() {
 				}
 			}
 		}
+		// TODO(Nick) Don't always need to call this
 		_cleanupPeers();
 	}
 }
@@ -381,8 +385,8 @@ void Universe::_notifyConnect(Peer *p) {
 }
 
 void Universe::_notifyDisconnect(Peer *p) {
+	// In all cases, should already be locked outside this function call
 	//unique_lock<mutex> lk(net_mutex_);
-	LOG(INFO) << "NOTIFY DISCONNECT";
 	for (auto &i : on_disconnect_) {
 		try {
 			i.h(p);
@@ -393,5 +397,5 @@ void Universe::_notifyDisconnect(Peer *p) {
 }
 
 void Universe::_notifyError(Peer *p, const ftl::net::Error &e) {
-
+	// TODO(Nick)
 }
diff --git a/components/rgbd-sources/src/rgbd_streamer.cpp b/components/rgbd-sources/src/rgbd_streamer.cpp
index 375ae8ebd..6aefb9725 100644
--- a/components/rgbd-sources/src/rgbd_streamer.cpp
+++ b/components/rgbd-sources/src/rgbd_streamer.cpp
@@ -113,6 +113,7 @@ void Streamer::remove(const std::string &) {
 
 void Streamer::stop() {
 	active_ = false;
+	pool_.stop();
 }
 
 void Streamer::run(bool block) {
@@ -128,7 +129,11 @@ void Streamer::run(bool block) {
 			std::chrono::duration<double> elapsed =
 				std::chrono::high_resolution_clock::now() - start;
 
-			sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f)));
+			if (elapsed.count() >= wait) {
+				LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count());
+			} else {
+				sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f)));
+			}
 		}
 	} else {
 		// Create thread job for frame ticking
@@ -142,7 +147,11 @@ void Streamer::run(bool block) {
 				std::chrono::duration<double> elapsed =
 					std::chrono::high_resolution_clock::now() - start;
 
-				sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f)));
+				if (elapsed.count() >= wait) {
+					LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count());
+				} else {
+					sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f)));
+				}
 			}
 		});
 	}
@@ -156,12 +165,16 @@ void Streamer::_swap(StreamSource &src) {
 }
 
 void Streamer::_schedule() {
+	std::mutex job_mtx;
+	std::condition_variable job_cv;
+	int jobs = 0;
+
 	for (auto s : sources_) {
 		string uri = s.first;
 
 		shared_lock<shared_mutex> slk(s.second->mutex);
 		if (s.second->state != 0) {
-			LOG(WARNING) << "Stream not ready to schedule on time: " << uri;
+			LOG(WARNING) << "Stream not ready to schedule on time: " << uri << " (" << s.second->state << ")";
 			continue;
 		}
 		if (s.second->clients[0].size() == 0) {
@@ -170,19 +183,34 @@ void Streamer::_schedule() {
 		}
 		slk.unlock();
 
+		unique_lock<mutex> lk(job_mtx);
+		jobs += 2;
+		lk.unlock();
+
 		// Grab job
-		pool_.push([this,uri](int id) {
+		pool_.push([this,uri,&jobs,&job_mtx,&job_cv](int id) {
 			StreamSource *src = sources_[uri];
-			src->src->grab();
+
+			try {
+				src->src->grab();
+			} catch(...) {
+				LOG(ERROR) << "Grab Exception for: " << uri;
+			}
 
 			unique_lock<shared_mutex> lk(src->mutex);
 			src->state |= ftl::rgbd::detail::kGrabbed;
 			_swap(*src);
+			lk.unlock();
+
+			unique_lock<mutex> ulk(job_mtx);
+			jobs--;
+			ulk.unlock();
+			job_cv.notify_one();
 		});
 
 		// Transmit job
 		// TODO, could do one for each bitrate...
-		pool_.push([this,uri](int id) {
+		pool_.push([this,uri,&jobs,&job_mtx,&job_cv](int id) {
 			StreamSource *src = sources_[uri];
 
 			if (src->rgb.rows > 0 && src->depth.rows > 0 && src->clients[0].size() > 0) {
@@ -197,13 +225,16 @@ void Streamer::_schedule() {
 				auto i = src->clients[0].begin();
 				while (i != src->clients[0].end()) {
 					try {
-						net_->send((*i).peerid, (*i).uri, rgb_buf, d_buf);
+						if (!net_->send((*i).peerid, (*i).uri, rgb_buf, d_buf)) {
+							(*i).txcount = (*i).txmax;
+						}
 					} catch(...) {
 						(*i).txcount = (*i).txmax;
 					}
 					(*i).txcount++;
 					if ((*i).txcount >= (*i).txmax) {
-						LOG(INFO) << "Remove client";
+						DLOG(2) << "Remove client";
+						unique_lock<shared_mutex> lk(src->mutex);
 						i = src->clients[0].erase(i);
 					} else {
 						i++;
@@ -212,11 +243,21 @@ void Streamer::_schedule() {
 			}
 
 			unique_lock<shared_mutex> lk(src->mutex);
-			LOG(INFO) << "Tx Frame: " << uri;
+			DLOG(1) << "Tx Frame: " << uri;
 			src->state |= ftl::rgbd::detail::kTransmitted;
 			_swap(*src);
+			lk.unlock();
+
+			unique_lock<mutex> ulk(job_mtx);
+			jobs--;
+			ulk.unlock();
+			job_cv.notify_one();
 		});
 	}
+
+	// TODO Wait until all jobs completed...
+	unique_lock<mutex> lk(job_mtx);
+	job_cv.wait(lk, [&jobs]{ return jobs == 0; });
 }
 
 RGBDSource *Streamer::get(const std::string &uri) {
-- 
GitLab