From 2b772870610de84605a62f091b188b17c711d9a0 Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nwpope@utu.fi> Date: Sun, 5 Jul 2020 18:46:32 +0300 Subject: [PATCH] Use a more global flush mechanism --- applications/tools/simple_viewer/main.cpp | 9 +++-- applications/vision/src/main.cpp | 28 +++++++--------- .../structures/include/ftl/data/framepool.hpp | 10 ++++++ .../structures/include/ftl/data/new_frame.hpp | 2 ++ .../include/ftl/data/new_frameset.hpp | 1 + components/structures/src/frameset.cpp | 33 +++++++++++++++++-- components/structures/src/new_frame.cpp | 2 ++ 7 files changed, 64 insertions(+), 21 deletions(-) diff --git a/applications/tools/simple_viewer/main.cpp b/applications/tools/simple_viewer/main.cpp index e8d10030a..c63c36cc8 100644 --- a/applications/tools/simple_viewer/main.cpp +++ b/applications/tools/simple_viewer/main.cpp @@ -134,8 +134,11 @@ static void run(ftl::Configurable *root) { stream->begin(); stream->select(0, Channel::Colour + Channel::Depth, true); - handles.push_back(std::move(pool.session(ftl::data::FrameID(0,0)).onFlush([sender](ftl::data::Frame &f, ftl::codecs::Channel c) { - sender->post(f, c); + handles.push_back(std::move(pool.onFlush([sender](ftl::data::Frame &f, ftl::codecs::Channel c) { + // Send only reponse channels on a per frame basis + if (f.mode() == ftl::data::FrameMode::RESPONSE) { + sender->post(f, c); + } return true; }))); } @@ -152,6 +155,8 @@ static void run(ftl::Configurable *root) { } int k = cv::waitKey(10); + + // Send the key back to vision node (TESTING) if (k >= 0) { auto rf = fs->firstFrame().response(); rf.create<int>(Channel::Control) = k; diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp index 9aa57e3ca..fe22f9526 100644 --- a/applications/vision/src/main.cpp +++ b/applications/vision/src/main.cpp @@ -163,18 +163,12 @@ static void run(ftl::Configurable *root) { receiver->setStream(outstream); receiver->registerBuilder(creatorptr); - // Listen for any flush events for frameset 0 - /*auto flushh = pool.group(0).onFlush([](ftl::data::Frame &f, ftl::codecs::Channel c) { - // Send to sender for encoding - - if (c == ftl::codecs::Channel::Colour) { - cv::Mat tmp; - f.get<cv::cuda::GpuMat>(ftl::codecs::Channel::Colour).download(tmp); - cv::imshow("Image", tmp); - cv::waitKey(1); - } + // Send channels on flush + auto flushhandle = pool.onFlushSet([sender](ftl::data::FrameSet &fs, ftl::codecs::Channel c) { + // TODO: Check the channel to see if it should be sent or not + sender->post(fs, c); return true; - });*/ + }); int stats_count = 0; int frames = 0; @@ -191,9 +185,6 @@ static void run(ftl::Configurable *root) { if (busy) return true; busy = true; - // Lock colour right now to encode in parallel - fs->flush(ftl::codecs::Channel::Colour); - // TODO: Remove, this is debug code if (fs->firstFrame().changed(ftl::codecs::Channel::Control)) { LOG(INFO) << "Got control: " << fs->firstFrame().get<int>(ftl::codecs::Channel::Control); @@ -203,7 +194,10 @@ static void run(ftl::Configurable *root) { ftl::pool.push([sender,&stats_count,&latency,&frames,pipeline,&busy,fs](int id) { // Do pipeline here... pipeline->apply(*fs, *fs); - if (fs->firstFrame().has(ftl::codecs::Channel::Depth)) sender->post(*fs, ftl::codecs::Channel::Depth); + //if (fs->firstFrame().has(ftl::codecs::Channel::Depth)) fs->flush(ftl::codecs::Channel::Depth); //sender->post(*fs, ftl::codecs::Channel::Depth); + + // Send any remaining channels... + fs->flush(); ++frames; latency += float(ftl::timer::get_time() - fs->timestamp()); @@ -219,8 +213,8 @@ static void run(ftl::Configurable *root) { busy = false; }); - // Actually send colour in this original thread - sender->post(*fs, ftl::codecs::Channel::Colour); + // Lock colour right now to encode in parallel + fs->flush(ftl::codecs::Channel::Colour); return true; }); diff --git a/components/structures/include/ftl/data/framepool.hpp b/components/structures/include/ftl/data/framepool.hpp index 4413dd236..ab619531e 100644 --- a/components/structures/include/ftl/data/framepool.hpp +++ b/components/structures/include/ftl/data/framepool.hpp @@ -11,6 +11,9 @@ namespace ftl { namespace data { class Pool { + friend class Session; + friend class FrameSet; + public: explicit Pool(size_t min_n, size_t max_n); ~Pool(); @@ -21,6 +24,10 @@ class Pool { ftl::data::Session &session(FrameID id); inline ftl::data::Session &group(FrameID id) { return session(id); } + inline ftl::Handle onFlush(const std::function<bool(ftl::data::Frame&,ftl::codecs::Channel)> &cb) { return flush_.on(cb); } + + inline ftl::Handle onFlushSet(const std::function<bool(ftl::data::FrameSet&,ftl::codecs::Channel)> &cb) { return flush_fs_.on(cb); } + size_t size(FrameID id); size_t size(); @@ -43,6 +50,9 @@ class Pool { size_t max_n_; size_t ideal_n_; + ftl::Handler<ftl::data::Frame&,ftl::codecs::Channel> flush_; + ftl::Handler<ftl::data::FrameSet&,ftl::codecs::Channel> flush_fs_; + MUTEX mutex_; PoolData &_getPool(FrameID); diff --git a/components/structures/include/ftl/data/new_frame.hpp b/components/structures/include/ftl/data/new_frame.hpp index 8a3961f74..e925b95a8 100644 --- a/components/structures/include/ftl/data/new_frame.hpp +++ b/components/structures/include/ftl/data/new_frame.hpp @@ -310,6 +310,8 @@ class Frame { inline Pool *pool() const { return pool_; } + inline FrameMode mode() const { return mode_; } + protected: std::any &createAnyChange(ftl::codecs::Channel c, ftl::data::ChangeType t); diff --git a/components/structures/include/ftl/data/new_frameset.hpp b/components/structures/include/ftl/data/new_frameset.hpp index 63d85f6bf..d0dc21698 100644 --- a/components/structures/include/ftl/data/new_frameset.hpp +++ b/components/structures/include/ftl/data/new_frameset.hpp @@ -34,6 +34,7 @@ class FrameSet : public ftl::data::Frame { public: FrameSet(Pool *ppool, FrameID pid, int64_t ts); + ~FrameSet(); //int id=0; //int64_t timestamp; // Millisecond timestamp of all frames diff --git a/components/structures/src/frameset.cpp b/components/structures/src/frameset.cpp index 46fcf3f1f..a95768dea 100644 --- a/components/structures/src/frameset.cpp +++ b/components/structures/src/frameset.cpp @@ -8,6 +8,10 @@ FrameSet::FrameSet(Pool *ppool, FrameID pid, int64_t ts) : Frame(ppool->allocate } +FrameSet::~FrameSet() { + if (status() != ftl::data::FrameStatus::FLUSHED) flush(); +} + void ftl::data::FrameSet::completed(size_t ix) { if (ix == 255) { @@ -75,11 +79,36 @@ const ftl::data::Frame &ftl::data::FrameSet::firstFrame() const { } void FrameSet::flush() { - for (auto &f : frames) f.flush(); + if (status() == ftl::data::FrameStatus::FLUSHED) throw FTL_Error("Cannot flush frameset multiple times"); + + // Build list of all changed but unflushed channels. + std::unordered_set<ftl::codecs::Channel> unflushed; + + { + UNIQUE_LOCK(smtx, lk); + for (auto &f : frames) { + for (auto &c : f.changed()) { + if (!f.flushed(c.first)) { + unflushed.emplace(c.first); + } + } + } + + for (auto &f : frames) f.flush(); + ftl::data::Frame::flush(); + } + + for (auto c : unflushed) { + pool()->flush_fs_.trigger(*this, c); + } } void FrameSet::flush(ftl::codecs::Channel c) { - for (auto &f : frames) f.flush(c); + { + UNIQUE_LOCK(smtx, lk); + for (auto &f : frames) f.flush(c); + } + pool()->flush_fs_.trigger(*this, c); } /** diff --git a/components/structures/src/new_frame.cpp b/components/structures/src/new_frame.cpp index 6a1ef932c..c5a146ff4 100644 --- a/components/structures/src/new_frame.cpp +++ b/components/structures/src/new_frame.cpp @@ -389,6 +389,7 @@ void Session::flush(Frame &f) { if (d.status == ftl::data::ChannelStatus::VALID) { d.status = ftl::data::ChannelStatus::FLUSHED; flush_.trigger(f, c.first); + f.pool()->flush_.trigger(f, c.first); } } else if (c.second == ftl::data::ChangeType::FOREIGN) { auto &d = f._getData(c.first); @@ -407,6 +408,7 @@ void Session::flush(Frame &f, ftl::codecs::Channel c) { if (d.status == ftl::data::ChannelStatus::VALID) { d.status = ftl::data::ChannelStatus::FLUSHED; flush_.trigger(f, c); + f.pool()->flush_.trigger(f, c); } } else if (cc == ftl::data::ChangeType::FOREIGN) { auto &d = f._getData(c); -- GitLab