diff --git a/applications/tools/simple_viewer/main.cpp b/applications/tools/simple_viewer/main.cpp index e8d10030aa21a5360c6c37cf096c17e82e2ecda1..c63c36cc847f8e022701849c5c055753743c7ff4 100644 --- a/applications/tools/simple_viewer/main.cpp +++ b/applications/tools/simple_viewer/main.cpp @@ -134,8 +134,11 @@ static void run(ftl::Configurable *root) { stream->begin(); stream->select(0, Channel::Colour + Channel::Depth, true); - handles.push_back(std::move(pool.session(ftl::data::FrameID(0,0)).onFlush([sender](ftl::data::Frame &f, ftl::codecs::Channel c) { - sender->post(f, c); + handles.push_back(std::move(pool.onFlush([sender](ftl::data::Frame &f, ftl::codecs::Channel c) { + // Send only reponse channels on a per frame basis + if (f.mode() == ftl::data::FrameMode::RESPONSE) { + sender->post(f, c); + } return true; }))); } @@ -152,6 +155,8 @@ static void run(ftl::Configurable *root) { } int k = cv::waitKey(10); + + // Send the key back to vision node (TESTING) if (k >= 0) { auto rf = fs->firstFrame().response(); rf.create<int>(Channel::Control) = k; diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp index 9aa57e3ca800da18426185d6c5027f336c14918a..fe22f9526298c01be2a5f68354611812cc910dc3 100644 --- a/applications/vision/src/main.cpp +++ b/applications/vision/src/main.cpp @@ -163,18 +163,12 @@ static void run(ftl::Configurable *root) { receiver->setStream(outstream); receiver->registerBuilder(creatorptr); - // Listen for any flush events for frameset 0 - /*auto flushh = pool.group(0).onFlush([](ftl::data::Frame &f, ftl::codecs::Channel c) { - // Send to sender for encoding - - if (c == ftl::codecs::Channel::Colour) { - cv::Mat tmp; - f.get<cv::cuda::GpuMat>(ftl::codecs::Channel::Colour).download(tmp); - cv::imshow("Image", tmp); - cv::waitKey(1); - } + // Send channels on flush + auto flushhandle = pool.onFlushSet([sender](ftl::data::FrameSet &fs, ftl::codecs::Channel c) { + // TODO: Check the channel to see if it should be sent or not + sender->post(fs, c); return true; - });*/ + }); int stats_count = 0; int frames = 0; @@ -191,9 +185,6 @@ static void run(ftl::Configurable *root) { if (busy) return true; busy = true; - // Lock colour right now to encode in parallel - fs->flush(ftl::codecs::Channel::Colour); - // TODO: Remove, this is debug code if (fs->firstFrame().changed(ftl::codecs::Channel::Control)) { LOG(INFO) << "Got control: " << fs->firstFrame().get<int>(ftl::codecs::Channel::Control); @@ -203,7 +194,10 @@ static void run(ftl::Configurable *root) { ftl::pool.push([sender,&stats_count,&latency,&frames,pipeline,&busy,fs](int id) { // Do pipeline here... pipeline->apply(*fs, *fs); - if (fs->firstFrame().has(ftl::codecs::Channel::Depth)) sender->post(*fs, ftl::codecs::Channel::Depth); + //if (fs->firstFrame().has(ftl::codecs::Channel::Depth)) fs->flush(ftl::codecs::Channel::Depth); //sender->post(*fs, ftl::codecs::Channel::Depth); + + // Send any remaining channels... + fs->flush(); ++frames; latency += float(ftl::timer::get_time() - fs->timestamp()); @@ -219,8 +213,8 @@ static void run(ftl::Configurable *root) { busy = false; }); - // Actually send colour in this original thread - sender->post(*fs, ftl::codecs::Channel::Colour); + // Lock colour right now to encode in parallel + fs->flush(ftl::codecs::Channel::Colour); return true; }); diff --git a/components/structures/include/ftl/data/framepool.hpp b/components/structures/include/ftl/data/framepool.hpp index 4413dd2368ca59ffd07a2a762c027a14eb6b47a1..ab619531ee4a2f52115272add67d14f0585747c2 100644 --- a/components/structures/include/ftl/data/framepool.hpp +++ b/components/structures/include/ftl/data/framepool.hpp @@ -11,6 +11,9 @@ namespace ftl { namespace data { class Pool { + friend class Session; + friend class FrameSet; + public: explicit Pool(size_t min_n, size_t max_n); ~Pool(); @@ -21,6 +24,10 @@ class Pool { ftl::data::Session &session(FrameID id); inline ftl::data::Session &group(FrameID id) { return session(id); } + inline ftl::Handle onFlush(const std::function<bool(ftl::data::Frame&,ftl::codecs::Channel)> &cb) { return flush_.on(cb); } + + inline ftl::Handle onFlushSet(const std::function<bool(ftl::data::FrameSet&,ftl::codecs::Channel)> &cb) { return flush_fs_.on(cb); } + size_t size(FrameID id); size_t size(); @@ -43,6 +50,9 @@ class Pool { size_t max_n_; size_t ideal_n_; + ftl::Handler<ftl::data::Frame&,ftl::codecs::Channel> flush_; + ftl::Handler<ftl::data::FrameSet&,ftl::codecs::Channel> flush_fs_; + MUTEX mutex_; PoolData &_getPool(FrameID); diff --git a/components/structures/include/ftl/data/new_frame.hpp b/components/structures/include/ftl/data/new_frame.hpp index 8a3961f748d457663fb996fe649b8dfac5e7e87d..e925b95a8811d6d4cc106cd388f983c6859b6bba 100644 --- a/components/structures/include/ftl/data/new_frame.hpp +++ b/components/structures/include/ftl/data/new_frame.hpp @@ -310,6 +310,8 @@ class Frame { inline Pool *pool() const { return pool_; } + inline FrameMode mode() const { return mode_; } + protected: std::any &createAnyChange(ftl::codecs::Channel c, ftl::data::ChangeType t); diff --git a/components/structures/include/ftl/data/new_frameset.hpp b/components/structures/include/ftl/data/new_frameset.hpp index 63d85f6bf33c03f2b86c8fe2e57485a79126dde2..d0dc216985e9dc70d53fa217dcf88679f76fb22a 100644 --- a/components/structures/include/ftl/data/new_frameset.hpp +++ b/components/structures/include/ftl/data/new_frameset.hpp @@ -34,6 +34,7 @@ class FrameSet : public ftl::data::Frame { public: FrameSet(Pool *ppool, FrameID pid, int64_t ts); + ~FrameSet(); //int id=0; //int64_t timestamp; // Millisecond timestamp of all frames diff --git a/components/structures/src/frameset.cpp b/components/structures/src/frameset.cpp index 46fcf3f1f34c10bc709adc8cfe1cf6ff86032a76..a95768dea63bc72dd83cd7f77008af6c12eff3cd 100644 --- a/components/structures/src/frameset.cpp +++ b/components/structures/src/frameset.cpp @@ -8,6 +8,10 @@ FrameSet::FrameSet(Pool *ppool, FrameID pid, int64_t ts) : Frame(ppool->allocate } +FrameSet::~FrameSet() { + if (status() != ftl::data::FrameStatus::FLUSHED) flush(); +} + void ftl::data::FrameSet::completed(size_t ix) { if (ix == 255) { @@ -75,11 +79,36 @@ const ftl::data::Frame &ftl::data::FrameSet::firstFrame() const { } void FrameSet::flush() { - for (auto &f : frames) f.flush(); + if (status() == ftl::data::FrameStatus::FLUSHED) throw FTL_Error("Cannot flush frameset multiple times"); + + // Build list of all changed but unflushed channels. + std::unordered_set<ftl::codecs::Channel> unflushed; + + { + UNIQUE_LOCK(smtx, lk); + for (auto &f : frames) { + for (auto &c : f.changed()) { + if (!f.flushed(c.first)) { + unflushed.emplace(c.first); + } + } + } + + for (auto &f : frames) f.flush(); + ftl::data::Frame::flush(); + } + + for (auto c : unflushed) { + pool()->flush_fs_.trigger(*this, c); + } } void FrameSet::flush(ftl::codecs::Channel c) { - for (auto &f : frames) f.flush(c); + { + UNIQUE_LOCK(smtx, lk); + for (auto &f : frames) f.flush(c); + } + pool()->flush_fs_.trigger(*this, c); } /** diff --git a/components/structures/src/new_frame.cpp b/components/structures/src/new_frame.cpp index 6a1ef932cf4724ef7e772e3025fe34806bac870f..c5a146ff426b906d086dca9e036b4f2abaa7f3a5 100644 --- a/components/structures/src/new_frame.cpp +++ b/components/structures/src/new_frame.cpp @@ -389,6 +389,7 @@ void Session::flush(Frame &f) { if (d.status == ftl::data::ChannelStatus::VALID) { d.status = ftl::data::ChannelStatus::FLUSHED; flush_.trigger(f, c.first); + f.pool()->flush_.trigger(f, c.first); } } else if (c.second == ftl::data::ChangeType::FOREIGN) { auto &d = f._getData(c.first); @@ -407,6 +408,7 @@ void Session::flush(Frame &f, ftl::codecs::Channel c) { if (d.status == ftl::data::ChannelStatus::VALID) { d.status = ftl::data::ChannelStatus::FLUSHED; flush_.trigger(f, c); + f.pool()->flush_.trigger(f, c); } } else if (cc == ftl::data::ChangeType::FOREIGN) { auto &d = f._getData(c);