From bd7455ab880f639057260d86ceaa9378d448ec71 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nicolas.pope@utu.fi>
Date: Wed, 9 Oct 2019 13:26:47 +0300
Subject: [PATCH] Resolves #159 proxy of sources

---
 applications/reconstruct/src/main.cpp         |  1 +
 .../rgbd-sources/include/ftl/rgbd/group.hpp   | 17 ++++++
 .../rgbd-sources/include/ftl/rgbd/source.hpp  | 19 ++++++
 .../include/ftl/rgbd/streamer.hpp             | 15 ++++-
 components/rgbd-sources/src/group.cpp         | 13 ++++
 components/rgbd-sources/src/net.cpp           | 29 ++++-----
 components/rgbd-sources/src/source.cpp        | 22 +++++++
 components/rgbd-sources/src/streamer.cpp      | 61 +++++++++++++++----
 8 files changed, 151 insertions(+), 26 deletions(-)

diff --git a/applications/reconstruct/src/main.cpp b/applications/reconstruct/src/main.cpp
index 52d9b603e..2ea940813 100644
--- a/applications/reconstruct/src/main.cpp
+++ b/applications/reconstruct/src/main.cpp
@@ -160,6 +160,7 @@ static void run(ftl::Configurable *root) {
 	}
 
 	stream->setLatency(5);  // FIXME: This depends on source!?
+	stream->add(&group);
 	stream->run();
 
 	bool busy = false;
diff --git a/components/rgbd-sources/include/ftl/rgbd/group.hpp b/components/rgbd-sources/include/ftl/rgbd/group.hpp
index 0ded29e80..3c7b26e17 100644
--- a/components/rgbd-sources/include/ftl/rgbd/group.hpp
+++ b/components/rgbd-sources/include/ftl/rgbd/group.hpp
@@ -6,6 +6,7 @@
 #include <ftl/timer.hpp>
 #include <ftl/rgbd/frame.hpp>
 #include <ftl/rgbd/frameset.hpp>
+#include <ftl/codecs/packet.hpp>
 
 #include <opencv2/opencv.hpp>
 #include <vector>
@@ -65,6 +66,22 @@ class Group {
 	 */
 	void sync(std::function<bool(FrameSet &)>);
 
+	/**
+	 * Whenever any source within the group receives raw data, this callback
+	 * will be called with that raw data. This is used to allow direct data
+	 * capture (to disk) or proxy over a network without needing to re-encode.
+	 * There is no guarantee about order or timing and the callback itself will
+	 * need to ensure synchronisation of timestamps.
+	 */
+	void addRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &);
+
+	/**
+	 * Removes a raw data callback from all sources in the group.
+	 */
+	void removeRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &);
+
+	inline std::vector<Source*> sources() const { return sources_; }
+
 	/** @deprecated */
 	//bool getFrames(FrameSet &, bool complete=false);
 
diff --git a/components/rgbd-sources/include/ftl/rgbd/source.hpp b/components/rgbd-sources/include/ftl/rgbd/source.hpp
index 0ee163add..4c27baf86 100644
--- a/components/rgbd-sources/include/ftl/rgbd/source.hpp
+++ b/components/rgbd-sources/include/ftl/rgbd/source.hpp
@@ -8,6 +8,7 @@
 #include <ftl/net/universe.hpp>
 #include <ftl/uri.hpp>
 #include <ftl/rgbd/detail/source.hpp>
+#include <ftl/codecs/packet.hpp>
 #include <opencv2/opencv.hpp>
 #include <Eigen/Eigen>
 #include <string>
@@ -201,9 +202,26 @@ class Source : public ftl::Configurable {
 	SHARED_MUTEX &mutex() { return mutex_; }
 
 	std::function<void(int64_t, cv::Mat &, cv::Mat &)> &callback() { return callback_; }
+
+	/**
+	 * Set the callback that receives decoded frames as they are generated.
+	 */
 	void setCallback(std::function<void(int64_t, cv::Mat &, cv::Mat &)> cb);
 	void removeCallback() { callback_ = nullptr; }
 
+	/**
+	 * Add a callback to immediately receive any raw data from this source.
+	 * Currently this only works for a net source since other sources don't
+	 * produce raw encoded data.
+	 */
+	void addRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &);
+
+	void removeRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &);
+
+	/**
+	 * INTERNAL. Used to send raw data to callbacks.
+	 */
+	void notifyRaw(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt);
 
 	protected:
 	detail::Source *impl_;
@@ -220,6 +238,7 @@ class Source : public ftl::Configurable {
 	cudaStream_t stream_;
 	int64_t timestamp_;
 	std::function<void(int64_t, cv::Mat &, cv::Mat &)> callback_;
+	std::list<std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)>> rawcallbacks_;
 
 	detail::Source *_createImplementation();
 	detail::Source *_createFileImpl(const ftl::URI &uri);
diff --git a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp
index 7c6e6f479..642add6dc 100644
--- a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp
+++ b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp
@@ -101,6 +101,11 @@ class Streamer : public ftl::Configurable {
 	 */
 	void add(Source *);
 
+	/**
+	 * Allow all sources in another group to be proxy streamed by this streamer.
+	 */
+	void add(ftl::rgbd::Group *grp);
+
 	void remove(Source *);
 	void remove(const std::string &);
 
@@ -130,6 +135,7 @@ class Streamer : public ftl::Configurable {
 	private:
 	ftl::rgbd::Group group_;
 	std::map<std::string, detail::StreamSource*> sources_;
+	std::list<ftl::rgbd::Group*> proxy_grps_;
 	//ctpl::thread_pool pool_;
 	SHARED_MUTEX mutex_;
 	bool active_;
@@ -152,10 +158,17 @@ class Streamer : public ftl::Configurable {
 
 	ftl::codecs::device_t hq_devices_;
 
+	enum class Quality {
+		High,
+		Low,
+		Any
+	};
+
 	void _process(ftl::rgbd::FrameSet &);
 	void _cleanUp();
 	void _addClient(const std::string &source, int N, int rate, const ftl::UUID &peer, const std::string &dest);
-	void _transmitPacket(detail::StreamSource *src, const ftl::codecs::Packet &pkt, int chan, bool hasChan2, bool hqonly);
+	void _transmitPacket(detail::StreamSource *src, const ftl::codecs::Packet &pkt, int chan, bool hasChan2, Quality q);
+	void _transmitPacket(detail::StreamSource *src, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt, Quality q);
 
 	//void _encodeHQAndTransmit(detail::StreamSource *src, const cv::Mat &, const cv::Mat &, int chunk);
 	//void _encodeLQAndTransmit(detail::StreamSource *src, const cv::Mat &, const cv::Mat &, int chunk);
diff --git a/components/rgbd-sources/src/group.cpp b/components/rgbd-sources/src/group.cpp
index 96ca3a82f..8dec0292a 100644
--- a/components/rgbd-sources/src/group.cpp
+++ b/components/rgbd-sources/src/group.cpp
@@ -222,9 +222,22 @@ void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) {
 		return true;
 	});
 
+	LOG(INFO) << "Start timer";
 	ftl::timer::start(true);
 }
 
+void Group::addRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) {
+	for (auto s : sources_) {
+		s->addRawCallback(f);
+	}
+}
+
+void Group::removeRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) {
+	for (auto s : sources_) {
+		s->removeRawCallback(f);
+	}
+}
+
 //ftl::rgbd::FrameSet &Group::_getRelativeFrameset(int rel) {
 //	int idx = (rel < 0) ? (head_+kFrameBufferSize+rel)%kFrameBufferSize : (head_+rel)%kFrameBufferSize;
 //	return framesets_[idx];
diff --git a/components/rgbd-sources/src/net.cpp b/components/rgbd-sources/src/net.cpp
index 4f1f5b141..416defb9a 100644
--- a/components/rgbd-sources/src/net.cpp
+++ b/components/rgbd-sources/src/net.cpp
@@ -242,29 +242,30 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk
 	int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count();
 	if (!active_) return;
 
+	// Allow acccess to the raw data elsewhere...
+	host_->notifyRaw(spkt, pkt);
+
 	const ftl::rgbd::Channel chan = host_->getChannel();
 	int rchan = spkt.channel & 0x1;
 
