From 8af1fbe6ddb10853e081349431abb278b240f498 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Sun, 5 Jul 2020 16:50:41 +0300
Subject: [PATCH] WIP Implement the response mechanism

---
 applications/tools/simple_viewer/main.cpp     |  13 +-
 applications/vision/src/main.cpp              |   7 +-
 .../streams/include/ftl/streams/sender.hpp    |   4 +
 components/streams/src/receiver.cpp           |   6 +-
 components/streams/src/sender.cpp             | 125 ++++++++++++++++++
 components/structures/src/new_frame.cpp       |   4 +-
 components/structures/src/pool.cpp            |   3 +-
 7 files changed, 154 insertions(+), 8 deletions(-)

diff --git a/applications/tools/simple_viewer/main.cpp b/applications/tools/simple_viewer/main.cpp
index 2710530cc..e8d10030a 100644
--- a/applications/tools/simple_viewer/main.cpp
+++ b/applications/tools/simple_viewer/main.cpp
@@ -116,7 +116,9 @@ static void run(ftl::Configurable *root) {
 	if (stream_uris.size() > 0) {
 		ftl::stream::Muxer *stream = ftl::create<ftl::stream::Muxer>(root, "muxstream");
 		ftl::stream::Receiver *gen = ftl::create<ftl::stream::Receiver>(root, "receiver", &pool);
+		ftl::stream::Sender *sender = ftl::create<ftl::stream::Sender>(root, "sender");
 		gen->setStream(stream);
+		sender->setStream(stream);
 
 		int count = 0;
 		for (auto &s : stream_uris) {
@@ -131,6 +133,11 @@ static void run(ftl::Configurable *root) {
 		generators.push_back(gen);
 		stream->begin();
 		stream->select(0, Channel::Colour + Channel::Depth, true);
+
+		handles.push_back(std::move(pool.session(ftl::data::FrameID(0,0)).onFlush([sender](ftl::data::Frame &f, ftl::codecs::Channel c) {
+			sender->post(f, c);
+			return true;
+		})));
 	}
 
 	for (auto *g : generators) {
@@ -144,7 +151,11 @@ static void run(ftl::Configurable *root) {
 				}
 			}
 
-			cv::waitKey(1);
+			int k = cv::waitKey(10);
+			if (k >= 0) {
+				auto rf = fs->firstFrame().response();
+				rf.create<int>(Channel::Control) = k;
+			}
 
 			return true;
 		})));
diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp
index 4c3553a4c..9aa57e3ca 100644
--- a/applications/vision/src/main.cpp
+++ b/applications/vision/src/main.cpp
@@ -160,7 +160,7 @@ static void run(ftl::Configurable *root) {
 	std::shared_ptr<ftl::streams::BaseBuilder> creatorptr(creator);
 
 	ftl::stream::Receiver *receiver = ftl::create<ftl::stream::Receiver>(root, "receiver", &pool);
-	//receiver->setStream(outstream);
+	receiver->setStream(outstream);
 	receiver->registerBuilder(creatorptr);
 
 	// Listen for any flush events for frameset 0
@@ -194,6 +194,11 @@ static void run(ftl::Configurable *root) {
 		// Lock colour right now to encode in parallel
 		fs->flush(ftl::codecs::Channel::Colour);
 
+		// TODO: Remove, this is debug code
+		if (fs->firstFrame().changed(ftl::codecs::Channel::Control)) {
+			LOG(INFO) << "Got control: " << fs->firstFrame().get<int>(ftl::codecs::Channel::Control);
+		}
+
 		// Do all processing in another thread...
 		ftl::pool.push([sender,&stats_count,&latency,&frames,pipeline,&busy,fs](int id) {
 			// Do pipeline here...
diff --git a/components/streams/include/ftl/streams/sender.hpp b/components/streams/include/ftl/streams/sender.hpp
index 5e1045d07..45574e731 100644
--- a/components/streams/include/ftl/streams/sender.hpp
+++ b/components/streams/include/ftl/streams/sender.hpp
@@ -29,6 +29,8 @@ class Sender : public ftl::Configurable {
 	 */
 	void post(ftl::data::FrameSet &fs, ftl::codecs::Channel c);
 
+	void post(ftl::data::Frame &f, ftl::codecs::Channel c);
+
 	/**
 	 * Encode and transmit a set of audio channels.
 	 */
@@ -66,9 +68,11 @@ class Sender : public ftl::Configurable {
 
 	//ftl::codecs::Encoder *_getEncoder(int fsid, int fid, ftl::codecs::Channel c);
 	void _encodeChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset);
+	void _encodeChannel(ftl::data::Frame &f, ftl::codecs::Channel c, bool reset);
 	void _encodeVideoChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset);
 	void _encodeAudioChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset);
 	void _encodeDataChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset);
+	void _encodeDataChannel(ftl::data::Frame &fs, ftl::codecs::Channel c, bool reset);
 
 	int _generateTiles(const ftl::rgbd::FrameSet &fs, int offset, ftl::codecs::Channel c, cv::cuda::Stream &stream, bool, bool);
 	EncodingState &_getTile(int fsid, ftl::codecs::Channel c);
diff --git a/components/streams/src/receiver.cpp b/components/streams/src/receiver.cpp
index 68b32c8cf..e0531ead0 100644
--- a/components/streams/src/receiver.cpp
+++ b/components/streams/src/receiver.cpp
@@ -407,6 +407,9 @@ void Receiver::setStream(ftl::stream::Stream *s) {
 	handle_ = s->onPacket([this](const StreamPacket &spkt, const Packet &pkt) {
 		const unsigned int channum = (unsigned int)spkt.channel;
 
+		// Dummy no data packet.
+		if (pkt.data.size() == 0) return true;
+
 		//LOG(INFO) << "PACKET: " << spkt.timestamp << ", " << (int)spkt.channel << ", " << (int)pkt.codec << ", " << (int)pkt.definition;
 
 		// TODO: Allow for multiple framesets
@@ -423,9 +426,6 @@ void Receiver::setStream(ftl::stream::Stream *s) {
 		//if (spkt.frameNumber() >= value("max_frames",32)) return;
 		if (spkt.frameNumber() >= 32 || ((1 << spkt.frameNumber()) & frame_mask_) == 0) return true;
 
-		// Dummy no data packet.
-		if (pkt.data.size() == 0) return true;
-
 
 		if (channum >= 64) {
 			_processData(spkt,pkt);
diff --git a/components/streams/src/sender.cpp b/components/streams/src/sender.cpp
index 5d9671bc1..fdfd1320a 100644
--- a/components/streams/src/sender.cpp
+++ b/components/streams/src/sender.cpp
@@ -45,6 +45,8 @@ void Sender::setStream(ftl::stream::Stream*s) {
 	//if (stream_) stream_->onPacket(nullptr);
 	stream_ = s;
 	handle_ = stream_->onPacket([this](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
+		if (pkt.data.size() > 0) return true;
+
 		LOG(INFO) << "SENDER REQUEST : " << (int)spkt.channel;
 
 		//if (state_cb_) state_cb_(spkt.channel, spkt.streamID, spkt.frame_number);
@@ -287,6 +289,91 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c) {
 	//do_inject_ = false;
 }
 
+void Sender::post(ftl::data::Frame &frame, ftl::codecs::Channel c) {
+	if (!stream_) return;
+
+	FTL_Profile("SenderPost", 0.02);
+
+	bool available = false;
+	bool needs_encoding = true;
+
+		// FIXME: Allow data channel selection rather than always send
+		/*for (auto c : frame.getDataChannels()) {
+			StreamPacket spkt;
+			spkt.version = 4;
+			spkt.timestamp = fs.timestamp;
+			spkt.streamID = 0; //fs.id;
+			spkt.frame_number = i;
+			spkt.channel = c;
+
+			ftl::codecs::Packet pkt;
+			pkt.codec = ftl::codecs::codec_t::MSGPACK;
+			pkt.frame_count = 1;
+			pkt.flags = 0;
+			pkt.bitrate = 0;
+			pkt.data = frame.getRawData(c);
+			stream_->post(spkt, pkt);
+		}*/
+
+		//for (auto ic : frame.changed()) {
+			//auto c = ic.first;
+			if (true) { //if (selected.has(c)) {
+				// FIXME: Sends high res colour, but receive end currently broken
+				//auto cc = (c == Channel::Colour && frame.hasChannel(Channel::ColourHighRes)) ? Channel::ColourHighRes : c;
+				auto cc = c;
+
+				StreamPacket spkt;
+				spkt.version = 4;
+				spkt.timestamp = frame.timestamp();
+				spkt.streamID = 0; //fs.id;
+				spkt.frame_number = frame.source();
+				spkt.channel = c;
+
+				// Check if there are existing encoded packets
+				const auto &packets = frame.getEncoded(cc);
+				if (packets.size() > 0) {
+					needs_encoding = false;
+					if (packets.size() > 1) {
+						LOG(WARNING) << "Multi-packet send: " << (int)cc;
+						ftl::codecs::Packet pkt;
+						//mergeNALUnits(packets, pkt);
+						//stream_->post(spkt, pkt);
+					} else {
+						// Send existing encoding instead of re-encoding
+						//for (auto &pkt : packets) {
+						stream_->post(spkt, packets.front());
+						//}
+					}
+				}
+			} else {
+				available = true;
+			}
+		//}
+
+	if (available) {
+		// Not selected so send an empty packet...
+		StreamPacket spkt;
+		spkt.version = 4;
+		spkt.timestamp = frame.timestamp();
+		spkt.streamID = frame.frameset();
+		spkt.frame_number = 255;
+		spkt.channel = c;
+
+		Packet pkt;
+		pkt.codec = codec_t::Any;
+		pkt.frame_count = 1;
+		pkt.bitrate = 0;
+		stream_->post(spkt, pkt);
+	}
+
+	if (needs_encoding) {
+		// TODO: One thread per channel.
+		_encodeChannel(frame, c, false);
+	}
+
+	//do_inject_ = false;
+}
+
 void Sender::_encodeVideoChannel(ftl::data::FrameSet &fs, Channel c, bool reset) {
 	bool lossless = value("lossless", false);
 	int max_bitrate = std::max(0, std::min(255, value("max_bitrate", 255)));
@@ -466,6 +553,32 @@ void Sender::_encodeDataChannel(ftl::data::FrameSet &fs, Channel c, bool reset)
 	}
 }
 
+void Sender::_encodeDataChannel(ftl::data::Frame &f, Channel c, bool reset) {
+	int i=0;
+
+	StreamPacket spkt;
+	spkt.version = 4;
+	spkt.timestamp = f.timestamp();
+	spkt.streamID = f.frameset();
+	spkt.frame_number = f.source();
+	spkt.channel = c;
+
+	ftl::codecs::Packet pkt;
+	pkt.frame_count = 1;
+	pkt.codec = codec_t::MSGPACK;
+	pkt.bitrate = 255;
+	pkt.flags = 0;
+	
+	auto encoder = ftl::data::getTypeEncoder(f.type(c));
+	if (encoder) {
+		if (encoder(f, c, pkt.data)) {
+			stream_->post(spkt, pkt);
+		}
+	} else {
+		LOG(WARNING) << "Missing msgpack encoder";
+	}
+}
+
 void Sender::_encodeChannel(ftl::data::FrameSet &fs, Channel c, bool reset) {
 	int ic = int(c);
 
@@ -478,6 +591,18 @@ void Sender::_encodeChannel(ftl::data::FrameSet &fs, Channel c, bool reset) {
 	}
 }
 
+void Sender::_encodeChannel(ftl::data::Frame &frame, Channel c, bool reset) {
+	int ic = int(c);
+
+	if (ic < 32) {
+		//_encodeVideoChannel(frame, c, reset);
+	} else if (ic < 64) {
+		//_encodeAudioChannel(frame, c, reset);
+	} else {
+		_encodeDataChannel(frame, c, reset);
+	}
+}
+
 cv::Rect Sender::_generateROI(const ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, int offset, bool stereo) {
 	const ftl::data::Frame &cframe = fs.firstFrame();
 	int rwidth = cframe.get<cv::cuda::GpuMat>(c).cols;
diff --git a/components/structures/src/new_frame.cpp b/components/structures/src/new_frame.cpp
index c79bc2cfe..6a1ef932c 100644
--- a/components/structures/src/new_frame.cpp
+++ b/components/structures/src/new_frame.cpp
@@ -394,7 +394,7 @@ void Session::flush(Frame &f) {
 			auto &d = f._getData(c.first);
 			if (d.status == ftl::data::ChannelStatus::DISPATCHED) {
 				d.status = ftl::data::ChannelStatus::FLUSHED;
-				flush_.trigger(f, c.first);
+				//flush_.trigger(f, c.first);
 			}
 		}
 	}
@@ -412,7 +412,7 @@ void Session::flush(Frame &f, ftl::codecs::Channel c) {
 		auto &d = f._getData(c);
 		if (d.status == ftl::data::ChannelStatus::DISPATCHED) {
 			d.status = ftl::data::ChannelStatus::FLUSHED;
-			flush_.trigger(f, c);
+			//flush_.trigger(f, c);
 		}
 	}
 }
diff --git a/components/structures/src/pool.cpp b/components/structures/src/pool.cpp
index d022f5474..ff2484e13 100644
--- a/components/structures/src/pool.cpp
+++ b/components/structures/src/pool.cpp
@@ -26,7 +26,8 @@ Frame Pool::allocate(FrameID id, int64_t timestamp) {
 		auto &pool = _getPool(id);
 
 		if (timestamp < pool.last_timestamp) {
-			throw FTL_Error("New frame timestamp is older than previous: " << timestamp << " vs " << pool.last_timestamp);
+			timestamp = pool.last_timestamp;
+			//throw FTL_Error("New frame timestamp is older than previous: " << timestamp << " vs " << pool.last_timestamp);
 		}
 
 		// Add items as required
-- 
GitLab