From 6d362abe08f5ec0ed306460b3640132ea48b4d74 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Thu, 6 Jun 2019 16:55:57 +0300
Subject: [PATCH] Resolves #55 and manages recursive log

---
 applications/vision/src/main.cpp              |  2 +
 components/control/cpp/include/ftl/slave.hpp  |  3 +-
 components/control/cpp/src/slave.cpp          | 16 ++++++--
 .../net/cpp/include/ftl/net/universe.hpp      |  7 ++++
 components/net/cpp/src/peer.cpp               | 37 +++++++------------
 components/net/cpp/src/universe.cpp           |  5 +++
 components/rgbd-sources/src/rgbd_streamer.cpp | 15 ++++++--
 7 files changed, 54 insertions(+), 31 deletions(-)

diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp
index 64c2cfbb7..a79cbd37b 100644
--- a/applications/vision/src/main.cpp
+++ b/applications/vision/src/main.cpp
@@ -105,6 +105,7 @@ static void run(ftl::Configurable *root) {
 
 	LOG(INFO) << "Stopping...";
 	stream->stop();
+	net->shutdown();
 
 	delete stream;
 	delete display;
@@ -126,6 +127,7 @@ int main(int argc, char **argv) {
 	//	ftl::middlebury::test(config);
 	//}
 
+	delete root;
 	return ftl::exit_code;
 }
 
diff --git a/components/control/cpp/include/ftl/slave.hpp b/components/control/cpp/include/ftl/slave.hpp
index dc6010635..329c7e664 100644
--- a/components/control/cpp/include/ftl/slave.hpp
+++ b/components/control/cpp/include/ftl/slave.hpp
@@ -19,7 +19,8 @@ class Slave {
 	private:
 	std::vector<ftl::UUID> log_peers_;
 	ftl::net::Universe *net_;
-	std::mutex mutex_;
+	std::recursive_mutex mutex_;
+	bool in_log_;
 };
 
 }
diff --git a/components/control/cpp/src/slave.cpp b/components/control/cpp/src/slave.cpp
index 75af907a8..c63df697a 100644
--- a/components/control/cpp/src/slave.cpp
+++ b/components/control/cpp/src/slave.cpp
@@ -6,13 +6,14 @@ using ftl::ctrl::Slave;
 using std::string;
 using std::mutex;
 using std::unique_lock;
+using std::recursive_mutex;
 
 static void netLog(void* user_data, const loguru::Message& message) {
 	Slave *slave = static_cast<Slave*>(user_data);
 	slave->sendLog(message);
 }
 
-Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net) {
+Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net), in_log_(false) {
 	net->bind("restart", []() {
 		LOG(WARNING) << "Remote restart...";
 		//exit(1);
@@ -43,7 +44,7 @@ Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net) {
 	});
 
 	net->bind("log_subscribe", [this](const ftl::UUID &peer) {
-		unique_lock<mutex> lk(mutex_);
+		unique_lock<recursive_mutex> lk(mutex_);
 		log_peers_.push_back(peer);
 	});
 
@@ -55,10 +56,19 @@ Slave::~Slave() {
 }
 
 void Slave::sendLog(const loguru::Message& message) {
-	unique_lock<mutex> lk(mutex_);
+	unique_lock<recursive_mutex> lk(mutex_);
+	if (in_log_) return;
+	in_log_ = true;
+
 	for (auto &p : log_peers_) {
+		auto peer = net_->getPeer(p);
+		if (!peer || !peer->isConnected()) continue;
+
+		std::cout << "sending log to master..." << std::endl;
 		if (!net_->send(p, "log", message.verbosity, message.preamble, message.message)) {
 			// TODO(Nick) Remove peer from loggers list...
 		}
 	}
+
+	in_log_ = false;
 }
diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp
index 2d4bdc71f..66d0dac54 100644
--- a/components/net/cpp/include/ftl/net/universe.hpp
+++ b/components/net/cpp/include/ftl/net/universe.hpp
@@ -53,6 +53,13 @@ class Universe : public ftl::Configurable {
 	 * @param addr URI giving protocol, interface and port
 	 */
 	bool listen(const std::string &addr);
+
+	/**
+	 * Essential to call this before destroying anything that registered
+	 * callbacks or binds for RPC. It will terminate all connections and
+	 * stop any network activity but without deleting the net object.
+	 */
+	void shutdown();
 	
 	/**
 	 * Create a new peer connection.
diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp
index ed732f1b9..24377a4d7 100644
--- a/components/net/cpp/src/peer.cpp
+++ b/components/net/cpp/src/peer.cpp
@@ -165,6 +165,7 @@ Peer::Peer(SOCKET s, Dispatcher *d) : sock_(s) {
 
 		bind("__disconnect__", [this]() {
 			_badClose(false);
+			LOG(INFO) << "Peer elected to disconnect: " << id().to_string();
 		});
 
 		send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer); 
@@ -257,6 +258,7 @@ void Peer::close(bool retry) {
 		send("__disconnect__");
 
 		_badClose(retry);
+		LOG(INFO) << "Deliberate disconnect of peer.";
 	}
 }
 
@@ -269,35 +271,16 @@ void Peer::_badClose(bool retry) {
 		#endif
 		sock_ = INVALID_SOCKET;
 		status_ = kDisconnected;
-
-		// Attempt auto reconnect?
-		if (retry) LOG(INFO) << "Should attempt reconnect...";
 		
 		//auto i = find(sockets.begin(),sockets.end(),this);
 		//sockets.erase(i);
 		
 		_trigger(close_handlers_);
-	}
-}
 
-/*void Peer::setProtocol(Protocol *p) {
-	if (p != NULL) {
-		if (proto_ == p) return;
-		if (proto_ && proto_->id() == p->id()) return;
-		
-		if (remote_proto_ != "") {
-			Handshake hs1;
-			hs1.magic = ftl::net::MAGIC;
-			//hs1.name_size = 0;
-			hs1.proto_size = p->id().size();
-			send(FTL_PROTOCOL_HS1, hs1, p->id());
-			LOG(INFO) << "Handshake initiated with " << uri_;
-		}
-		
-		proto_ = p;
-	} else {
+		// Attempt auto reconnect?
+		if (retry) LOG(INFO) << "Should attempt reconnect...";
 	}
-}*/
+}
 
 void Peer::socketError() {
 	int err;
@@ -307,6 +290,11 @@ void Peer::socketError() {
 	uint32_t optlen = sizeof(err);
 #endif
 	getsockopt(sock_, SOL_SOCKET, SO_ERROR, (char*)&err, &optlen);
+
+	// Must close before log since log may try to send over net causing
+	// more socket errors...
+	_badClose();
+
 	LOG(ERROR) << "Socket: " << uri_ << " - error " << err;
 }
 
@@ -415,6 +403,8 @@ void Peer::_connected() {
 }
 
 int Peer::_send() {
+	if (sock_ == INVALID_SOCKET) return -1;
+
 	// Are we using a websocket?
 	if (scheme_ == ftl::URI::SCHEME_WS) {
 		// Create a websocket header as well.
@@ -453,7 +443,7 @@ int Peer::_send() {
 	// We are blocking, so -1 should mean actual error
 	if (c == -1) {
 		socketError();
-		_badClose();
+		//_badClose();
 	}
 	
 	return c;
@@ -463,6 +453,7 @@ Peer::~Peer() {
 	std::unique_lock<std::mutex> lk1(send_mtx_);
 	std::unique_lock<std::mutex> lk2(recv_mtx_);
 	_badClose(false);
+	LOG(INFO) << "Deleting peer object";
 
 	delete disp_;
 }
diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp
index 04404e8da..38c67a611 100644
--- a/components/net/cpp/src/universe.cpp
+++ b/components/net/cpp/src/universe.cpp
@@ -51,6 +51,11 @@ Universe::Universe(nlohmann::json &config) :
 }
 
 Universe::~Universe() {
+	shutdown();
+}
+
+void Universe::shutdown() {
+	if (!active_) return;
 	LOG(INFO) << "Cleanup Network ...";
 
 	active_ = false;
diff --git a/components/rgbd-sources/src/rgbd_streamer.cpp b/components/rgbd-sources/src/rgbd_streamer.cpp
index fe758646f..517adfa9f 100644
--- a/components/rgbd-sources/src/rgbd_streamer.cpp
+++ b/components/rgbd-sources/src/rgbd_streamer.cpp
@@ -89,7 +89,7 @@ void Streamer::add(RGBDSource *src) {
 	unique_lock<shared_mutex> ulk(mutex_);
 	if (sources_.find(src->getURI()) != sources_.end()) return;
 
-	StreamSource *s = new StreamSource; // = sources_.emplace(std::make_pair<std::string,StreamSource>(src->getURI(),{}));
+	StreamSource *s = new StreamSource;
 	s->src = src;
 	s->state = 0;
 	sources_[src->getURI()] = s;
@@ -140,6 +140,8 @@ void Streamer::poll() {
 	if (elapsed.count() >= wait) {
 		LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count());
 	} else {
+		// Otherwise, wait until next frame should start.
+		// CHECK(Nick) Is this accurate enough?
 		sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f)));
 	}
 }
@@ -148,13 +150,13 @@ void Streamer::run(bool block) {
 	active_ = true;
 
 	if (block) {
-		while (active_) {
+		while (ftl::running && active_) {
 			poll();
 		}
 	} else {
 		// Create thread job for frame ticking
 		pool_.push([this](int id) {
-			while (active_) {
+			while (ftl::running && active_) {
 				poll();
 			}
 		});
@@ -177,6 +179,7 @@ void Streamer::_schedule() {
 		string uri = s.first;
 
 		shared_lock<shared_mutex> slk(s.second->mutex);
+		// CHECK Should never be true now
 		if (s.second->state != 0) {
 			if (!late_) LOG(WARNING) << "Stream not ready to schedule on time: " << uri;
 			late_ = true;
@@ -185,12 +188,14 @@ void Streamer::_schedule() {
 			late_ = false;
 		}
 
+		// No point in doing work if no clients
 		if (s.second->clients[0].size() == 0) {
 			//LOG(ERROR) << "Stream has no clients: " << uri;
 			continue;
 		}
 		slk.unlock();
 
+		// There will be two jobs for this source...
 		unique_lock<mutex> lk(job_mtx);
 		jobs += 2;
 		lk.unlock();
@@ -211,6 +216,7 @@ void Streamer::_schedule() {
 			_swap(*src);
 			lk.unlock();
 
+			// Mark job as finished
 			unique_lock<mutex> ulk(job_mtx);
 			jobs--;
 			ulk.unlock();
@@ -261,6 +267,7 @@ void Streamer::_schedule() {
 			_swap(*src);
 			lk.unlock();
 
+			// Mark job as finished
 			unique_lock<mutex> ulk(job_mtx);
 			jobs--;
 			ulk.unlock();
@@ -268,7 +275,7 @@ void Streamer::_schedule() {
 		});
 	}
 
-	// TODO Wait until all jobs completed...
+	// Wait for all jobs to complete before finishing frame
 	unique_lock<mutex> lk(job_mtx);
 	job_cv.wait(lk, [&jobs]{ return jobs == 0; });
 }
-- 
GitLab