From d61f58a586fcf33f9403822b5d8ab95a489bbe42 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nicolas.pope@utu.fi>
Date: Thu, 8 Aug 2019 12:48:49 +0300
Subject: [PATCH] Move rectify to CPU

---
 .../include/ftl/rgbd/streamer.hpp             |   1 +
 components/rgbd-sources/src/calibrate.cpp     |  28 ++-
 components/rgbd-sources/src/calibrate.hpp     |   7 +
 components/rgbd-sources/src/local.cpp         |  86 ++++----
 components/rgbd-sources/src/local.hpp         |  22 +-
 components/rgbd-sources/src/stereovideo.cpp   |   8 +-
 components/rgbd-sources/src/streamer.cpp      | 188 ++++++++++--------
 7 files changed, 187 insertions(+), 153 deletions(-)

diff --git a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp
index c01237e15..d2beacb21 100644
--- a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp
+++ b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp
@@ -127,6 +127,7 @@ class Streamer : public ftl::Configurable {
 	//int drop_count_;
 
 	void _schedule();
+	void _schedule(detail::StreamSource *);
 	void _swap(detail::StreamSource *);
 	void _addClient(const std::string &source, int N, int rate, const ftl::UUID &peer, const std::string &dest);
 	void _encodeAndTransmit(detail::StreamSource *src, int chunk);
diff --git a/components/rgbd-sources/src/calibrate.cpp b/components/rgbd-sources/src/calibrate.cpp
index acc580e2a..c31fea5c5 100644
--- a/components/rgbd-sources/src/calibrate.cpp
+++ b/components/rgbd-sources/src/calibrate.cpp
@@ -159,7 +159,6 @@ void Calibrate::_updateIntrinsics() {
 	// TODO: pass parameters?
 
 	Mat R1, R2, P1, P2;
-	std::pair<Mat, Mat> map1, map2;
 	ftl::rgbd::Camera params();
 
 	if (this->value("use_intrinsics", true)) {
@@ -182,14 +181,14 @@ void Calibrate::_updateIntrinsics() {
 	C_l_ = P1;
 	C_r_ = P2;
 
-	initUndistortRectifyMap(M1_, D1_, R1, P1, img_size_, CV_32FC1, map1.first, map2.first);
-	initUndistortRectifyMap(M2_, D2_, R2, P2, img_size_, CV_32FC1, map1.second, map2.second);
+	initUndistortRectifyMap(M1_, D1_, R1, P1, img_size_, CV_32FC1, map1_.first, map2_.first);
+	initUndistortRectifyMap(M2_, D2_, R2, P2, img_size_, CV_32FC1, map1_.second, map2_.second);
 
 	// CHECK Is this thread safe!!!!
-	map1_gpu_.first.upload(map1.first);
-	map1_gpu_.second.upload(map1.second);
-	map2_gpu_.first.upload(map2.first);
-	map2_gpu_.second.upload(map2.second);
+	map1_gpu_.first.upload(map1_.first);
+	map1_gpu_.second.upload(map1_.second);
+	map2_gpu_.first.upload(map2_.first);
+	map2_gpu_.second.upload(map2_.second);
 }
 
 void Calibrate::rectifyStereo(GpuMat &l, GpuMat &r, Stream &stream) {
@@ -204,6 +203,21 @@ void Calibrate::rectifyStereo(GpuMat &l, GpuMat &r, Stream &stream) {
 	r = r_tmp;
 }
 
+void Calibrate::rectifyStereo(cv::Mat &l, cv::Mat &r) {
+	// cv::cuda::remap() can not use same Mat for input and output
+
+	cv::remap(l, l, map1_.first, map2_.first, cv::INTER_LINEAR, 0, cv::Scalar());
+	cv::remap(r, r, map1_.second, map2_.second, cv::INTER_LINEAR, 0, cv::Scalar());
+
+	/*GpuMat l_tmp(l.size(), l.type());
+	GpuMat r_tmp(r.size(), r.type());
+	cv::cuda::remap(l, l_tmp, map1_gpu_.first, map2_gpu_.first, cv::INTER_LINEAR, 0, cv::Scalar(), stream);
+	cv::cuda::remap(r, r_tmp, map1_gpu_.second, map2_gpu_.second, cv::INTER_LINEAR, 0, cv::Scalar(), stream);
+	stream.waitForCompletion();
+	l = l_tmp;
+	r = r_tmp;*/
+}
+
 bool Calibrate::isCalibrated() {
 	return calibrated_;
 }
\ No newline at end of file
diff --git a/components/rgbd-sources/src/calibrate.hpp b/components/rgbd-sources/src/calibrate.hpp
index c5f478683..7f6b262c1 100644
--- a/components/rgbd-sources/src/calibrate.hpp
+++ b/components/rgbd-sources/src/calibrate.hpp
@@ -40,6 +40,11 @@ class Calibrate : public ftl::Configurable {
 	 */
 	void rectifyStereo(cv::cuda::GpuMat &l, cv::cuda::GpuMat &r, cv::cuda::Stream &stream);
 
+	/**
+	 * Rectify and remove distortions from from images l and r using cv::remap()
+	 */
+	void rectifyStereo(cv::Mat &l, cv::Mat &r);
+
 	bool isCalibrated();
 
 	void updateCalibration(const ftl::rgbd::Camera &p);
@@ -60,6 +65,8 @@ private:
 	private:
 	bool calibrated_;
 
+	std::pair<cv::Mat, cv::Mat> map1_;
+	std::pair<cv::Mat, cv::Mat> map2_;
 	std::pair<cv::cuda::GpuMat, cv::cuda::GpuMat> map1_gpu_;
 	std::pair<cv::cuda::GpuMat, cv::cuda::GpuMat> map2_gpu_;
 
diff --git a/components/rgbd-sources/src/local.cpp b/components/rgbd-sources/src/local.cpp
index 410cb9f96..cf5521352 100644
--- a/components/rgbd-sources/src/local.cpp
+++ b/components/rgbd-sources/src/local.cpp
@@ -9,11 +9,13 @@
 #include <thread>
 
 #include "local.hpp"
+#include "calibrate.hpp"
 #include <opencv2/core.hpp>
 #include <opencv2/opencv.hpp>
 #include <opencv2/xphoto.hpp>
 
 using ftl::rgbd::detail::LocalSource;
+using ftl::rgbd::detail::Calibrate;
 using cv::Mat;
 using cv::VideoCapture;
 using cv::Rect;
@@ -27,28 +29,15 @@ using std::this_thread::sleep_for;
 LocalSource::LocalSource(nlohmann::json &config)
 		: Configurable(config), timestamp_(0.0) {
 
-	REQUIRED({
-		{"flip","Switch left and right views","boolean"},
-		{"flip_vert","Rotate image 180 degrees","boolean"},
-		{"nostereo","Force single camera mode","boolean"},
-		{"width","Pixel width of camera source","number"},
-		{"height","Pixel height of camera source","number"},
-		{"max_fps","Maximum frames per second","number"},
-		{"scale","Change the input image or video scaling","number"}
-	});
-
-	flip_ = value("flip", false);
-	flip_v_ = value("flip_vert", false);
 	nostereo_ = value("nostereo", false);
-	downsize_ = value("scale", 1.0f);
 
 	// Use cameras
 	camera_a_ = new VideoCapture;
 	LOG(INFO) << "Cameras check... ";
-	camera_a_->open((flip_) ? 1 : 0);
+	camera_a_->open(0);
 
 	if (!nostereo_) {
-		camera_b_ = new VideoCapture((flip_) ? 0 : 1);
+		camera_b_ = new VideoCapture(1);
 	} else {
 		camera_b_ = nullptr;
 	}
@@ -82,26 +71,18 @@ LocalSource::LocalSource(nlohmann::json &config)
 		stereo_ = true;
 	}
 
-	tps_ = 1.0 / value("max_fps", 25.0);
+	// Allocate page locked host memory for fast GPU transfer
+	left_hm_ = cv::cuda::HostMem(height_, width_, CV_8UC3);
+	right_hm_ = cv::cuda::HostMem(height_, width_, CV_8UC3);
 }
 
 LocalSource::LocalSource(nlohmann::json &config, const string &vid)
 	:	Configurable(config), timestamp_(0.0) {
 
-	REQUIRED({
-		{"flip","Switch left and right views","boolean"},
-		{"flip_vert","Rotate image 180 degrees","boolean"},
-		{"nostereo","Force single camera mode","boolean"},
-		{"width","Pixel width of camera source","number"},
-		{"height","Pixel height of camera source","number"},
-		{"max_fps","Maximum frames per second","number"},
-		{"scale","Change the input image or video scaling","number"}
-	});
-
-	flip_ = value("flip", false);
-	flip_v_ = value("flip_vert", false);
+	//flip_ = value("flip", false);
+	//flip_v_ = value("flip_vert", false);
 	nostereo_ = value("nostereo", false);
-	downsize_ = value("scale", 1.0f);
+	//downsize_ = value("scale", 1.0f);
 
 	if (vid == "") {
 		LOG(FATAL) << "No video file specified";
@@ -138,10 +119,14 @@ LocalSource::LocalSource(nlohmann::json &config, const string &vid)
 		stereo_ = false;
 	}
 
-	tps_ = 1.0 / value("max_fps", 25.0);
+	// Allocate page locked host memory for fast GPU transfer
+	left_hm_ = cv::cuda::HostMem(height_, width_, CV_8UC3);
+	right_hm_ = cv::cuda::HostMem(height_, width_, CV_8UC3);
+
+	//tps_ = 1.0 / value("max_fps", 25.0);
 }
 
-bool LocalSource::left(cv::Mat &l) {
+/*bool LocalSource::left(cv::Mat &l) {
 	if (!camera_a_) return false;
 
 	if (!camera_a_->grab()) {
@@ -174,9 +159,9 @@ bool LocalSource::left(cv::Mat &l) {
 	}
 
 	return true;
-}
+}*/
 
-bool LocalSource::right(cv::Mat &r) {
+/*bool LocalSource::right(cv::Mat &r) {
 	if (!camera_a_->grab()) {
 		LOG(ERROR) << "Unable to grab from camera A";
 		return false;
@@ -212,10 +197,15 @@ bool LocalSource::right(cv::Mat &r) {
 	}
 
 	return true;
-}
+}*/
 
-bool LocalSource::get(cv::cuda::GpuMat &l_out, cv::cuda::GpuMat &r_out, cv::cuda::Stream &stream) {
+bool LocalSource::get(cv::cuda::GpuMat &l_out, cv::cuda::GpuMat &r_out, Calibrate *c, cv::cuda::Stream &stream) {
 	Mat l, r;
+
+	// Use page locked memory
+	l = left_hm_.createMatHeader();
+	r = right_hm_.createMatHeader();
+
 	if (!camera_a_) return false;
 
 	if (!camera_a_->grab()) {
@@ -239,7 +229,7 @@ bool LocalSource::get(cv::cuda::GpuMat &l_out, cv::cuda::GpuMat &r_out, cv::cuda
 	timestamp_ = timestamp;
 
 	if (camera_b_ || !stereo_) {
-		if (!camera_a_->retrieve(left_)) {
+		if (!camera_a_->retrieve(l)) {
 			LOG(ERROR) << "Unable to read frame from camera A";
 			return false;
 		}
@@ -255,23 +245,23 @@ bool LocalSource::get(cv::cuda::GpuMat &l_out, cv::cuda::GpuMat &r_out, cv::cuda
 		}
 
 		int resx = frame.cols / 2;
-		if (flip_) {
-			r = Mat(frame, Rect(0, 0, resx, frame.rows));
-			left_ = Mat(frame, Rect(resx, 0, frame.cols-resx, frame.rows));
-		} else {
-			left_ = Mat(frame, Rect(0, 0, resx, frame.rows));
+		//if (flip_) {
+		//	r = Mat(frame, Rect(0, 0, resx, frame.rows));
+		//	l = Mat(frame, Rect(resx, 0, frame.cols-resx, frame.rows));
+		//} else {
+			l = Mat(frame, Rect(0, 0, resx, frame.rows));
 			r = Mat(frame, Rect(resx, 0, frame.cols-resx, frame.rows));
-		}
+		//}
 	}
 
-	if (downsize_ != 1.0f) {
+	/*if (downsize_ != 1.0f) {
 		// cv::cuda::resize()
 
 		cv::resize(left_, left_, cv::Size((int)(left_.cols * downsize_), (int)(left_.rows * downsize_)),
 				0, 0, cv::INTER_LINEAR);
 		cv::resize(r, r, cv::Size((int)(r.cols * downsize_), (int)(r.rows * downsize_)),
 				0, 0, cv::INTER_LINEAR);
-	}
+	}*/
 
 	// Note: this seems to be too slow on CPU...
 	/*cv::Ptr<cv::xphoto::WhiteBalancer> wb;
@@ -279,15 +269,17 @@ bool LocalSource::get(cv::cuda::GpuMat &l_out, cv::cuda::GpuMat &r_out, cv::cuda
 	wb->balanceWhite(l, l);
 	wb->balanceWhite(r, r);*/
 
-	if (flip_v_) {
+	/*if (flip_v_) {
 		Mat tl, tr;
 		cv::flip(left_, tl, 0);
 		cv::flip(r, tr, 0);
 		left_ = tl;
 		r = tr;
-	}
+	}*/
+
+	c->rectifyStereo(l, r);
 
-	l_out.upload(left_, stream);
+	l_out.upload(l, stream);
 	r_out.upload(r, stream);
 
 	return true;
diff --git a/components/rgbd-sources/src/local.hpp b/components/rgbd-sources/src/local.hpp
index e3fcb91bd..4fd71c5ee 100644
--- a/components/rgbd-sources/src/local.hpp
+++ b/components/rgbd-sources/src/local.hpp
@@ -15,19 +15,19 @@ namespace ftl {
 namespace rgbd {
 namespace detail {
 
+class Calibrate;
+
 class LocalSource : public Configurable {
 	public:
 	explicit LocalSource(nlohmann::json &config);
 	LocalSource(nlohmann::json &config, const std::string &vid);
 	
-	bool left(cv::Mat &m);
-	bool right(cv::Mat &m);
-	bool get(cv::cuda::GpuMat &l, cv::cuda::GpuMat &r, cv::cuda::Stream &stream);
+	//bool left(cv::Mat &m);
+	//bool right(cv::Mat &m);
+	bool get(cv::cuda::GpuMat &l, cv::cuda::GpuMat &r, Calibrate *c, cv::cuda::Stream &stream);
 
 	unsigned int width() const { return width_; }
 	unsigned int height() const { return height_; }
-
-	cv::Mat &cachedLeft() { return left_; }
 	
 	//void setFramerate(float fps);
 	//float getFramerate() const;
@@ -38,18 +38,20 @@ class LocalSource : public Configurable {
 	
 	private:
 	double timestamp_;
-	double tps_;
+	//double tps_;
 	bool stereo_;
 	//float fps_;
-	bool flip_;
-	bool flip_v_;
+	//bool flip_;
+	//bool flip_v_;
 	bool nostereo_;
-	float downsize_;
+	//float downsize_;
 	cv::VideoCapture *camera_a_;
 	cv::VideoCapture *camera_b_;
 	unsigned int width_;
 	unsigned int height_;
-	cv::Mat left_;
+
+	cv::cuda::HostMem left_hm_;
+	cv::cuda::HostMem right_hm_;
 };
 
 }
diff --git a/components/rgbd-sources/src/stereovideo.cpp b/components/rgbd-sources/src/stereovideo.cpp
index 0d20780c2..d83fdb194 100644
--- a/components/rgbd-sources/src/stereovideo.cpp
+++ b/components/rgbd-sources/src/stereovideo.cpp
@@ -154,7 +154,7 @@ static void disparityToDepth(const cv::cuda::GpuMat &disparity, cv::cuda::GpuMat
 }
 
 bool StereoVideoSource::capture() {
-	lsrc_->get(cap_left_, cap_right_, stream2_);
+	lsrc_->get(cap_left_, cap_right_, calib_, stream2_);
 	stream2_.waitForCompletion();
 	return true;
 }
@@ -178,7 +178,7 @@ bool StereoVideoSource::compute(int n, int b) {
 		//lsrc_->get(left_, right_, stream_);
 		if (depth_tmp_.empty()) depth_tmp_ = cv::cuda::GpuMat(left_.size(), CV_32FC1);
 		if (disp_tmp_.empty()) disp_tmp_ = cv::cuda::GpuMat(left_.size(), CV_32FC1);
-		calib_->rectifyStereo(left_, right_, stream_);
+		//calib_->rectifyStereo(left_, right_, stream_);
 		disp_->compute(left_, right_, disp_tmp_, stream_);
 		ftl::cuda::disparity_to_depth(disp_tmp_, depth_tmp_, params_, stream_);
 		left_.download(rgb_, stream_);
@@ -188,13 +188,13 @@ bool StereoVideoSource::compute(int n, int b) {
 		stream_.waitForCompletion();  // TODO:(Nick) Move to getFrames
 	} else if (chan == ftl::rgbd::kChanRight) {
 		//lsrc_->get(left_, right_, stream_);
-		calib_->rectifyStereo(left_, right_, stream_);
+		//calib_->rectifyStereo(left_, right_, stream_);
 		left_.download(rgb_, stream_);
 		right_.download(depth_, stream_);
 		stream_.waitForCompletion();  // TODO:(Nick) Move to getFrames
 	} else {
 		//lsrc_->get(left_, right_, stream_);
-		calib_->rectifyStereo(left_, right_, stream_);
+		//calib_->rectifyStereo(left_, right_, stream_);
 		//rgb_ = lsrc_->cachedLeft();
 		left_.download(rgb_, stream_);
 		stream_.waitForCompletion();  // TODO:(Nick) Move to getFrames
diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp
index dd4a93d69..ab03f2a65 100644
--- a/components/rgbd-sources/src/streamer.cpp
+++ b/components/rgbd-sources/src/streamer.cpp
@@ -309,120 +309,138 @@ void Streamer::wait() {
 	frame_no_ = last_frame_;
 }
 
-void Streamer::_schedule() {
-	wait();
+void Streamer::_schedule(StreamSource *src) {
+	// There will be two jobs for this source...
+	//UNIQUE_LOCK(job_mtx_,lk);
+	jobs_ += 2 + kChunkCount;
+	//lk.unlock();
 
-	// Prevent new clients during processing.
-	SHARED_LOCK(mutex_,slk);
+	//StreamSource *src = sources_[uri];
+	if (src == nullptr || src->jobs != 0) return;
+	src->jobs = 2 + kChunkCount;
 
-	for (auto s : sources_) {
-		string uri = s.first;
+	// Grab / capture job
+	ftl::pool.push([this,src](int id) {
+		//auto start = std::chrono::high_resolution_clock::now();
 
-		// No point in doing work if no clients
-		if (s.second->clientCount == 0) {
-			continue;
-		}
-
-		// There will be two jobs for this source...
-		//UNIQUE_LOCK(job_mtx_,lk);
-		jobs_ += 2 + kChunkCount;
-		//lk.unlock();
-
-		StreamSource *src = sources_[uri];
-		if (src == nullptr || src->jobs != 0) continue;
-		src->jobs = 2 + kChunkCount;
+		auto start = std::chrono::high_resolution_clock::now();
+		int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count()+clock_adjust_;
+		int64_t target = now / mspf_;
+		int64_t msdelay = mspf_ - (now % mspf_);
 
-		// Grab / capture job
-		ftl::pool.push([this,src](int id) {
-			//auto start = std::chrono::high_resolution_clock::now();
+		if (target != last_frame_ && msdelay != mspf_) LOG(WARNING) << "Frame " << "(" << (target-last_frame_) << ") dropped by " << (now%mspf_) << "ms";
 
-			auto start = std::chrono::high_resolution_clock::now();
-			int64_t now = std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count()+clock_adjust_;
-			int64_t target = now / mspf_;
-
-			// TODO:(Nick) A now%mspf_ == 0 should be accepted
-			if (target != last_frame_) {
-				LOG(WARNING) << "Frame " << "(" << (target-last_frame_) << ") dropped by " << (now%mspf_) << "ms";
-			}
-			//_decideFrameRate(target-last_frame_, now%mspf_);
+		// Use sleep_for for larger delays
+		
+		//LOG(INFO) << "Required Delay: " << (now / 40) << " = " << msdelay;
+		while (msdelay >= 20 && msdelay < mspf_) {
+			sleep_for(milliseconds(10));
+			now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_;
+			msdelay = mspf_ - (now % mspf_);
+		}
 
-			// Use sleep_for for larger delays
-			int64_t msdelay = mspf_ - (now % mspf_);
-			//LOG(INFO) << "Required Delay: " << (now / 40) << " = " << msdelay;
-			while (msdelay >= 20) {
-				sleep_for(milliseconds(10));
-				now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_;
-				msdelay = mspf_ - (now % mspf_);
-			}
+		// Spin loop until exact grab time
+		//LOG(INFO) << "Spin Delay: " << (now / 40) << " = " << (40 - (now%40));
 
-			// Spin loop until exact grab time
+		if (msdelay != mspf_) {
 			target = now / mspf_;
 			while ((now/mspf_) == target) {
 				_mm_pause();  // SSE2 nano pause intrinsic
 				now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_;
 			};
-			last_frame_ = now/mspf_;
+		}
+		last_frame_ = now/mspf_;
 
-			try {
-				src->src->capture();
-			} catch (std::exception &ex) {
-				LOG(ERROR) << "Exception when grabbing frame";
-				LOG(ERROR) << ex.what();
-			}
-			catch (...) {
-				LOG(ERROR) << "Unknown exception when grabbing frame";
-			}
+		try {
+			src->src->capture();
+		} catch (std::exception &ex) {
+			LOG(ERROR) << "Exception when grabbing frame";
+			LOG(ERROR) << ex.what();
+		}
+		catch (...) {
+			LOG(ERROR) << "Unknown exception when grabbing frame";
+		}
 
-			src->jobs--;
-			_swap(src);
+		//now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_;
+		//if (now%40 > 0) LOG(INFO) << "Grab in: " << (now%40) << "ms";
 
-			// Mark job as finished
-			std::unique_lock<std::mutex> lk(job_mtx_);
-			--jobs_;
-			if (jobs_ == 0) job_cv_.notify_one();
-		});
+		//std::chrono::duration<double> elapsed =
+		//	std::chrono::high_resolution_clock::now() - start;
+		//LOG(INFO) << "Grab in " << elapsed.count() << "s";
+
+		src->jobs--;
+		_swap(src);
+
+		// Mark job as finished
+		std::unique_lock<std::mutex> lk(job_mtx_);
+		--jobs_;
+		job_cv_.notify_one();
+	});
+
+	// Compute job
+	ftl::pool.push([this,src](int id) {
+		try {
+			src->src->compute();
+		} catch (std::exception &ex) {
+			LOG(ERROR) << "Exception when computing frame";
+			LOG(ERROR) << ex.what();
+		}
+		catch (...) {
+			LOG(ERROR) << "Unknown exception when computing frame";
+		}
+
+		src->jobs--;
+		_swap(src);
 
-		// Compute job
-		ftl::pool.push([this,src](int id) {
+		// Mark job as finished
+		std::unique_lock<std::mutex> lk(job_mtx_);
+		--jobs_;
+		job_cv_.notify_one();
+	});
+
+	// Create jobs for each chunk
+	for (int i=0; i<kChunkCount; ++i) {
+		// Add chunk job to thread pool
+		ftl::pool.push([this,src,i](int id) {
+			int chunk = i;
 			try {
-				src->src->compute();
-			} catch (std::exception &ex) {
-				LOG(ERROR) << "Exception when computing frame";
-				LOG(ERROR) << ex.what();
+			if (!src->rgb.empty() && (src->src->getChannel() == ftl::rgbd::kChanNone || !src->depth.empty())) {
+				_encodeAndTransmit(src, chunk);
 			}
-			catch (...) {
-				LOG(ERROR) << "Unknown exception when computing frame";
+			} catch(...) {
+				LOG(ERROR) << "Encode Exception: " << chunk;
 			}
 
 			src->jobs--;
 			_swap(src);
-
-			// Mark job as finished
 			std::unique_lock<std::mutex> lk(job_mtx_);
 			--jobs_;
 			job_cv_.notify_one();
 		});
+	}
+}
 
-		// Create jobs for each chunk
-		for (int i=0; i<kChunkCount; ++i) {
-			// Add chunk job to thread pool
-			ftl::pool.push([this,src,i](int id) {
-				int chunk = i;
-				try {
-				if (!src->rgb.empty() && (src->src->getChannel() == ftl::rgbd::kChanNone || !src->depth.empty())) {
-					_encodeAndTransmit(src, chunk);
-				}
-				} catch(...) {
-					LOG(ERROR) << "Encode Exception: " << chunk;
-				}
+void Streamer::_schedule() {
+	wait();
+	//std::mutex job_mtx;
+	//std::condition_variable job_cv;
+	//int jobs = 0;
+
+	//auto now = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now()).time_since_epoch().count()+clock_adjust_;
+	//LOG(INFO) << "Frame time = " << (now-(last_frame_*40)) << "ms";
+
+	// Prevent new clients during processing.
+	SHARED_LOCK(mutex_,slk);
 
-				src->jobs--;
-				_swap(src);
-				std::unique_lock<std::mutex> lk(job_mtx_);
-				--jobs_;
-				if (jobs_ == 0) job_cv_.notify_one();
-			});
+	for (auto s : sources_) {
+		string uri = s.first;
+
+		// No point in doing work if no clients
+		if (s.second->clientCount == 0) {
+			continue;
 		}
+
+		_schedule(s.second);
 	}
 }
 
-- 
GitLab