diff --git a/components/structures/include/ftl/data/channels.hpp b/components/structures/include/ftl/data/channels.hpp index 341759b13dad344fac4c11888d0dd3a302dc8e3c..0f2b79b775ecca1aea7d91abca1a468077b7df79 100644 --- a/components/structures/include/ftl/data/channels.hpp +++ b/components/structures/include/ftl/data/channels.hpp @@ -19,7 +19,8 @@ enum class StorageMode { /** If a channel has changed, what is the current status of that change. */ enum class ChangeType { UNCHANGED, - LOCAL, // Explicit local modification occurred + PRIMARY, // Explicit local primary modification occurred + RESPONSE, // Explicit local response change FOREIGN, // Received externally, to be forwarded COMPLETED // Received externally, not to be forwarded }; diff --git a/components/structures/include/ftl/data/creators.hpp b/components/structures/include/ftl/data/creators.hpp new file mode 100644 index 0000000000000000000000000000000000000000..a68169e137877f6296ceea6cd04eee8c67f34846 --- /dev/null +++ b/components/structures/include/ftl/data/creators.hpp @@ -0,0 +1,50 @@ +#ifndef _FTL_DATA_FRAMECREATOR_HPP_ +#define _FTL_DATA_FRAMECREATOR_HPP_ + +namespace ftl { +namespace data { + +class Pool; + +/** + * Create frames on demand. + */ +class FrameCreator { + friend class Pool; + + public: + Frame create(); + Frame create(int64_t timestamp); + + inline uint32_t id() const { return id_; } + inline Pool *pool() const { return pool_; } + + private: + FrameCreator(Pool *p_pool, uint32_t p_id) : pool_(p_pool), id_(p_id) {} + + Pool *pool_; + uint32_t id_; +}; + +/** + * Create frames at the global frame rate with both capture and retrieve steps. + */ +class IntervalFrameCreator : public FrameCreator { + public: + ftl::Handle onCapture(const std::function<bool(int64_t)> &cb); + ftl::Handle onRetrieve(const std::function<bool(Frame&)> &cb); +}; + +/** + * Create a response frame rather than a primary frame. + */ +class ResponseFrameCreator : public FrameCreator { + public: + Frame create(); + Frame create(int64_t timestamp); +}; + +} +} + +#endif \ No newline at end of file diff --git a/components/structures/include/ftl/data/new_frame.hpp b/components/structures/include/ftl/data/new_frame.hpp index a5e34dbe566015e5074eb3bc1f13c92fbe76e0ef..84a598b661736fa9f7b82670abbff6be83199a9f 100644 --- a/components/structures/include/ftl/data/new_frame.hpp +++ b/components/structures/include/ftl/data/new_frame.hpp @@ -24,6 +24,11 @@ namespace data { class Session; class Pool; +enum FrameMode { + PRIMARY, + RESPONSE +}; + enum FrameStatus { CREATED, // Initial state, before store STORED, // Changed to this after call to `store` @@ -126,7 +131,7 @@ class Frame { T *getMutable(ftl::codecs::Channel c); inline void touch(ftl::codecs::Channel c) { - changed_[c] = ChangeType::LOCAL; + changed_[c] = (mode_ == FrameMode::PRIMARY) ? ChangeType::PRIMARY : ChangeType::RESPONSE; } inline void touch(ftl::codecs::Channel c, ChangeType type) { @@ -265,6 +270,7 @@ class Frame { Pool *pool_; Session *parent_; FrameStatus status_; + FrameMode mode_ = FrameMode::PRIMARY; inline void restart(int64_t ts) { timestamp_ = ts; diff --git a/components/structures/src/new_frame.cpp b/components/structures/src/new_frame.cpp index c980a6b3ed990365115fe88dca32064a3f2c14ce..b33a41424f094898fde88cac1b4aed3716aceb33 100644 --- a/components/structures/src/new_frame.cpp +++ b/components/structures/src/new_frame.cpp @@ -214,7 +214,7 @@ void Frame::moveTo(Frame &f) { void Frame::swapChanged(Frame &f) { for (auto x : changed_) { f.data_[x.first].data.swap(data_[x.first].data); - f.changed_[x.first] = ftl::data::ChangeType::LOCAL; + f.changed_[x.first] = (mode_ == FrameMode::PRIMARY) ? ChangeType::PRIMARY : ChangeType::RESPONSE; } } @@ -225,7 +225,7 @@ void Frame::swapChannel(ftl::codecs::Channel c, Frame &f) { fd.data.swap(d.data); d.status = ftl::data::ChannelStatus::VALID; changed_[c] = f.changed_[c]; - f.changed_[c] = ftl::data::ChangeType::LOCAL; + f.changed_[c] = (mode_ == FrameMode::PRIMARY) ? ChangeType::PRIMARY : ChangeType::RESPONSE; } } @@ -246,6 +246,7 @@ void Frame::hardReset() { Frame Frame::response() { if (!pool_) throw FTL_Error("Frame has no pool, cannot generate response"); Frame f = pool_->allocate(id_, ftl::timer::get_time()); + f.mode_ = FrameMode::RESPONSE; f.store(); return f; } @@ -272,7 +273,7 @@ void Session::notifyChanges(Frame &f) { void Session::flush(Frame &f) { // TODO: Lock for (auto c : f.changed()) { - if (c.second == ftl::data::ChangeType::LOCAL) { + if (c.second == ftl::data::ChangeType::PRIMARY || c.second == ftl::data::ChangeType::RESPONSE) { auto &d = f._getData(c.first); if (d.status == ftl::data::ChannelStatus::VALID) { d.status = ftl::data::ChannelStatus::FLUSHED; @@ -291,7 +292,7 @@ void Session::flush(Frame &f) { void Session::flush(Frame &f, ftl::codecs::Channel c) { // TODO: Lock auto cc = f.changed_[c]; - if (cc == ftl::data::ChangeType::LOCAL) { + if (cc == ftl::data::ChangeType::PRIMARY || cc == ftl::data::ChangeType::RESPONSE) { auto &d = f._getData(c); if (d.status == ftl::data::ChannelStatus::VALID) { d.status = ftl::data::ChannelStatus::FLUSHED; diff --git a/components/structures/test/frame_example_1.cpp b/components/structures/test/frame_example_1.cpp index 811feb60f1b470ce7b108918c43d0b01b6d9844c..e619b574a974d27247a4a05488de18be6721a390 100644 --- a/components/structures/test/frame_example_1.cpp +++ b/components/structures/test/frame_example_1.cpp @@ -22,7 +22,11 @@ class Feed { // Normally transmitted somewhere first. //buffer_.swapChannel(c, f); ChangeType cc = f.getChangeType(c); - buffer0_.createAnyChange(c, (cc == ChangeType::LOCAL) ? ChangeType::FOREIGN : ChangeType::COMPLETED) = f.getAnyMutable(c); + if (cc == ChangeType::RESPONSE) { + buffer1_.createAnyChange(c, ChangeType::FOREIGN) = f.getAnyMutable(c); + } else { + buffer0_.createAnyChange(c, (cc == ChangeType::PRIMARY) ? ChangeType::FOREIGN : ChangeType::COMPLETED) = f.getAnyMutable(c); + } return true; }); } @@ -30,17 +34,26 @@ class Feed { inline Frame &buffer() { return buffer0_; } inline void fakeDispatch() { - buffer0_.moveTo(buffer1_); + Frame f = std::move(buffer0_); buffer0_ = pool_.allocate(0,ftl::timer::get_time()); // Save any persistent changes - buffer1_.store(); + f.store(); // Transmit any forwarding changes and prevent further changes - buffer1_.flush(); // TODO: use special dispatched function + f.flush(); // TODO: use special dispatched function // Call the onFrame handler. // Would be in another thread in real version of this class. - frame_handler_.trigger(buffer1_); + frame_handler_.trigger(f); + } + + inline Frame getFrame() { + Frame f = std::move(buffer1_); + buffer1_ = pool_.allocate(0,ftl::timer::get_time()); + + // Save any persistent changes + f.store(); + return f; } inline ftl::Handle onFrame(const std::function<bool(Frame&)> &cb) { @@ -79,6 +92,73 @@ TEST_CASE("ftl::data::Frame full non-owner example", "[example]") { int changed = 0; ftl::Handle myhandle; + auto h = feed.onFrame([&i,&feed,&myhandle,&changed](Frame &f) { + i++; + + // First frame received + // User of Frame makes changes or reads values from state + REQUIRE( f.get<float>(Channel::Pose) == 6.0f ); + REQUIRE( f.get<VideoFrame>(Channel::Depth).gpudata == 1 ); + + // Create a new frame for same source for some return state + Frame nf = f.response(); + nf.create<std::list<std::string>>(Channel::Messages) = {"First Message"}; + nf.create<std::list<std::string>>(Channel::Messages) = {"Second Message"}; + nf.create<int>(Channel::Control) = 3456; + //nf.set<float>(Channel::Pose) = 7.0f; + + // Listen for this `Control` change to be confirmed + myhandle = nf.onChange(Channel::Control, [&changed](Frame &f, Channel c) { + changed++; + return false; // Call once only + }); + + // Either by destruction or manually, final action is flush to send + nf.flush(); + + return true; + }); + + // Generate some incoming changes from network + // Usually this is done inside the Feed class... + feed.buffer().createChange<VideoFrame>(Channel::Colour, ChangeType::FOREIGN).gpudata = 1; + feed.buffer().createChange<VideoFrame>(Channel::Depth, ChangeType::COMPLETED).gpudata = 1; + feed.buffer().createChange<float>(Channel::Pose, ChangeType::FOREIGN) = 6.0f; + + // Fake a frame being completely received on network or from file + feed.fakeDispatch(); + + // Now pretend to be an owner and create a new frame... it should have the + // response data in it, so check for that. + { + Frame f = feed.getFrame(); + REQUIRE( changed == 1 ); // Change notified before `onFrame` + REQUIRE( f.get<float>(Channel::Pose) == 6.0f ); + REQUIRE( f.get<int>(Channel::Control) == 3456 ); + REQUIRE( (*f.get<std::list<std::string>>(Channel::Messages).begin()) == "First Message" ); + } + // We wont bother dispatching this new frame + //feed.fakeDispatch(); + + REQUIRE( i == 1 ); + + // For testing only... + ftl::data::clearRegistry(); +} + +TEST_CASE("ftl::data::Frame full owner example", "[example]") { + // Register channels somewhere at startup + ftl::data::make_channel<VideoFrame>(Channel::Colour, "colour", StorageMode::TRANSIENT); + ftl::data::make_channel<VideoFrame>(Channel::Depth, "depth", StorageMode::TRANSIENT); + ftl::data::make_channel<std::list<std::string>>(Channel::Messages, "messages", StorageMode::AGGREGATE); + ftl::data::make_channel<float>(Channel::Pose, "pose", StorageMode::PERSISTENT); + + Feed feed; + + int i=0; + int changed = 0; + ftl::Handle myhandle; + auto h = feed.onFrame([&i,&feed,&myhandle,&changed](Frame &f) { // First frame received if (i++ == 0 ) { @@ -91,7 +171,7 @@ TEST_CASE("ftl::data::Frame full non-owner example", "[example]") { nf.create<std::list<std::string>>(Channel::Messages) = {"First Message"}; nf.create<std::list<std::string>>(Channel::Messages) = {"Second Message"}; nf.create<int>(Channel::Control) = 3456; - //nf.set<float>(Channel::Pose) = 7.0f; + nf.set<float>(Channel::Pose) = 7.0f; // Listen for this `Control` change to be confirmed myhandle = nf.onChange(Channel::Control, [&changed](Frame &f, Channel c) { @@ -103,23 +183,29 @@ TEST_CASE("ftl::data::Frame full non-owner example", "[example]") { nf.flush(); // Second frame received } else { - REQUIRE( changed == 1 ); // Change notified before `onFrame` - REQUIRE( f.get<float>(Channel::Pose) == 6.0f ); - REQUIRE( f.get<int>(Channel::Control) == 3456 ); - REQUIRE( (*f.get<std::list<std::string>>(Channel::Messages).begin()) == "First Message" ); + } return true; }); - // Generate some incoming changes from network - // Usually this is done inside the Feed class... - feed.buffer().createChange<VideoFrame>(Channel::Colour, ChangeType::FOREIGN).gpudata = 1; - feed.buffer().createChange<VideoFrame>(Channel::Depth, ChangeType::COMPLETED).gpudata = 1; - feed.buffer().createChange<float>(Channel::Pose, ChangeType::FOREIGN) = 6.0f; - - // Fake a frame being received on network or from file + // Create an entirely new frame, destruction will send it. + { + Frame f = feed.getFrame(); + f.create<VideoFrame>(Channel::Colour).gpudata = 1; + f.create<VideoFrame>(Channel::Depth).gpudata = 1; + f.create<float>(Channel::Pose) = 6.0f; + } + // Trigger local onFrame callback with the above frame. feed.fakeDispatch(); - // And dispatch the response frame also + + // Create next new frame, now includes response changes + { + Frame f = feed.getFrame(); + REQUIRE( changed == 1 ); // Change notified before `onFrame` + REQUIRE( f.get<float>(Channel::Pose) == 7.0f ); + REQUIRE( f.get<int>(Channel::Control) == 3456 ); + REQUIRE( (*f.get<std::list<std::string>>(Channel::Messages).begin()) == "First Message" ); + } feed.fakeDispatch(); REQUIRE( i == 2 ); diff --git a/components/structures/test/frame_unit.cpp b/components/structures/test/frame_unit.cpp index 13f87524f6c4396e969a243a0152c888dcd40469..8108a83d3809b7ac2b3f42d8a83626e2e548e767 100644 --- a/components/structures/test/frame_unit.cpp +++ b/components/structures/test/frame_unit.cpp @@ -653,7 +653,7 @@ TEST_CASE("ftl::data::Frame merge is change", "[2.1.9]") { f2.untouch(Channel::Colour2); f2.merge(f1); - REQUIRE( f2.getChangeType(Channel::Colour) == ChangeType::LOCAL ); + REQUIRE( f2.getChangeType(Channel::Colour) == ChangeType::PRIMARY ); REQUIRE( !f2.changed(Channel::Colour2) ); } } @@ -840,7 +840,7 @@ TEST_CASE("ftl::data::Frame change type", "[2.3.3]") { REQUIRE( !f.changed(Channel::Pose) ); f.create<int>(Channel::Pose) = 55; - REQUIRE( f.getChangeType(Channel::Pose) == ChangeType::LOCAL ); + REQUIRE( f.getChangeType(Channel::Pose) == ChangeType::PRIMARY ); } SECTION("local change overrides foreign change") { @@ -851,7 +851,7 @@ TEST_CASE("ftl::data::Frame change type", "[2.3.3]") { f.store(); f.set<int>(Channel::Pose) = 66; - REQUIRE( f.getChangeType(Channel::Pose) == ChangeType::LOCAL ); + REQUIRE( f.getChangeType(Channel::Pose) == ChangeType::PRIMARY ); } SECTION("local change overrides completed change") { @@ -861,7 +861,7 @@ TEST_CASE("ftl::data::Frame change type", "[2.3.3]") { REQUIRE( f.getChangeType(Channel::Pose) == ChangeType::COMPLETED ); f.store(); f.set<int>(Channel::Pose) = 66; - REQUIRE( f.getChangeType(Channel::Pose) == ChangeType::LOCAL ); + REQUIRE( f.getChangeType(Channel::Pose) == ChangeType::PRIMARY ); } }