Commit f79fa506 authored by Nicolas Pope

Merge branch 'feature/376/pipequeue' into 'master'

Implements #376 Pipeline queues

Closes #376

See merge request nicolas.pope/ftl!341
parents 43e3f842 107a00c0
1 merge request: !341 Implements #376 Pipeline queues
Pipeline #29369 passed
@@ -358,7 +358,8 @@ ftlError_t ftlNextFrame(ftlStream_t stream) {
 	try {
 		cudaSetDevice(0);
 		if (stream->pipelines) {
-			stream->pipelines->apply(*stream->video_fs, *stream->video_fs, 0);
+			stream->pipelines->apply(*stream->video_fs, *stream->video_fs);
+			// FIXME: Stream sync
 		}
 		for (auto &c : stream->video_fs->firstFrame().changed()) {
@@ -384,7 +385,8 @@ ftlError_t ftlDestroyStream(ftlStream_t stream) {
 	try {
 		cudaSetDevice(0);
 		if (stream->pipelines) {
-			stream->pipelines->apply(*stream->video_fs, *stream->video_fs, 0);
+			stream->pipelines->apply(*stream->video_fs, *stream->video_fs);
+			// FIXME: Stream sync
 		}
 		for (auto &c : stream->video_fs->firstFrame().changed()) {
			stream->sender->post(*stream->video_fs, c.first);
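The two FIXME comments above flag that apply() now only records work on the frameset's CUDA stream, so the C API needs an explicit synchronisation before the frames are posted. One possible shape for that follow-up, sketched under the assumption that the frameset exposes its per-frame stream via frames[0].stream() (the accessor Graph::_apply uses below) and that cudaSafeCall is available in this translation unit:

	if (stream->pipelines) {
		stream->pipelines->apply(*stream->video_fs, *stream->video_fs);
		// Assumption: same stream accessor as used in graph.cpp below.
		cudaStream_t s = stream->video_fs->frames[0].stream();
		cudaSafeCall(cudaStreamSynchronize(s));  // wait for the pipeline's async work on this frameset
	}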
@@ -216,7 +216,7 @@ static void run(ftl::Configurable *root) {
 		fs->set(ftl::data::FSFlag::AUTO_SEND);
-		bool did_pipe = pipeline->apply(*fs, *fs, [fs,&frames,&latency]() {
+		bool did_pipe = pipeline->queue(fs, [fs,&frames,&latency]() {
 			if (fs->hasAnyChanged(Channel::Depth)) fs->flush(Channel::Depth);
 			++frames;
 			latency += float(ftl::timer::get_time() - fs->timestamp());
@@ -126,8 +126,11 @@ class Graph : public ftl::Configurable {
 	ftl::Configurable *append(const std::string &name, ARGS...);
 	bool apply(ftl::rgbd::Frame &in, ftl::rgbd::Frame &out, const std::function<void()> &cb=nullptr);
-	bool apply(ftl::rgbd::FrameSet &in, ftl::rgbd::FrameSet &out, const std::function<void()> &cb=nullptr);
-	bool apply(ftl::rgbd::FrameSet &in, ftl::rgbd::Frame &out, const std::function<void()> &cb=nullptr);
+	bool queue(const ftl::data::FrameSetPtr &fs, const std::function<void()> &cb);
+	bool apply(ftl::rgbd::FrameSet &in, ftl::rgbd::FrameSet &out);
+	//bool apply(ftl::rgbd::FrameSet &in, ftl::rgbd::Frame &out, const std::function<void()> &cb=nullptr);
 	/**
 	 * Make sure all async operators have also completed. This is automatically
@@ -152,8 +155,12 @@ class Graph : public ftl::Configurable {
 	std::unordered_map<uint32_t,cv::cuda::GpuMat> buffers_;
 	std::unordered_set<uint32_t> valid_buffers_;
 	std::function<void()> callback_;
+	std::list<std::pair<ftl::data::FrameSetPtr, std::function<void()>>> queue_;
+	MUTEX mtx_;
 	ftl::Configurable *_append(ftl::operators::detail::ConstructionHelperBase*);
+	void _processOne();
+	bool _apply(ftl::rgbd::FrameSet &in, ftl::rgbd::FrameSet &out);
 };
 }
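In short, the header splits the old apply(in, out, cb) overload into a synchronous apply(in, out) and an asynchronous queue(fs, cb), backed by the new queue_, mtx_, _processOne and _apply members. A hedged usage sketch based on the call sites changed in this merge (pipeline, fs and the callback body are placeholders):

	// Synchronous: run the operator graph on the frameset immediately.
	pipeline->apply(*fs, *fs);

	// Asynchronous: store the shared frameset pointer, run it when the pipeline
	// is free (or after any queued framesets), and fire the callback from the
	// thread pool once the CUDA work completes.
	bool accepted = pipeline->queue(fs, [fs]() {
		// e.g. flush or dispatch the processed frameset here
	});
	if (!accepted) {
		// queue() returns false only when more than 3 framesets are already pending
	}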
@@ -68,21 +68,39 @@ bool Graph::hasBuffer(ftl::operators::Buffer b, uint32_t fid) const {
 	return valid_buffers_.count((uint32_t(b) << 8) + fid) > 0;
 }
-bool Graph::apply(FrameSet &in, FrameSet &out, const std::function<void()> &cb) {
+bool Graph::queue(const ftl::data::FrameSetPtr &fs, const std::function<void()> &cb) {
 	if (!value("enabled", true)) return true;
-	if (in.frames.size() < 1) return true;
+	if (fs->frames.size() < 1) return true;
-	auto stream_actual = in.frames[0].stream();
-	bool success = true;
-	if (in.frames.size() != out.frames.size()) return true;
+	{
+		UNIQUE_LOCK(mtx_, lk);
+		if (queue_.size() > 3) {
+			LOG(ERROR) << "Pipeline queue exceeded";
+			return false;
+		}
+		queue_.emplace_back(fs, cb);
+	}
 	if (busy_.test_and_set()) {
-		LOG(ERROR) << "Pipeline already in use: " << in.timestamp();
-		//if (cb) cb();
-		return false;
+		LOG(INFO) << "Pipeline queued... " << queue_.size();
+		return true;
 	}
+	_processOne();
+	return true;
+}
+bool Graph::apply(ftl::rgbd::FrameSet &in, ftl::rgbd::FrameSet &out) {
+	if (!value("enabled", true)) return true;
+	if (in.frames.size() < 1) return true;
+	return _apply(in, out);
+}
+bool Graph::_apply(ftl::rgbd::FrameSet &in, ftl::rgbd::FrameSet &out) {
+	auto stream_actual = in.frames[0].stream();
+	bool success = true;
 	valid_buffers_.clear();
 	for (auto &f : out.frames) {
@@ -142,18 +160,51 @@ bool Graph::apply(FrameSet &in, FrameSet &out, const std::function<void()> &cb)
 	}
 	success = waitAll(stream_actual) && success;
+	return success;
+}
+void Graph::_processOne() {
+	ftl::data::FrameSetPtr fs;
+	std::function<void()> cb;
+	{
+		UNIQUE_LOCK(mtx_, lk);
+		if(queue_.size() == 0) {
+			busy_.clear();
+			return;
+		}
+		fs = queue_.front().first;
+		cb = queue_.front().second;
+		queue_.pop_front();
+	}
+	auto &in = *fs;
+	auto &out = *fs;
+	auto stream_actual = in.frames[0].stream();
+	_apply(in, out);
 	if (cb) {
 		cudaCallback(stream_actual, [this,cb]() {
-			busy_.clear();
-			ftl::pool.push([cb](int id) { cb(); });
+			bool sched = false;
+			{
+				UNIQUE_LOCK(mtx_, lk);
+				if (queue_.size() == 0) busy_.clear();
+				else sched = true;
+			}
+			ftl::pool.push([this,cb,sched](int id) {
+				if (sched) _processOne();
+				cb();
+			});
 		});
 	} else {
 		//cudaSafeCall(cudaStreamSynchronize(stream_actual));
 		busy_.clear();
 	}
-	return true;
+	return;
 }
 bool Graph::waitAll(cudaStream_t stream) {
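Stripped of the CUDA and FTL specifics, the new code above is a small scheduling pattern: a mutex-guarded bounded queue, an atomic busy flag, and a completion hook that either releases the pipeline or chains to the next queued item. Below is a self-contained C++ sketch of that pattern; MiniPipeline, the int work items and the detached worker thread are illustrative stand-ins (the thread plays the role of cudaCallback plus ftl::pool) and are not code from this repository:

#include <atomic>
#include <chrono>
#include <functional>
#include <iostream>
#include <list>
#include <mutex>
#include <thread>
#include <utility>

struct MiniPipeline {
	using Callback = std::function<void()>;

	// Counterpart of Graph::queue(): bounded queue plus busy-flag fast path.
	bool queue(int work, Callback cb) {
		{
			std::lock_guard<std::mutex> lk(mtx_);
			if (queue_.size() > 3) return false;   // bounded queue, rejects when full
			queue_.emplace_back(work, std::move(cb));
		}
		if (busy_.exchange(true)) return true;     // already running: the item waits its turn
		processOne();                              // idle: start processing now
		return true;
	}

private:
	// Counterpart of Graph::_processOne(): pop one item, run it, complete asynchronously.
	void processOne() {
		int work;
		Callback cb;
		{
			std::lock_guard<std::mutex> lk(mtx_);
			if (queue_.empty()) { busy_.store(false); return; }
			work = queue_.front().first;
			cb = std::move(queue_.front().second);
			queue_.pop_front();
		}

		// The detached thread stands in for the CUDA stream completion callback.
		std::thread([this, work, cb = std::move(cb)]() {
			std::cout << "processed item " << work << "\n";

			bool sched = false;
			{
				std::lock_guard<std::mutex> lk(mtx_);
				if (queue_.empty()) busy_.store(false);  // nothing pending: release the pipeline
				else sched = true;                       // more pending: stay busy and continue
			}
			if (sched) processOne();                     // chain to the next queued item
			if (cb) cb();                                // user callback after completion
		}).detach();
	}

	std::atomic<bool> busy_{false};
	std::mutex mtx_;
	std::list<std::pair<int, Callback>> queue_;
};

int main() {
	MiniPipeline p;
	for (int i = 0; i < 7; ++i) {
		if (!p.queue(i, [i]() { std::cout << "callback for item " << i << "\n"; })) {
			std::cout << "item " << i << " rejected: queue full\n";
		}
	}
	// Crude wait for the detached workers; sufficient for a demonstration.
	std::this_thread::sleep_for(std::chrono::milliseconds(200));
	return 0;
}

The bound of 3 pending items, the busy-flag test in queue() and the clear-or-continue decision in the completion hook mirror Graph::queue and Graph::_processOne above.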
@@ -187,7 +187,7 @@ Feed::Feed(nlohmann::json &config, ftl::net::Universe*net) :
 	bool did_pipe = false;
 	if (pipeline) {
-		bool did_pipe = pipeline->apply(*fs, *fs, [this,fs]() {
+		bool did_pipe = pipeline->queue(fs, [this,fs]() {
 			_dispatch(fs);
 		});
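As at the other changed call sites, fs is now handed over as the shared ftl::data::FrameSetPtr rather than a dereferenced frameset. Because Graph::queue copies that pointer into queue_, the frameset stays alive until _processOne has run it and the callback (here _dispatch) has fired. A hedged before/after fragment of the ownership difference at this call site:

	// Old: the pipeline only borrowed references for the duration of apply().
	pipeline->apply(*fs, *fs, [this,fs]() { _dispatch(fs); });

	// New: a copy of the shared FrameSetPtr is held in queue_, so the frameset
	// outlives this call and is released only after its callback has run.
	pipeline->queue(fs, [this,fs]() { _dispatch(fs); });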