-	// Ignore any unwanted second channel
-	if (chan == ftl::rgbd::Channel::None && rchan > 0) {
-		LOG(INFO) << "Unwanted channel";
-		//return;
-		// TODO: Allow decode to be skipped
-	}
-
 	NetFrame &frame = queue_.getFrame(spkt.timestamp, cv::Size(params_.width, params_.height), CV_8UC3, (isFloatChannel(chan) ? CV_32FC1 : CV_8UC3));
 
 	// Update frame statistics
 	frame.tx_size += pkt.data.size();
 
-	_createDecoder(rchan, pkt);
-	auto *decoder = (rchan == 0) ? decoder_c1_ : decoder_c2_;
-	if (!decoder) {
-		LOG(ERROR) << "No frame decoder available";
-		return;
-	}
+	// Ignore any unwanted second channel
+	if (!(chan == ftl::rgbd::Channel::None && rchan > 0)) {
+		_createDecoder(rchan, pkt);
+		auto *decoder = (rchan == 0) ? decoder_c1_ : decoder_c2_;
+		if (!decoder) {
+			LOG(ERROR) << "No frame decoder available";
+			return;
+		}
 
-	decoder->decode(pkt, (rchan == 0) ? frame.channel1 : frame.channel2);
+		decoder->decode(pkt, (rchan == 0) ? frame.channel1 : frame.channel2);
+	} else {
+		//LOG(INFO) << "Unwanted frame";
+	}
 
 	// Apply colour correction to chunk
 	//ftl::rgbd::colourCorrection(tmp_rgb, gamma_, temperature_);
diff --git a/components/rgbd-sources/src/source.cpp b/components/rgbd-sources/src/source.cpp
index 35d23f27a..4ec34a5e5 100644
--- a/components/rgbd-sources/src/source.cpp
+++ b/components/rgbd-sources/src/source.cpp
@@ -310,3 +310,25 @@ void Source::setCallback(std::function<void(int64_t, cv::Mat &, cv::Mat &)> cb)
 	if (bool(callback_)) LOG(ERROR) << "Source already has a callback: " << getURI();
 	callback_ = cb;
 }
+
+void Source::addRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) {
+	UNIQUE_LOCK(mutex_,lk);
+	rawcallbacks_.push_back(f);
+}
+
+void Source::removeRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) {
+	UNIQUE_LOCK(mutex_,lk);
+	for (auto i=rawcallbacks_.begin(); i!=rawcallbacks_.end(); ++i) {
+		if (i->target<void(*)(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)>() == f.target<void(*)(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)>()) {
+			rawcallbacks_.erase(i);
+			return;
+		}
+	}
+}
+
+void Source::notifyRaw(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
+	SHARED_LOCK(mutex_,lk);
+	for (auto &i : rawcallbacks_) {
+		i(this, spkt, pkt);
+	}
+}
diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp
index 7a9118c9f..4290a2890 100644
--- a/components/rgbd-sources/src/streamer.cpp
+++ b/components/rgbd-sources/src/streamer.cpp
@@ -174,6 +174,38 @@ void Streamer::add(Source *src) {
 	net_->broadcast("add_stream", src->getID());
 }
 
+void Streamer::add(ftl::rgbd::Group *grp) {
+	auto srcs = grp->sources();
+	for (auto src : srcs) {
+		{
+			UNIQUE_LOCK(mutex_,ulk);
+			if (sources_.find(src->getID()) != sources_.end()) return;
+
+			StreamSource *s = new StreamSource;
+			s->src = src;
+			//s->prev_depth = cv::Mat(cv::Size(src->parameters().width, src->parameters().height), CV_16SC1, 0);
+			s->jobs = 0;
+			s->frame = 0;
+			s->clientCount = 0;
+			s->hq_count = 0;
+			s->lq_count = 0;
+			sources_[src->getID()] = s;
+
+			//group_.addSource(src);
+
+			src->addRawCallback([this,s](Source *src, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
+				//LOG(INFO) << "RAW CALLBACK";
+				_transmitPacket(s, spkt, pkt, Quality::Any);
+			});
+		}
+
+		LOG(INFO) << "Proxy Streaming: " << src->getID();
+		net_->broadcast("add_stream", src->getID());
+	}
+
+	LOG(INFO) << "All proxy streams added";
+}
+
 void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID &peer, const string &dest) {
 	StreamSource *s = nullptr;
 
@@ -349,10 +381,11 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) {
 	// Prevent new clients during processing.
 	SHARED_LOCK(mutex_,slk);
 
-	if (fs.sources.size() != sources_.size()) {
-		LOG(ERROR) << "Incorrect number of sources in frameset: " << fs.sources.size() << " vs " << sources_.size();
-		return;
-	}
+	// This check is not valid, always assume fs.sources is correct
+	//if (fs.sources.size() != sources_.size()) {
+	//	LOG(ERROR) << "Incorrect number of sources in frameset: " << fs.sources.size() << " vs " << sources_.size();
+		//return;
+	//}
 
 	int totalclients = 0;
 
@@ -390,14 +423,15 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) {
 				// TODO: Each encode could be done in own thread
 				if (hasChan2) {
 					enc2->encode(fs.frames[j].get<cv::Mat>(fs.sources[j]->getChannel()), src->hq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){
-						_transmitPacket(src, blk, 1, hasChan2, true);
+						_transmitPacket(src, blk, 1, hasChan2, Quality::High);
 					});
 				} else {
 					if (enc2) enc2->reset();
 				}
 
+				if (fs.timestamp % (10*ftl::timer::getInterval()) == 0) enc1->reset();
 				enc1->encode(fs.frames[j].get<cv::Mat>(Channel::Colour), src->hq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){
-					_transmitPacket(src, blk, 0, hasChan2, true);
+					_transmitPacket(src, blk, 0, hasChan2, Quality::High);
 				});
 			}
 		}
@@ -418,14 +452,14 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) {
 				// Receiver only waits for channel 1 by default
 				if (hasChan2) {
 					enc2->encode(fs.frames[j].get<cv::Mat>(fs.sources[j]->getChannel()), src->lq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){
-						_transmitPacket(src, blk, 1, hasChan2, false);
+						_transmitPacket(src, blk, 1, hasChan2, Quality::Low);
 					});
 				} else {
 					if (enc2) enc2->reset();
 				}
 
 				enc1->encode(fs.frames[j].get<cv::Mat>(Channel::Colour), src->lq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){
-					_transmitPacket(src, blk, 0, hasChan2, false);
+					_transmitPacket(src, blk, 0, hasChan2, Quality::Low);
 				});
 			}
 		}
@@ -491,19 +525,24 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) {
 	} else _cleanUp();
 }
 
-void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::Packet &pkt, int chan, bool hasChan2, bool hqonly) {
+void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::Packet &pkt, int chan, bool hasChan2, Quality q) {
 	ftl::codecs::StreamPacket spkt = {
 		frame_no_,
 		static_cast<uint8_t>((chan & 0x1) | ((hasChan2) ? 0x2 : 0x0))
 	};
 
+	_transmitPacket(src, spkt, pkt, q);
+}
+
+void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt, Quality q) {
 	// Lock to prevent clients being added / removed
 	//SHARED_LOCK(src->mutex,lk);
 	auto c = src->clients.begin();
 	while (c != src->clients.end()) {
 		const ftl::codecs::preset_t b = (*c).preset;
-		if ((hqonly && b >= kQualityThreshold) || (!hqonly && b < kQualityThreshold)) {
+		if ((q == Quality::High && b >= kQualityThreshold) || (q == Quality::Low && b < kQualityThreshold)) {
 			++c;
+			LOG(INFO) << "INCORRECT QUALITY";
 			continue;
 		}
 
@@ -520,7 +559,7 @@ void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::Packet &pkt
 				(*c).txcount = (*c).txmax;
 			} else {
 				// Count frame as completed only if last block and channel is 0
-				if (pkt.block_number == pkt.block_total - 1 && chan == 0) ++(*c).txcount;
+				if (pkt.block_number == pkt.block_total - 1 && spkt.channel & 0x1 == 0) ++(*c).txcount;
 			}
 		} catch(...) {
 			(*c).txcount = (*c).txmax;
-- 
GitLab