Skip to content
Snippets Groups Projects
Commit 86120280 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Parallelise the disparity+stream into a pipeline

parent 21931ab4
No related branches found
No related tags found
No related merge requests found
Pipeline #9759 passed
File moved
...@@ -125,6 +125,7 @@ static void run(const string &file) { ...@@ -125,6 +125,7 @@ static void run(const string &file) {
// Make sure connections are complete // Make sure connections are complete
sleep_for(milliseconds(500)); sleep_for(milliseconds(500));
// TODO(nick) Allow for many sources
net.subscribe(config["source"], [&rgb,&m,&depth](const vector<unsigned char> &jpg, const vector<unsigned char> &d) { net.subscribe(config["source"], [&rgb,&m,&depth](const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
unique_lock<mutex> lk(m); unique_lock<mutex> lk(m);
cv::imdecode(jpg, cv::IMREAD_COLOR, &rgb); cv::imdecode(jpg, cv::IMREAD_COLOR, &rgb);
...@@ -153,6 +154,7 @@ static void run(const string &file) { ...@@ -153,6 +154,7 @@ static void run(const string &file) {
unique_lock<mutex> lk(m); unique_lock<mutex> lk(m);
if (depth.cols > 0) { if (depth.cols > 0) {
// If no calibration data then get it from the remote machine
if (Q.rows == 0) { if (Q.rows == 0) {
auto buf = net.findOne<vector<unsigned char>>((string)config["source"]+"/calibration"); auto buf = net.findOne<vector<unsigned char>>((string)config["source"]+"/calibration");
if (buf) { if (buf) {
...@@ -162,9 +164,6 @@ static void run(const string &file) { ...@@ -162,9 +164,6 @@ static void run(const string &file) {
disp.setCalibration(Q); disp.setCalibration(Q);
} }
} }
//depth.convertTo(idepth, CV_8U, 255.0f / 256.0f); // TODO(nick)
//applyColorMap(idepth, idepth, cv::COLORMAP_JET);
//cv::imshow("Depth", idepth);
} }
if (rgb.cols > 0) { if (rgb.cols > 0) {
...@@ -172,6 +171,7 @@ static void run(const string &file) { ...@@ -172,6 +171,7 @@ static void run(const string &file) {
disp.render(rgb,depth); disp.render(rgb,depth);
} }
// TODO(nick) Use a swap buffer so this can be unlocked earlier
lk.unlock(); lk.unlock();
//if (cv::waitKey(40) == 27) break; //if (cv::waitKey(40) == 27) break;
disp.wait(40); disp.wait(40);
......
...@@ -28,11 +28,11 @@ void FixstarsSGM::compute(const cv::Mat &l, const cv::Mat &r, cv::Mat &disp) { ...@@ -28,11 +28,11 @@ void FixstarsSGM::compute(const cv::Mat &l, const cv::Mat &r, cv::Mat &disp) {
disp = Mat(cv::Size(l.cols, l.rows), CV_16UC1); disp = Mat(cv::Size(l.cols, l.rows), CV_16UC1);
auto start = std::chrono::high_resolution_clock::now(); //auto start = std::chrono::high_resolution_clock::now();
ssgm_->execute(lbw.data, rbw.data, disp.data); ssgm_->execute(lbw.data, rbw.data, disp.data);
std::chrono::duration<double> elapsed = //std::chrono::duration<double> elapsed =
std::chrono::high_resolution_clock::now() - start; // std::chrono::high_resolution_clock::now() - start;
LOG(INFO) << "CUDA sgm in " << elapsed.count() << "s"; //LOG(INFO) << "CUDA sgm in " << elapsed.count() << "s";
disp.convertTo(disp, CV_32F, 1.0f/16.0f); disp.convertTo(disp, CV_32F, 1.0f/16.0f);
} }
......
...@@ -6,11 +6,15 @@ ...@@ -6,11 +6,15 @@
#include <glog/logging.h> #include <glog/logging.h>
#include <ftl/config.h> #include <ftl/config.h>
#include <ctpl_stl.h>
#include <string> #include <string>
#include <map> #include <map>
#include <vector> #include <vector>
#include <fstream> #include <fstream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <opencv2/opencv.hpp> #include <opencv2/opencv.hpp>
#include <ftl/local.hpp> #include <ftl/local.hpp>
...@@ -42,6 +46,9 @@ using ftl::net::Universe; ...@@ -42,6 +46,9 @@ using ftl::net::Universe;
using std::string; using std::string;
using std::vector; using std::vector;
using std::map; using std::map;
using std::condition_variable;
using std::mutex;
using std::unique_lock;
using cv::Mat; using cv::Mat;
using json = nlohmann::json; using json = nlohmann::json;
using std::ifstream; using std::ifstream;
...@@ -118,6 +125,7 @@ static void process_options(const map<string, string> &opts) { ...@@ -118,6 +125,7 @@ static void process_options(const map<string, string> &opts) {
} }
static void run(const string &file) { static void run(const string &file) {
ctpl::thread_pool pool(2);
Universe net(config["net"]); Universe net(config["net"]);
LocalSource *lsrc; LocalSource *lsrc;
...@@ -157,6 +165,7 @@ static void run(const string &file) { ...@@ -157,6 +165,7 @@ static void run(const string &file) {
if (!disparity) LOG(FATAL) << "Unknown disparity algorithm : " << config["disparity"]; if (!disparity) LOG(FATAL) << "Unknown disparity algorithm : " << config["disparity"];
Mat l, r, disp; Mat l, r, disp;
Mat pl, pdisp;
Display display(config["display"]); Display display(config["display"]);
display.setCalibration(Q_32F); display.setCalibration(Q_32F);
...@@ -164,25 +173,58 @@ static void run(const string &file) { ...@@ -164,25 +173,58 @@ static void run(const string &file) {
Streamer stream(net, config["stream"]); Streamer stream(net, config["stream"]);
while (display.active()) { while (display.active()) {
// Read calibrated images. mutex m;
calibrate.rectified(l, r); condition_variable cv;
int jobs = 0;
// Feed into sync buffer and network forward
sync->feed(ftl::LEFT, l, lsrc->getTimestamp()); pool.push([&](int id) {
sync->feed(ftl::RIGHT, r, lsrc->getTimestamp()); auto start = std::chrono::high_resolution_clock::now();
// Read calibrated images.
// Read back from buffer calibrate.rectified(l, r);
sync->get(ftl::LEFT, l);
sync->get(ftl::RIGHT, r); // Feed into sync buffer and network forward
sync->feed(ftl::LEFT, l, lsrc->getTimestamp());
// TODO(nick) Pipeline this sync->feed(ftl::RIGHT, r, lsrc->getTimestamp());
disparity->compute(l, r, disp);
// Read back from buffer
sync->get(ftl::LEFT, l);
sync->get(ftl::RIGHT, r);
// TODO(nick) Pipeline this
disparity->compute(l, r, disp);
unique_lock<mutex> lk(m);
jobs++;
lk.unlock();
cv.notify_one();
std::chrono::duration<double> elapsed =
std::chrono::high_resolution_clock::now() - start;
LOG(INFO) << "Disparity in " << elapsed.count() << "s";
});
pool.push([&](int id) {
auto start = std::chrono::high_resolution_clock::now();
if (pl.rows != 0) stream.send(pl, pdisp);
unique_lock<mutex> lk(m);
jobs++;
lk.unlock();
cv.notify_one();
std::chrono::duration<double> elapsed =
std::chrono::high_resolution_clock::now() - start;
LOG(INFO) << "Stream in " << elapsed.count() << "s";
});
// Send RGB+Depth images for local rendering // Send RGB+Depth images for local rendering
display.render(l, disp); display.render(l, disp);
stream.send(l, disp);
display.wait(1); display.wait(1);
unique_lock<mutex> lk(m);
cv.wait(lk, [&jobs]{return jobs == 2;});
l.copyTo(pl);
disp.copyTo(pdisp);
} }
} }
......
...@@ -40,7 +40,7 @@ void Streamer::send(const Mat &rgb, const Mat &depth) { ...@@ -40,7 +40,7 @@ void Streamer::send(const Mat &rgb, const Mat &depth) {
deflateEnd(&defstream); deflateEnd(&defstream);
d_buf.resize(defstream.total_out); d_buf.resize(defstream.total_out);
LOG(INFO) << "Depth Size = " << ((float)d_buf.size() / (1024.0f*1024.0f)); //LOG(INFO) << "Depth Size = " << ((float)d_buf.size() / (1024.0f*1024.0f));
net_.publish(uri_, rgb_buf, d_buf); net_.publish(uri_, rgb_buf, d_buf);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment