diff --git a/components/streams/include/ftl/streams/new_receiver.hpp b/components/streams/include/ftl/streams/new_receiver.hpp new file mode 100644 index 0000000000000000000000000000000000000000..8541dc853dc6abdf84a0965164e426212d199636 --- /dev/null +++ b/components/streams/include/ftl/streams/new_receiver.hpp @@ -0,0 +1,19 @@ +#ifndef _FTL_STREAMS_NRECEIVER_HPP_ +#define _FTL_STREAMS_NRECEIVER_HPP_ + +namespace ftl { +namespace streams { + +/** + * Responsible for managing the stream packet to frameset decoding process. + */ +class Receiver { + public: + + void process(ftl::codecs::StreamPacket &spkt, ftl::codecs::Packet &pkt, ftl::data::FrameSet &fs); +}; + +} +} + +#endif \ No newline at end of file diff --git a/components/structures/include/ftl/data/framepool.hpp b/components/structures/include/ftl/data/framepool.hpp index 204965ff77d6379559e7b5ed6ba8de19385119b6..35e1b9ee971b4a29383276df36724e1c60e1793a 100644 --- a/components/structures/include/ftl/data/framepool.hpp +++ b/components/structures/include/ftl/data/framepool.hpp @@ -26,7 +26,7 @@ class Pool { struct PoolData { std::list<ftl::data::Frame*> pool; ftl::data::Session session; - int64_t last_timestamp; + int64_t last_timestamp=0; }; std::unordered_map<uint32_t, PoolData> pool_; diff --git a/components/structures/include/ftl/data/new_frame.hpp b/components/structures/include/ftl/data/new_frame.hpp index b41b109584ba247e282af9481a314038c6ba8602..a5e34dbe566015e5074eb3bc1f13c92fbe76e0ef 100644 --- a/components/structures/include/ftl/data/new_frame.hpp +++ b/components/structures/include/ftl/data/new_frame.hpp @@ -238,6 +238,11 @@ class Frame { inline MUTEX &mutex(); + /** + * Generate a new frame to respond to this one. + */ + Frame response(); + protected: std::any &createAnyChange(ftl::codecs::Channel c, ftl::data::ChangeType t); diff --git a/components/structures/src/new_frame.cpp b/components/structures/src/new_frame.cpp index 8d191801828a29c3ebbb9f2d1100ff9e7e3c1e48..c980a6b3ed990365115fe88dca32064a3f2c14ce 100644 --- a/components/structures/src/new_frame.cpp +++ b/components/structures/src/new_frame.cpp @@ -1,5 +1,6 @@ #include <ftl/data/new_frame.hpp> #include <ftl/data/framepool.hpp> +#include <ftl/timer.hpp> using ftl::data::Frame; using ftl::data::Session; @@ -242,6 +243,13 @@ void Frame::hardReset() { data_.clear(); } +Frame Frame::response() { + if (!pool_) throw FTL_Error("Frame has no pool, cannot generate response"); + Frame f = pool_->allocate(id_, ftl::timer::get_time()); + f.store(); + return f; +} + // ==== Session ================================================================ ftl::Handle Session::onChange(uint32_t pid, ftl::codecs::Channel c, const std::function<bool(Frame&,ftl::codecs::Channel)> &cb) { diff --git a/components/structures/src/pool.cpp b/components/structures/src/pool.cpp index fb493939bb6bf1953dd0f87329909a8abc364f1c..21906fa90c77d4fa71ae27e2f2188f967ce2aba1 100644 --- a/components/structures/src/pool.cpp +++ b/components/structures/src/pool.cpp @@ -21,7 +21,7 @@ Frame Pool::allocate(uint32_t id, int64_t timestamp) { auto &pool = _getPool(id); if (timestamp < pool.last_timestamp) { - throw FTL_Error("New frame timestamp is older than previous: " << timestamp); + throw FTL_Error("New frame timestamp is older than previous: " << timestamp << " vs " << pool.last_timestamp); } // Add items as required diff --git a/components/structures/test/CMakeLists.txt b/components/structures/test/CMakeLists.txt index 653cfd3c455b53afbe86b3c3a13f811324b8e0f8..d3fc22d9891aa0763a86aab528bcc1c49659437f 100644 --- a/components/structures/test/CMakeLists.txt +++ b/components/structures/test/CMakeLists.txt @@ -3,7 +3,6 @@ add_executable(nframe_unit $<TARGET_OBJECTS:CatchTest> ./frame_unit.cpp ../src/new_frame.cpp - ./frame_example_1.cpp ) target_include_directories(nframe_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include") target_link_libraries(nframe_unit @@ -11,6 +10,19 @@ target_link_libraries(nframe_unit add_test(NFrameUnitTest nframe_unit) +### Frame Example 1 ############################################################ +add_executable(frame_example_1 + $<TARGET_OBJECTS:CatchTest> + ../src/pool.cpp + ../src/new_frame.cpp + ./frame_example_1.cpp +) +target_include_directories(frame_example_1 PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include") +target_link_libraries(frame_example_1 + ftlcommon ftlcodecs) + +add_test(FrameEg1Test frame_example_1) + ### Pool Unit ################################################################## add_executable(pool_unit $<TARGET_OBJECTS:CatchTest> diff --git a/components/structures/test/frame_example_1.cpp b/components/structures/test/frame_example_1.cpp index f59c5b40b7c7ee8e4a6591b43d959a02886b5cb2..811feb60f1b470ce7b108918c43d0b01b6d9844c 100644 --- a/components/structures/test/frame_example_1.cpp +++ b/components/structures/test/frame_example_1.cpp @@ -1,6 +1,8 @@ #include "catch.hpp" #include <ftl/data/new_frame.hpp> +#include <ftl/data/framepool.hpp> +#include <ftl/timer.hpp> using ftl::data::Session; using ftl::data::Frame; @@ -14,8 +16,8 @@ namespace streams { /* Mock Feed class */ class Feed { public: - Feed() : buffer0_(nullptr, &session_,0,0), buffer1_(nullptr, &session_,0,0) { - flush_handle_ = session_.onFlush([this](Frame &f, Channel c) { + Feed() : pool_(5,10), buffer0_(std::move(pool_.allocate(0,0))), buffer1_(std::move(pool_.allocate(0,0))) { + flush_handle_ = pool_.session(0).onFlush([this](Frame &f, Channel c) { // Loop changes back to buffer. // Normally transmitted somewhere first. //buffer_.swapChannel(c, f); @@ -26,11 +28,10 @@ class Feed { } inline Frame &buffer() { return buffer0_; } - inline Session &session() { return session_; } inline void fakeDispatch() { buffer0_.moveTo(buffer1_); - buffer0_ = Frame(nullptr, &session_,0,0); + buffer0_ = pool_.allocate(0,ftl::timer::get_time()); // Save any persistent changes buffer1_.store(); @@ -46,16 +47,9 @@ class Feed { return frame_handler_.on(cb); } - Frame createFrame(int id) { - // TODO: Give it the id and a timestamp - Frame f(nullptr, &session_,0,0); - f.store(); - return f; - } - private: + ftl::data::Pool pool_; ftl::Handler<Frame&> frame_handler_; - Session session_; Frame buffer0_; Frame buffer1_; ftl::Handle flush_handle_; @@ -93,7 +87,7 @@ TEST_CASE("ftl::data::Frame full non-owner example", "[example]") { REQUIRE( f.get<VideoFrame>(Channel::Depth).gpudata == 1 ); // Create a new frame for same source for some return state - Frame nf = feed.createFrame(f.id()); + Frame nf = f.response(); nf.create<std::list<std::string>>(Channel::Messages) = {"First Message"}; nf.create<std::list<std::string>>(Channel::Messages) = {"Second Message"}; nf.create<int>(Channel::Control) = 3456; diff --git a/components/structures/test/frame_unit.cpp b/components/structures/test/frame_unit.cpp index 8ab5ef34ec23f25f291357650febad48b899bd19..13f87524f6c4396e969a243a0152c888dcd40469 100644 --- a/components/structures/test/frame_unit.cpp +++ b/components/structures/test/frame_unit.cpp @@ -21,6 +21,8 @@ class Pool { static Frame make(Session *s, int id, uint64_t ts) { return Frame(nullptr, s, id, ts); } void release(Frame &f); + + Frame allocate(uint32_t id, int64_t ts); }; } @@ -42,6 +44,10 @@ void ftl::data::Pool::release(Frame &f) { } +Frame ftl::data::Pool::allocate(uint32_t id, int64_t ts) { + return make(nullptr, id, ts); +} + /* #1.1.1 */ static_assert(sizeof(ftl::codecs::Channel) >= 4, "Channel must be at least 32bit");