diff --git a/SDK/C/src/streams.cpp b/SDK/C/src/streams.cpp index 4584ec6267b2412bb8da7350178a8b95cd046632..943ba8e667e0abd0ae1fb9d23dda84e6e74e18c5 100644 --- a/SDK/C/src/streams.cpp +++ b/SDK/C/src/streams.cpp @@ -358,7 +358,8 @@ ftlError_t ftlNextFrame(ftlStream_t stream) { try { cudaSetDevice(0); if (stream->pipelines) { - stream->pipelines->apply(*stream->video_fs, *stream->video_fs, 0); + stream->pipelines->apply(*stream->video_fs, *stream->video_fs); + // FIXME: Stream sync } for (auto &c : stream->video_fs->firstFrame().changed()) { @@ -384,7 +385,8 @@ ftlError_t ftlDestroyStream(ftlStream_t stream) { try { cudaSetDevice(0); if (stream->pipelines) { - stream->pipelines->apply(*stream->video_fs, *stream->video_fs, 0); + stream->pipelines->apply(*stream->video_fs, *stream->video_fs); + // FIXME: Stream sync } for (auto &c : stream->video_fs->firstFrame().changed()) { stream->sender->post(*stream->video_fs, c.first); diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp index 4f370c1a0f15bf7e2abb7e5b7ad188d778144f0e..f9213729d9818e6bf028f1b451bbc10cbf7dee59 100644 --- a/applications/vision/src/main.cpp +++ b/applications/vision/src/main.cpp @@ -216,7 +216,7 @@ static void run(ftl::Configurable *root) { fs->set(ftl::data::FSFlag::AUTO_SEND); - bool did_pipe = pipeline->apply(*fs, *fs, [fs,&frames,&latency]() { + bool did_pipe = pipeline->queue(fs, [fs,&frames,&latency]() { if (fs->hasAnyChanged(Channel::Depth)) fs->flush(Channel::Depth); ++frames; latency += float(ftl::timer::get_time() - fs->timestamp()); diff --git a/components/operators/include/ftl/operators/operator.hpp b/components/operators/include/ftl/operators/operator.hpp index 86c406caa2da23eb88f2c2493c7b2e6864fa0b74..3f77b6900e4456898076031b3d97ba5bae190f90 100644 --- a/components/operators/include/ftl/operators/operator.hpp +++ b/components/operators/include/ftl/operators/operator.hpp @@ -126,8 +126,11 @@ class Graph : public ftl::Configurable { ftl::Configurable 
*append(const std::string &name, ARGS...); bool apply(ftl::rgbd::Frame &in, ftl::rgbd::Frame &out, const std::function<void()> &cb=nullptr); - bool apply(ftl::rgbd::FrameSet &in, ftl::rgbd::FrameSet &out, const std::function<void()> &cb=nullptr); - bool apply(ftl::rgbd::FrameSet &in, ftl::rgbd::Frame &out, const std::function<void()> &cb=nullptr); + + bool queue(const ftl::data::FrameSetPtr &fs, const std::function<void()> &cb); + + bool apply(ftl::rgbd::FrameSet &in, ftl::rgbd::FrameSet &out); + //bool apply(ftl::rgbd::FrameSet &in, ftl::rgbd::Frame &out, const std::function<void()> &cb=nullptr); /** * Make sure all async operators have also completed. This is automatically @@ -152,8 +155,12 @@ class Graph : public ftl::Configurable { std::unordered_map<uint32_t,cv::cuda::GpuMat> buffers_; std::unordered_set<uint32_t> valid_buffers_; std::function<void()> callback_; + std::list<std::pair<ftl::data::FrameSetPtr, std::function<void()>>> queue_; + MUTEX mtx_; ftl::Configurable *_append(ftl::operators::detail::ConstructionHelperBase*); + void _processOne(); + bool _apply(ftl::rgbd::FrameSet &in, ftl::rgbd::FrameSet &out); }; } diff --git a/components/operators/src/operator.cpp b/components/operators/src/operator.cpp index 4cc0f90598d49fc70eb6b48eec773a5733b6a459..81adaaa8752f17ab69070c47291ed53ba414c2db 100644 --- a/components/operators/src/operator.cpp +++ b/components/operators/src/operator.cpp @@ -68,21 +68,39 @@ bool Graph::hasBuffer(ftl::operators::Buffer b, uint32_t fid) const { return valid_buffers_.count((uint32_t(b) << 8) + fid) > 0; } -bool Graph::apply(FrameSet &in, FrameSet &out, const std::function<void()> &cb) { +bool Graph::queue(const ftl::data::FrameSetPtr &fs, const std::function<void()> &cb) { if (!value("enabled", true)) return true; - if (in.frames.size() < 1) return true; + if (fs->frames.size() < 1) return true; - auto stream_actual = in.frames[0].stream(); - bool success = true; - - if (in.frames.size() != out.frames.size()) return true; + { + 
UNIQUE_LOCK(mtx_, lk); + if (queue_.size() > 3) { + LOG(ERROR) << "Pipeline queue exceeded"; + return false; + } + queue_.emplace_back(fs, cb); + } if (busy_.test_and_set()) { - LOG(ERROR) << "Pipeline already in use: " << in.timestamp(); - //if (cb) cb(); - return false; + LOG(INFO) << "Pipeline queued... " << queue_.size(); + return true; } + _processOne(); + return true; +} + +bool Graph::apply(ftl::rgbd::FrameSet &in, ftl::rgbd::FrameSet &out) { + if (!value("enabled", true)) return true; + if (in.frames.size() < 1) return true; + + return _apply(in, out); +} + +bool Graph::_apply(ftl::rgbd::FrameSet &in, ftl::rgbd::FrameSet &out) { + auto stream_actual = in.frames[0].stream(); + bool success = true; + valid_buffers_.clear(); for (auto &f : out.frames) { @@ -142,18 +160,51 @@ bool Graph::apply(FrameSet &in, FrameSet &out, const std::function<void()> &cb) } success = waitAll(stream_actual) && success; + return success; +} + +void Graph::_processOne() { + + ftl::data::FrameSetPtr fs; + std::function<void()> cb; + + { + UNIQUE_LOCK(mtx_, lk); + if(queue_.size() == 0) { + busy_.clear(); + return; + } + + fs = queue_.front().first; + cb = queue_.front().second; + queue_.pop_front(); + } + + auto &in = *fs; + auto &out = *fs; + + auto stream_actual = in.frames[0].stream(); + + _apply(in, out); if (cb) { cudaCallback(stream_actual, [this,cb]() { - busy_.clear(); - ftl::pool.push([cb](int id) { cb(); }); + bool sched = false; + { + UNIQUE_LOCK(mtx_, lk); + if (queue_.size() == 0) busy_.clear(); + else sched = true; + } + ftl::pool.push([this,cb,sched](int id) { + if (sched) _processOne(); + cb(); + }); }); } else { - //cudaSafeCall(cudaStreamSynchronize(stream_actual)); busy_.clear(); } - - return true; + + return; } bool Graph::waitAll(cudaStream_t stream) { diff --git a/components/streams/src/feed.cpp b/components/streams/src/feed.cpp index 6822813cb30c7bddb12c39c6384ad89ac130913f..11df5649776ff63bb1de281687011d4de5abb434 100644 --- a/components/streams/src/feed.cpp 
+++ b/components/streams/src/feed.cpp @@ -187,7 +187,7 @@ Feed::Feed(nlohmann::json &config, ftl::net::Universe*net) : bool did_pipe = false; if (pipeline) { - bool did_pipe = pipeline->apply(*fs, *fs, [this,fs]() { + did_pipe = pipeline->queue(fs, [this,fs]() { _dispatch(fs); });