From 2dbf7045fcb3f739301436cad2984fd9cb83a556 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Wed, 19 Aug 2020 20:15:31 +0300
Subject: [PATCH] Improve pipeline callback code
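
Replace the asyncApply()/sync()/trigger() set on ftl::operators::Graph with a
single apply() overload set that takes an optional completion callback. The
graph no longer creates and owns its own CUDA stream; work is issued on the
stream of the incoming frame (or the first frame of the frameset). When a
callback is supplied it is registered via cudaCallback() on that stream, which
clears the busy flag and dispatches the callback on the ftl::pool worker
threads, so callers no longer manage the busy flag or post their own stream
callbacks. apply() now returns false when the pipeline is already busy, which
applications/vision/src/main.cpp uses to report a pipeline drop.

Callers in vision/main.cpp and streams/feed.cpp are updated to the new form;
abridged from the main.cpp hunk below:

    bool did_pipe = pipeline->apply(*fs, *fs, [fs,&frames]() {
        // Runs on a pool thread once the stream work has completed.
        if (fs->hasAnyChanged(Channel::Depth)) fs->flush(Channel::Depth);
        ++frames;
    });

DepthChannel::apply() is also tidied: frames with disabled calibration are
skipped, the internal pipeline is only created when at least one frame has
non-empty Left/Right images, and the pipeline is applied once per frameset
rather than once per frame.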

---
 applications/vision/src/main.cpp              | 26 ++----
 .../include/ftl/operators/operator.hpp        | 17 +---
 components/operators/src/depth.cpp            | 44 ++++------
 components/operators/src/operator.cpp         | 82 ++++++++-----------
 components/streams/src/feed.cpp               |  6 +-
 5 files changed, 65 insertions(+), 110 deletions(-)

diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp
index fbd809e40..33f3b0901 100644
--- a/applications/vision/src/main.cpp
+++ b/applications/vision/src/main.cpp
@@ -222,26 +222,18 @@ static void run(ftl::Configurable *root) {
 
 		fs->set(ftl::data::FSFlag::AUTO_SEND);
 
-		if (busy.test_and_set()) {
+		bool did_pipe = pipeline->apply(*fs, *fs, [fs,&frames,&latency,&busy]() {
+			if (fs->hasAnyChanged(Channel::Depth)) fs->flush(Channel::Depth);
+			++frames;
+			latency += float(ftl::timer::get_time() - fs->timestamp());
+			const_cast<ftl::data::FrameSetPtr&>(fs).reset();
+		});
+
+		if (!did_pipe) {
 			LOG(WARNING) << "Depth pipeline drop: " << fs->timestamp();
 			fs->firstFrame().message(ftl::data::Message::Warning_PIPELINE_DROP, "Depth pipeline drop");
-		} else {
-			pipeline->apply(*fs, *fs, nullptr, fs->frames[0].stream());
-
-			cudaCallback(fs->frames[0].stream(), [fs,&frames,&latency,&busy]() {
-				busy.clear();
-
-				// Must be in another thread and not callback directly
-				ftl::pool.push([fs,&frames,&latency](int id) {
-					if (fs->hasAnyChanged(Channel::Depth)) fs->flush(Channel::Depth);
-					++frames;
-					latency += float(ftl::timer::get_time() - fs->timestamp());
-					const_cast<ftl::data::FrameSetPtr&>(fs).reset();
-				});
-
-				// FIXME: Possible issue if fs is flushed in this thread...
-			});
 		}
+
 
 		// Do some encoding (eg. colour) whilst pipeline runs
 		ftl::pool.push([fs,&stats_count,&latency,&frames,&stats_time,&busy](int id){
diff --git a/components/operators/include/ftl/operators/operator.hpp b/components/operators/include/ftl/operators/operator.hpp
index 97fd83e69..86c406caa 100644
--- a/components/operators/include/ftl/operators/operator.hpp
+++ b/components/operators/include/ftl/operators/operator.hpp
@@ -125,19 +125,9 @@ class Graph : public ftl::Configurable {
 	template <typename T, typename... ARGS>
 	ftl::Configurable *append(const std::string &name, ARGS...);
 
-	void trigger();
-
-	bool apply(ftl::rgbd::Frame &in, ftl::rgbd::Frame &out, const std::function<void()> &cb=nullptr, cudaStream_t stream=0);
-	bool apply(ftl::rgbd::FrameSet &in, ftl::rgbd::FrameSet &out, const std::function<void()> &cb=nullptr, cudaStream_t stream=0);
-	bool apply(ftl::rgbd::FrameSet &in, ftl::rgbd::Frame &out, const std::function<void()> &cb=nullptr, cudaStream_t stream=0);
-
-	bool asyncApply(ftl::rgbd::Frame &in, ftl::rgbd::Frame &out, const std::function<void()> &cb=nullptr, cudaStream_t stream=0);
-	bool asyncApply(ftl::rgbd::FrameSet &in, ftl::rgbd::FrameSet &out, const std::function<void()> &cb=nullptr, cudaStream_t stream=0);
-	bool asyncApply(ftl::rgbd::FrameSet &in, ftl::rgbd::Frame &out, const std::function<void()> &cb=nullptr, cudaStream_t stream=0);
-
-	void sync(cudaStream_t stream=0);
-
-	cudaStream_t getStream() const { return stream_; }
+	bool apply(ftl::rgbd::Frame &in, ftl::rgbd::Frame &out, const std::function<void()> &cb=nullptr);
+	bool apply(ftl::rgbd::FrameSet &in, ftl::rgbd::FrameSet &out, const std::function<void()> &cb=nullptr);
+	bool apply(ftl::rgbd::FrameSet &in, ftl::rgbd::Frame &out, const std::function<void()> &cb=nullptr);
 
 	/**
 	 * Make sure all async operators have also completed. This is automatically
@@ -158,7 +148,6 @@ class Graph : public ftl::Configurable {
 	private:
 	std::list<ftl::operators::detail::OperatorNode> operators_;
 	std::map<std::string, ftl::Configurable*> configs_;
-	cudaStream_t stream_;
 	std::atomic_flag busy_;
 	std::unordered_map<uint32_t,cv::cuda::GpuMat> buffers_;
 	std::unordered_set<uint32_t> valid_buffers_;
diff --git a/components/operators/src/depth.cpp b/components/operators/src/depth.cpp
index 5344e3c72..11a8d2243 100644
--- a/components/operators/src/depth.cpp
+++ b/components/operators/src/depth.cpp
@@ -172,11 +172,7 @@ bool DepthChannel::apply(ftl::rgbd::FrameSet &in, ftl::rgbd::FrameSet &out, cuda
 
 	rbuf_.resize(in.frames.size());
 
-	/*if (in.frames.size() > 0) {
-		if (depth_size_.width == 0) {
-			depth_size_ = in.firstFrame().getLeft()
-		}
-	}*/
+	int valid_count = 0;
 
 	for (size_t i=0; i<in.frames.size(); ++i) {
 		if (!in.hasFrame(i)) continue;
@@ -189,20 +185,23 @@ bool DepthChannel::apply(ftl::rgbd::FrameSet &in, ftl::rgbd::FrameSet &out, cuda
 				if (!cdata.enabled) continue;
 			}
 
-			_createPipeline(in.frames.size());
-
 			const cv::cuda::GpuMat& left = f.get<cv::cuda::GpuMat>(Channel::Left);
 			const cv::cuda::GpuMat& right = f.get<cv::cuda::GpuMat>(Channel::Right);
+			if (left.empty() || right.empty()) continue;
+
 			cv::cuda::GpuMat& depth = f.create<cv::cuda::GpuMat>(Channel::Depth);
 
 			const auto &intrin = f.getLeft();
 			depth.create(intrin.height, intrin.width, CV_32FC1);
-
-			if (left.empty() || right.empty()) continue;
-			pipe_->apply(f, f, nullptr, stream);
+			++valid_count;
 		}
 	}
 
+	if (valid_count > 0) {
+		_createPipeline(in.frames.size());
+		pipe_->apply(in, out);
+	}
+
 	return true;
 }
 
@@ -213,28 +212,21 @@ bool DepthChannel::apply(ftl::rgbd::Frame &in, ftl::rgbd::Frame &out, cudaStream
 
 	auto &f = in;
 	if (!f.hasChannel(Channel::Depth) && f.hasChannel(Channel::Right)) {
-		_createPipeline(1);
+		if (f.hasChannel(Channel::CalibrationData)) {
+			auto &cdata = f.get<ftl::calibration::CalibrationData>(Channel::CalibrationData);
+			if (!cdata.enabled) return true;
+		}
 
 		const cv::cuda::GpuMat& left = f.get<cv::cuda::GpuMat>(Channel::Left);
 		const cv::cuda::GpuMat& right = f.get<cv::cuda::GpuMat>(Channel::Right);
-		cv::cuda::GpuMat& depth = f.create<cv::cuda::GpuMat>(Channel::Depth);
-		depth.create(depth_size_, CV_32FC1);
-
 		if (left.empty() || right.empty()) return false;
+
+		_createPipeline(1);
 
-		/*if (depth_size_ != left.size()) {
-			auto &col2 = f.create<cv::cuda::GpuMat>(Channel::ColourHighRes);
-			cv::cuda::resize(left, col2, depth_size_, 0.0, 0.0, cv::INTER_CUBIC, cvstream);
-			f.createTexture<uchar4>(Channel::ColourHighRes, true);
-			f.swapChannels(Channel::Colour, Channel::ColourHighRes);
-		}
-
-		if (depth_size_ != right.size()) {
-			cv::cuda::resize(right, rbuf_[i], depth_size_, 0.0, 0.0, cv::INTER_CUBIC, cvstream);
-			cv::cuda::swap(right, rbuf_[i]);
-		}*/
+		cv::cuda::GpuMat& depth = f.create<cv::cuda::GpuMat>(Channel::Depth);
+		depth.create(depth_size_, CV_32FC1);
 
-		pipe_->apply(f, f, nullptr, stream);
+		pipe_->apply(f, f);
 	}
 
 	return true;
diff --git a/components/operators/src/operator.cpp b/components/operators/src/operator.cpp
index 007a96490..2402aed07 100644
--- a/components/operators/src/operator.cpp
+++ b/components/operators/src/operator.cpp
@@ -35,7 +35,6 @@ bool Operator::apply(FrameSet &in, Frame &out, cudaStream_t stream) {
 
 
 Graph::Graph(nlohmann::json &config) : ftl::Configurable(config) {
-	cudaSafeCall( cudaStreamCreateWithFlags(&stream_, cudaStreamNonBlocking) );
 	busy_.clear();
 }
 
@@ -49,7 +48,6 @@ Graph::~Graph() {
 			delete i;
 		}
 	}
-	cudaStreamDestroy(stream_);
 }
 
 cv::cuda::GpuMat &Graph::createBuffer(ftl::operators::Buffer b, uint32_t fid) {
@@ -70,47 +68,18 @@ bool Graph::hasBuffer(ftl::operators::Buffer b, uint32_t fid) const {
 	return valid_buffers_.count((uint32_t(b) << 8) + fid) > 0;
 }
 
-static void cuda_callback(void *udata) {
-	auto *g = (Graph*)udata;
-	g->trigger();
-}
-
-void Graph::trigger() {
-	ftl::pool.push([this](int id) { callback_(); busy_.clear(); });
-}
-
-bool Graph::apply(FrameSet &in, FrameSet &out, const std::function<void()> &cb, cudaStream_t stream) {
-	bool res = asyncApply(in, out, cb, stream);
-	sync(stream);
-	return res;
-}
-
-void Graph::sync(cudaStream_t stream) {
-	auto stream_actual = (stream == 0) ? stream_ : stream;
+bool Graph::apply(FrameSet &in, FrameSet &out, const std::function<void()> &cb) {
+	if (!value("enabled", true)) return true;
+	if (in.frames.size() < 1) return true;
 
-	if (stream == 0) {
-		if (callback_) {
-			cudaSafeCall(cudaLaunchHostFunc(stream_actual, cuda_callback, this));
-		} else {
-			cudaSafeCall(cudaStreamSynchronize(stream_actual));
-			busy_.clear();
-		}
-	} else {
-		busy_.clear();
-	}
-}
-
-bool Graph::asyncApply(FrameSet &in, FrameSet &out, const std::function<void()> &cb, cudaStream_t stream) {
-	if (!value("enabled", true)) return false;
-
-	auto stream_actual = (stream == 0) ? stream_ : stream;
+	auto stream_actual = in.frames[0].stream();
 	bool success = true;
 
-	if (in.frames.size() != out.frames.size()) return false;
+	if (in.frames.size() != out.frames.size()) return true;
 
 	if (busy_.test_and_set()) {
 		LOG(ERROR) << "Pipeline already in use: " << in.timestamp();
-		if (cb) cb();
+		//if (cb) cb();
 		return false;
 	}
 
@@ -174,7 +143,18 @@ bool Graph::asyncApply(FrameSet &in, FrameSet &out, const std::function<void()>
 	}
 
 	success = waitAll(stream_actual) && success;
-	return success;
+
+	if (cb) {
+		cudaCallback(stream_actual, [this,cb]() {
+			busy_.clear();
+			ftl::pool.push([cb](int id) { cb(); });
+		});
+	} else {
+		//cudaSafeCall(cudaStreamSynchronize(stream_actual));
+		busy_.clear();
+	}
+
+	return true;
 }
 
 bool Graph::waitAll(cudaStream_t stream) {
@@ -191,21 +171,15 @@ bool Graph::waitAll(cudaStream_t stream) {
 	return true;
 }
 
-bool Graph::apply(Frame &in, Frame &out, const std::function<void()> &cb, cudaStream_t stream) {
-	bool res = asyncApply(in, out, cb, stream);
-	sync(stream);
-	return res;
-}
-
-bool Graph::asyncApply(Frame &in, Frame &out, const std::function<void()> &cb, cudaStream_t stream) {
-	if (!value("enabled", true)) return false;
+bool Graph::apply(Frame &in, Frame &out, const std::function<void()> &cb) {
+	if (!value("enabled", true)) return true;
 
-	auto stream_actual = (stream == 0) ? stream_ : stream;
+	auto stream_actual = in.stream();
 	bool success = true;
 
 	if (busy_.test_and_set()) {
 		LOG(ERROR) << "Pipeline already in use: " << in.timestamp();
-		if (cb) cb();
+		//if (cb) cb();
 		return false;
 	}
 
@@ -239,8 +213,18 @@ bool Graph::asyncApply(Frame &in, Frame &out, const std::function<void()> &cb, c
 
 	success = waitAll(stream_actual) && success;
 
+	if (cb) {
+		cudaCallback(stream_actual, [this,cb]() {
+			busy_.clear();
+			ftl::pool.push([cb](int id) { cb(); });
+		});
+	} else {
+		cudaSafeCall(cudaStreamSynchronize(stream_actual));
+		busy_.clear();
+	}
+
 	//busy_.clear();
-	return success;
+	return true;
 }
 
 ftl::Configurable *Graph::_append(ftl::operators::detail::ConstructionHelperBase *m) {
diff --git a/components/streams/src/feed.cpp b/components/streams/src/feed.cpp
index e344d4c60..ba46586d5 100644
--- a/components/streams/src/feed.cpp
+++ b/components/streams/src/feed.cpp
@@ -184,10 +184,7 @@ Feed::Feed(nlohmann::json &config, ftl::net::Universe*net) :
 
 			lk.unlock();
 
-			if (pipeline) pipeline->apply(*fs, *fs, nullptr, fs->frames[0].stream());
-			//cudaSafeCall(cudaStreamSynchronize(fs->frames[0].stream()));
-
-			cudaCallback(fs->frames[0].stream(), [this,fs]() {
+			if (pipeline) pipeline->apply(*fs, *fs, [this,fs]() {
 				SHARED_LOCK(mtx_, lk);
 
 				std::atomic_store(&latest_.at(fs->frameset()), fs);
@@ -221,6 +218,7 @@ Feed::Feed(nlohmann::json &config, ftl::net::Universe*net) :
 					}
 				}
 			});
+			//cudaSafeCall(cudaStreamSynchronize(fs->frames[0].stream()));
 
 			//lk.lock();
 
-- 
GitLab