diff --git a/components/streams/include/ftl/streams/builder.hpp b/components/streams/include/ftl/streams/builder.hpp index 1b0da1b57eeed78d229e545303144d3ff1b2ab16..77c3ee848569305e3b57167fc9af73ce800f7b9a 100644 --- a/components/streams/include/ftl/streams/builder.hpp +++ b/components/streams/include/ftl/streams/builder.hpp @@ -45,11 +45,14 @@ class BaseBuilder : public ftl::data::Generator { inline const int id() const { return id_; } + inline const ftl::data::ChangeType changeType() const { return ctype_; } + protected: ftl::data::Pool *pool_; int id_; size_t size_; ftl::Handler<const ftl::data::FrameSetPtr&> cb_; + ftl::data::ChangeType ctype_ = ftl::data::ChangeType::COMPLETED; }; /** diff --git a/components/streams/src/builder.cpp b/components/streams/src/builder.cpp index 4575ce3c9a073ae25a1be13604db5af1dace6d7b..72d432ba61ca7fdac9cc8e766dd12d446e0a9fb6 100644 --- a/components/streams/src/builder.cpp +++ b/components/streams/src/builder.cpp @@ -38,11 +38,13 @@ BaseBuilder::~BaseBuilder() { // ============================================================================= LocalBuilder::LocalBuilder(ftl::data::Pool *pool, int id) : BaseBuilder(pool, id) { - + // Host receives responses that must propagate + ctype_ = ftl::data::ChangeType::FOREIGN; } LocalBuilder::LocalBuilder() : BaseBuilder() { - + // Host receives responses that must propagate + ctype_ = ftl::data::ChangeType::FOREIGN; } LocalBuilder::~LocalBuilder() { diff --git a/components/streams/src/receiver.cpp b/components/streams/src/receiver.cpp index 1305a7dce43cc8b78c3b776e53d7107751f06060..3a919859665040db3660313041104373037ca63b 100644 --- a/components/streams/src/receiver.cpp +++ b/components/streams/src/receiver.cpp @@ -153,9 +153,10 @@ Receiver::InternalAudioStates &Receiver::_getAudioFrame(const StreamPacket &spkt } void Receiver::_processData(const StreamPacket &spkt, const Packet &pkt) { - auto fs = builder(spkt.streamID).get(spkt.timestamp, spkt.frame_number); + auto &build = builder(spkt.streamID); + auto fs = build.get(spkt.timestamp, spkt.frame_number); auto &f = (spkt.frame_number == 255) ? **fs : fs->frames[spkt.frame_number]; - f.informChange(spkt.channel, ftl::data::ChangeType::FOREIGN, pkt); + f.informChange(spkt.channel, build.changeType(), pkt); const auto &sel = stream_->selected(spkt.frameSetID()); // & cs->available(spkt.frameSetID()); @@ -177,10 +178,11 @@ void Receiver::_processAudio(const StreamPacket &spkt, const Packet &pkt) { //frame.frame.reset(); state.timestamp = spkt.timestamp; - auto fs = builder(spkt.streamID).get(spkt.timestamp, spkt.frame_number+pkt.frame_count-1); + auto &build = builder(spkt.streamID); + auto fs = build.get(spkt.timestamp, spkt.frame_number+pkt.frame_count-1); auto &frame = fs->frames[0]; - auto &audiolist = frame.createChange<std::list<ftl::audio::Audio>>(spkt.channel, ftl::data::ChangeType::FOREIGN, pkt); + auto &audiolist = frame.createChange<std::list<ftl::audio::Audio>>(spkt.channel, build.changeType(), pkt); auto &audio = audiolist.emplace_back(); //size_t size = pkt.data.size()/sizeof(short); @@ -243,7 +245,8 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { auto [tx,ty] = ftl::codecs::chooseTileConfig(pkt.frame_count); // Get the frameset - auto fs = builder(spkt.streamID).get(spkt.timestamp, spkt.frame_number+pkt.frame_count-1); // TODO: This is a hack + auto &build = builder(spkt.streamID); + auto fs = build.get(spkt.timestamp, spkt.frame_number+pkt.frame_count-1); // TODO: This is a hack if (!fs->frames[0].has(Channel::Calibration)) { LOG(WARNING) << "No calibration, skipping frame"; @@ -324,7 +327,7 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { // Add channel to frame and allocate memory if required const cv::Size size = cv::Size(width, height); - auto &buf = frame.createChange<ftl::rgbd::VideoFrame>(spkt.channel, ftl::data::ChangeType::FOREIGN, pkt).createGPU(); + auto &buf = frame.createChange<ftl::rgbd::VideoFrame>(spkt.channel, build.changeType(), pkt).createGPU(); buf.create(size, ftl::codecs::type(spkt.channel)); //(isFloatChannel(rchan) ? CV_32FC1 : CV_8UC4)); cv::Rect roi((i % tx)*width, (i / tx)*height, width, height); diff --git a/components/structures/src/new_frame.cpp b/components/structures/src/new_frame.cpp index b9ccbc6ce6e6324cc09582e525e14eff193cf4e0..86d1ee3ba0eeb4ff70d4b3ee584248ddb04ed539 100644 --- a/components/structures/src/new_frame.cpp +++ b/components/structures/src/new_frame.cpp @@ -426,13 +426,14 @@ void Session::flush(Frame &f) { if (d.status == ftl::data::ChannelStatus::VALID) { d.status = ftl::data::ChannelStatus::FLUSHED; flush_.trigger(f, c.first); - f.pool()->flush_.trigger(f, c.first); + if (f.pool()) f.pool()->flush_.trigger(f, c.first); } } else if (c.second == ftl::data::ChangeType::FOREIGN) { auto &d = f._getData(c.first); if (d.status == ftl::data::ChannelStatus::DISPATCHED) { d.status = ftl::data::ChannelStatus::FLUSHED; - //flush_.trigger(f, c.first); + flush_.trigger(f, c.first); + if (f.pool()) f.pool()->flush_.trigger(f, c.first); } } } @@ -445,13 +446,14 @@ void Session::flush(Frame &f, ftl::codecs::Channel c) { if (d.status == ftl::data::ChannelStatus::VALID) { d.status = ftl::data::ChannelStatus::FLUSHED; flush_.trigger(f, c); - f.pool()->flush_.trigger(f, c); + if (f.pool()) f.pool()->flush_.trigger(f, c); } } else if (cc == ftl::data::ChangeType::FOREIGN) { auto &d = f._getData(c); if (d.status == ftl::data::ChannelStatus::DISPATCHED) { d.status = ftl::data::ChannelStatus::FLUSHED; - //flush_.trigger(f, c); + flush_.trigger(f, c); + if (f.pool()) f.pool()->flush_.trigger(f, c); } } } diff --git a/components/structures/test/CMakeLists.txt b/components/structures/test/CMakeLists.txt index d3fc22d9891aa0763a86aab528bcc1c49659437f..4a4154d403520eba304125bb12dc28f016aff613 100644 --- a/components/structures/test/CMakeLists.txt +++ b/components/structures/test/CMakeLists.txt @@ -2,7 +2,6 @@ add_executable(nframe_unit $<TARGET_OBJECTS:CatchTest> ./frame_unit.cpp - ../src/new_frame.cpp ) target_include_directories(nframe_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include") target_link_libraries(nframe_unit diff --git a/components/structures/test/frame_unit.cpp b/components/structures/test/frame_unit.cpp index 4c1f183d101c0fdc873b335542d485709f4f692b..a092b391346b1c41e166d2997f5a9ec933bd79c1 100644 --- a/components/structures/test/frame_unit.cpp +++ b/components/structures/test/frame_unit.cpp @@ -24,6 +24,9 @@ class Pool { void release(Frame &f); Frame allocate(FrameID id, int64_t ts); + + ftl::Handler<ftl::data::Frame&,ftl::codecs::Channel> flush_; + ftl::Handler<ftl::data::FrameSet&,ftl::codecs::Channel> flush_fs_; }; } @@ -49,6 +52,11 @@ Frame ftl::data::Pool::allocate(FrameID id, int64_t ts) { return make(nullptr, id, ts); } +#define _FTL_DATA_FRAMEPOOL_HPP_ +#include <../src/new_frame.cpp> + + + /* #1.1.1 */ static_assert(sizeof(ftl::codecs::Channel) >= 4, "Channel must be at least 32bit");