From 49707974c146629c5f1a033aa3f68b367ca445ac Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Mon, 27 Jul 2020 11:18:53 +0300
Subject: [PATCH] Feed record on flush

---
 applications/reconstruct2/src/main.cpp        |  3 +++
 components/net/cpp/include/ftl/net/peer.hpp   |  4 +--
 components/net/cpp/src/dispatcher.cpp         |  4 +--
 components/net/cpp/src/peer.cpp               |  8 +++---
 .../streams/include/ftl/streams/feed.hpp      |  3 +++
 components/streams/src/feed.cpp               | 25 ++++++++++++++++---
 6 files changed, 35 insertions(+), 12 deletions(-)

diff --git a/applications/reconstruct2/src/main.cpp b/applications/reconstruct2/src/main.cpp
index 813b71727..d79ac84ee 100644
--- a/applications/reconstruct2/src/main.cpp
+++ b/applications/reconstruct2/src/main.cpp
@@ -72,6 +72,8 @@ static void run(ftl::Configurable *root) {
 	});
 
 	auto *filter = feed->filter({Channel::Colour, Channel::Depth});
+	feed->set("uri", root->value("uri", std::string("ftl://ftlab.utu.fi/reconstruction")));
+	//feed->lowLatencyMode();
 	feed->startStreaming(filter);
 
 	// Just do whatever jobs are available
@@ -84,6 +86,7 @@ static void run(ftl::Configurable *root) {
 		}
 	}
 
+	nsrc_handle.cancel();
 	feed->stopRecording();
 	feed->removeFilter(filter);
 
diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp
index 5ee90444c..44dee3439 100644
--- a/components/net/cpp/include/ftl/net/peer.hpp
+++ b/components/net/cpp/include/ftl/net/peer.hpp
@@ -210,8 +210,8 @@ class Peer {
 
 	void _badClose(bool retry=true);
 	
-	void _dispatchResponse(uint32_t id, msgpack::object &obj);
-	void _sendResponse(uint32_t id, const msgpack::object &obj);
+	void _dispatchResponse(uint32_t id, const std::string &name, msgpack::object &obj);
+	void _sendResponse(uint32_t id, const std::string &name, const msgpack::object &obj);
 	
 	/**
 	 * Get the internal OS dependent socket.
diff --git a/components/net/cpp/src/dispatcher.cpp b/components/net/cpp/src/dispatcher.cpp
index 997ca1238..2be0055ae 100644
--- a/components/net/cpp/src/dispatcher.cpp
+++ b/components/net/cpp/src/dispatcher.cpp
@@ -67,7 +67,7 @@ void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) {
     
     if (type == 1) {
     	//DLOG(INFO) << "RPC return for " << id;
-    	s._dispatchResponse(id, args);
+    	s._dispatchResponse(id, name, args);
     } else if (type == 0) {
 		//DLOG(INFO) << "RPC " << name << "() <- " << s.getURI();
 
@@ -77,7 +77,7 @@ void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) {
 			//DLOG(INFO) << "Found binding for " << name;
 		    try {
 		        auto result = (*func)(s, args); //->get();
-		        s._sendResponse(id, result->get());
+		        s._sendResponse(id, name, result->get());
 		        /*response_t res_obj = std::make_tuple(1,id,msgpack::object(),result->get());
 				std::stringstream buf;
 				msgpack::pack(buf, res_obj);			
diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp
index effe08b1a..0e87eaf92 100644
--- a/components/net/cpp/src/peer.cpp
+++ b/components/net/cpp/src/peer.cpp
@@ -570,7 +570,7 @@ bool Peer::_data() {
 	return true;
 }
 
-void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) {	
+void Peer::_dispatchResponse(uint32_t id, const std::string &name, msgpack::object &res) {	
 	// TODO: Handle error reporting...
 	UNIQUE_LOCK(cb_mtx_,lk);
 	if (callbacks_.count(id) > 0) {
@@ -587,7 +587,7 @@ void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) {
 			LOG(ERROR) << "Exception in RPC response: " << e.what();
 		}
 	} else {
-		LOG(WARNING) << "Missing RPC callback for result - discarding";
+		LOG(WARNING) << "Missing RPC callback for result - discarding: " << name;
 	}
 }
 
@@ -598,8 +598,8 @@ void Peer::cancelCall(int id) {
 	}
 }
 
-void Peer::_sendResponse(uint32_t id, const msgpack::object &res) {
-	Dispatcher::response_t res_obj = std::make_tuple(1,id,std::string(""),res);
+void Peer::_sendResponse(uint32_t id, const std::string &name, const msgpack::object &res) {
+	Dispatcher::response_t res_obj = std::make_tuple(1,id,name,res);
 	UNIQUE_LOCK(send_mtx_,lk);
 	if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0);
 	msgpack::pack(send_buf_, res_obj);
diff --git a/components/streams/include/ftl/streams/feed.hpp b/components/streams/include/ftl/streams/feed.hpp
index 0aa28254c..307fe36d2 100644
--- a/components/streams/include/ftl/streams/feed.hpp
+++ b/components/streams/include/ftl/streams/feed.hpp
@@ -149,6 +149,8 @@ public:
 
 	void autoConnect();
 
+	void lowLatencyMode();
+
 private:
 	// public methods acquire lock if necessary, private methods assume locking
 	// managed by caller
@@ -172,6 +174,7 @@ private:
 	std::unique_ptr<ftl::stream::Sender> recorder_;
 	std::unique_ptr<ftl::stream::Broadcast> record_stream_;
 	ftl::Handle handle_record_;
+	ftl::Handle handle_record2_;
 	ftl::Handle record_recv_handle_;
 	ftl::Handle record_new_client_;
 	Filter *record_filter_;
diff --git a/components/streams/src/feed.cpp b/components/streams/src/feed.cpp
index 52ca7d87b..b686b8bdd 100644
--- a/components/streams/src/feed.cpp
+++ b/components/streams/src/feed.cpp
@@ -692,6 +692,7 @@ std::string Feed::getName(const std::string &puri) {
 	if (uri.isValid() == false) return "Invalid";
 
 	if (uri.getScheme() == ftl::URI::SCHEME_FTL) {
+		if (uri.hasAttribute("name")) return uri.getAttribute<std::string>("name");
 		try {
 			auto *cfgble = ftl::config::find(puri);
 			if (cfgble) {
@@ -1036,6 +1037,10 @@ std::vector<unsigned int> Feed::listFrameSets() {
 	return result;
 }
 
+void Feed::lowLatencyMode() {
+	receiver_->set("frameset_buffer_size", 0);
+}
+
 // ==== Record =================================================================
 
 void Feed::startRecording(Filter *f, const std::string &filename) {
@@ -1084,14 +1089,25 @@ void Feed::startStreaming(Filter *f) {
 }
 
 void Feed::_beginRecord(Filter *f) {
-	handle_record_ = f->onWithHandle([this, f](const ftl::data::FrameSetPtr &fs) {
-		record_stream_->select(fs->frameset(), f->channels(), true);
 
-		for (auto c : f->channels()) {
-			if (fs->hasAnyChanged(c)) recorder_->post(*fs.get(), c);
+	handle_record_ = pool_->onFlushSet([this, f](ftl::data::FrameSet &fs, ftl::codecs::Channel c) {
+		// Skip framesets not in filter.
+		if (!f->sources().empty() && f->sources().count(fs.frameset()) == 0) return true;
+
+		if (f->channels().count(c)) {
+			recorder_->post(fs, c);
+		} else {
+			recorder_->post(fs, c, true);
 		}
 		return true;
 	});
+
+	handle_record2_ = f->onWithHandle([this, f](const ftl::data::FrameSetPtr &fs) {
+		record_stream_->select(fs->frameset(), f->channels(), true);
+		stream_->select(fs->frameset(), f->channels(), true);
+		fs->flush();  // Force now to reduce latency
+		return true;
+	});
 }
 
 void Feed::stopRecording() {
@@ -1101,6 +1117,7 @@ void Feed::stopRecording() {
 
 void Feed::_stopRecording() {
 	handle_record_.cancel();
+	handle_record2_.cancel();
 	record_new_client_.cancel();
 	record_stream_->end();
 
-- 
GitLab