diff --git a/components/common/cpp/include/ftl/transactional.hpp b/components/common/cpp/include/ftl/transactional.hpp index 7c1553ac5a98656a10160f7b84976074828605df..298fa5d2a9187c01e3bf244c72e8593316cb3ecb 100644 --- a/components/common/cpp/include/ftl/transactional.hpp +++ b/components/common/cpp/include/ftl/transactional.hpp @@ -24,12 +24,18 @@ class Transactional { if (completed_) completed_(ref_); } + Transactional(Transactional &&t) : ref_(t.ref_), mtx_(t.mtx_), lock_(mtx_), completed_(t.completed_) { + t.completed_ = nullptr; + } + Transactional &operator=(const Transactional &)=delete; T operator->() { return ref_; } - const T operator->() const { return ref_; } + T operator*() { return ref_; } + const T operator*() const { return ref_; } + private: T ref_; SHARED_MUTEX &mtx_; diff --git a/components/streams/include/ftl/streams/builder.hpp b/components/streams/include/ftl/streams/builder.hpp index f61375f83db377e10a7ae60b303ac46591aecead..3e50619db926d4f87b18952b3854f626b354f08e 100644 --- a/components/streams/include/ftl/streams/builder.hpp +++ b/components/streams/include/ftl/streams/builder.hpp @@ -4,65 +4,95 @@ #include <ftl/data/new_frameset.hpp> #include <ftl/data/framepool.hpp> #include <ftl/handle.hpp> +#include <ftl/transactional.hpp> #include <list> namespace ftl { namespace streams { +using LockedFrameSet = ftl::Transactional<ftl::data::FrameSet*>; + /** - * Accept frames and generate framesets as they become completed. This can - * directly act as a generator of framesets, each frameset being generated - * by the global timer. Once the expected number of frames have been received, - * a frameset is marked as complete and can then be passed to the callback at - * the appropriate time. If frames are generated faster than consumed then they - * are buffered and merged into a single frameset. The buffer has a limited size - * so a longer delay in a callback will cause buffer failures. If frames are - * generated below framerate then the on frameset callback is just not called. + * An abstract base class for a FrameSet database. A builder stores incomplete + * framesets whilst they are being created, allowing partial data to be buffered + * and searched for using timestamp and frameset id. One instance of a builder + * should be created for each frameset id. */ -class Builder { +class BaseBuilder { public: - Builder(ftl::data::Pool *pool, int id); - Builder(); - ~Builder(); + BaseBuilder(ftl::data::Pool *pool, int id); + BaseBuilder(); + virtual ~BaseBuilder(); - //inline void setID(int id) { id_ = id; } + virtual LockedFrameSet get(int64_t timestamp, size_t ix)=0; - inline ftl::Handle onFrameSet(const ftl::data::FrameSetCallback &cb) { return cb_.on(cb); } + virtual LockedFrameSet get(int64_t timestamp)=0; - /** - * Instead of pushing a frame, find or create a direct reference to one. - */ - std::shared_ptr<ftl::data::FrameSet> get(int64_t timestamp, size_t ix); + //void setName(const std::string &name); - /** - * Get the entire frameset for a given timestamp. - */ - std::shared_ptr<ftl::data::FrameSet> get(int64_t timestamp); + void setID(uint32_t id) { id_ = id; } + void setPool(ftl::data::Pool *p) { pool_ = p; } /** - * Mark a frame as completed. + * Retrieve an fps + latency pair, averaged since last call to this + * function. */ - void completed(int64_t ts, size_t ix); + //static std::pair<float,float> getStatistics(); - void markPartial(int64_t ts); + inline size_t size() const { return size_; } - void setName(const std::string &name); + protected: + ftl::data::Pool *pool_; + int id_; + size_t size_; +}; - void setBufferSize(size_t n) { bufferSize_ = n; } +/** + * A version of the frameset database that is used by sources or renderers to + * obtain new frames. Timestamps are not used in this version as only a single + * frameset is being buffered. + */ +class LocalBuilder : public BaseBuilder { + public: + LocalBuilder(ftl::data::Pool *pool, int id); + LocalBuilder(); + ~LocalBuilder(); - void setID(uint32_t id) { id_ = id; } - void setPool(ftl::data::Pool *p) { pool_ = p; } + LockedFrameSet get(int64_t timestamp, size_t ix) override; + + LockedFrameSet get(int64_t timestamp) override; + + void setFrameCount(size_t size); /** - * Retrieve an fps + latency pair, averaged since last call to this - * function. + * Return a smart pointer to a new frameset. The frameset will have the + * number of frames set with `setFrameCount`, or 1 frame by default. Once + * called, another new frameset is buffered internally and ownership of the + * returned frameset is transfered. */ - static std::pair<float,float> getStatistics(); + std::shared_ptr<ftl::data::FrameSet> getNextFrameSet(int64_t ts); - inline size_t size() const { return size_; } + private: + std::shared_ptr<ftl::data::FrameSet> frameset_; +}; + +class ForeignBuilder : public BaseBuilder { + public: + ForeignBuilder(ftl::data::Pool *pool, int id); + ForeignBuilder(); + ~ForeignBuilder(); + + //inline void setID(int id) { id_ = id; } + + inline ftl::Handle onFrameSet(const ftl::data::FrameSetCallback &cb) { return cb_.on(cb); } + + LockedFrameSet get(int64_t timestamp, size_t ix) override; + + LockedFrameSet get(int64_t timestamp) override; + + void setBufferSize(size_t n) { bufferSize_ = n; } private: - ftl::data::Pool *pool_; std::list<std::shared_ptr<ftl::data::FrameSet>> framesets_; // Active framesets //std::list<ftl::data::FrameSet*> allocated_; // Keep memory allocations @@ -72,11 +102,9 @@ class Builder { int mspf_; int64_t last_ts_; int64_t last_frame_; - int id_; std::atomic<int> jobs_; volatile bool skip_; ftl::Handle main_id_; - size_t size_; size_t bufferSize_; std::string name_; @@ -101,7 +129,7 @@ class Builder { void _schedule(); - void _recordStats(float fps, float latency); + //void _recordStats(float fps, float latency); }; } diff --git a/components/streams/include/ftl/streams/receiver.hpp b/components/streams/include/ftl/streams/receiver.hpp index 4d109a1f085e24504d3e92a9b094938e935e676b..75bb7e1bb98e18890a90012091dcfe307feead56 100644 --- a/components/streams/include/ftl/streams/receiver.hpp +++ b/components/streams/include/ftl/streams/receiver.hpp @@ -43,13 +43,13 @@ class Receiver : public ftl::Configurable, public ftl::data::Generator { //void onAudio(const ftl::audio::FrameSet::Callback &cb); - ftl::streams::Builder &builder(uint32_t id); + ftl::streams::BaseBuilder &builder(uint32_t id); private: ftl::stream::Stream *stream_; ftl::data::Pool *pool_; ftl::SingletonHandler<const ftl::data::FrameSetPtr&> callback_; - std::unordered_map<uint32_t, ftl::streams::Builder> builders_; + std::unordered_map<uint32_t, std::shared_ptr<ftl::streams::BaseBuilder>> builders_; std::list<ftl::Handle> handles_; ftl::codecs::Channel second_channel_; int64_t timestamp_; diff --git a/components/streams/include/ftl/streams/renderer.hpp b/components/streams/include/ftl/streams/renderer.hpp index 5323a16bdc9573924ee2567602787f653364caf7..ffb13fb286c3fede131e88c8220677cc9eac740c 100644 --- a/components/streams/include/ftl/streams/renderer.hpp +++ b/components/streams/include/ftl/streams/renderer.hpp @@ -22,7 +22,7 @@ public: void submit(const ftl::data::FrameSetPtr &fs); private: - ftl::streams::Builder builder_; + ftl::streams::LocalBuilder *builder_; ftl::Handle builder_handle_; ftl::SingletonHandler<const ftl::data::FrameSetPtr&> callback_; ftl::data::FrameSetPtr input_; diff --git a/components/streams/src/builder.cpp b/components/streams/src/builder.cpp index 356378513b8b62020cd1917d0e97f7d276e1fef0..97c209b893a79be77ae43a00b41101ced54b16a0 100644 --- a/components/streams/src/builder.cpp +++ b/components/streams/src/builder.cpp @@ -6,41 +6,57 @@ #include <chrono> -using ftl::streams::Builder; +using ftl::streams::BaseBuilder; +using ftl::streams::ForeignBuilder; +using ftl::streams::LocalBuilder; +using ftl::streams::LockedFrameSet; using ftl::data::FrameSet; using ftl::data::Frame; using namespace std::chrono; using std::this_thread::sleep_for; -float Builder::latency__ = 0.0f; +/*float Builder::latency__ = 0.0f; float Builder::fps__ = 0.0f; int Builder::stats_count__ = 0; -MUTEX Builder::msg_mutex__; +MUTEX Builder::msg_mutex__;*/ -Builder::Builder(ftl::data::Pool *pool, int id) : pool_(pool), head_(0), id_(id) { +BaseBuilder::BaseBuilder(ftl::data::Pool *pool, int id) : pool_(pool), id_(id) { + size_ = 1; +} + +BaseBuilder::BaseBuilder() : pool_(nullptr), id_(0) { + size_ = 1; +} + +BaseBuilder::~BaseBuilder() { + +} + + + + +ForeignBuilder::ForeignBuilder(ftl::data::Pool *pool, int id) : BaseBuilder(pool, id), head_(0) { jobs_ = 0; skip_ = false; bufferSize_ = 1; - size_ = 0; last_frame_ = 0; mspf_ = ftl::timer::getInterval(); name_ = "NoName"; } -Builder::Builder() : pool_(nullptr), head_(0) { +ForeignBuilder::ForeignBuilder() : BaseBuilder(), head_(0) { jobs_ = 0; skip_ = false; bufferSize_ = 1; - size_ = 0; last_frame_ = 0; mspf_ = ftl::timer::getInterval(); name_ = "NoName"; } -Builder::~Builder() { +ForeignBuilder::~ForeignBuilder() { main_id_.cancel(); UNIQUE_LOCK(mutex_, lk); @@ -50,15 +66,25 @@ Builder::~Builder() { } } -std::shared_ptr<ftl::data::FrameSet> Builder::get(int64_t timestamp) { +LockedFrameSet ForeignBuilder::get(int64_t timestamp) { if (timestamp <= 0) throw FTL_Error("Invalid frame timestamp"); UNIQUE_LOCK(mutex_, lk); - return _get(timestamp); + auto fs = _get(timestamp); + LockedFrameSet lfs(fs.get(), fs->smtx, [this](ftl::data::FrameSet *fs) { + // TODO: schedule completed + if (fs->isComplete()) { + if (bufferSize_ == 0 && !fs->test(ftl::data::FSFlag::STALE)) { + UNIQUE_LOCK(mutex_, lk); + _schedule(); + } + } + }); + return lfs; } -std::shared_ptr<ftl::data::FrameSet> Builder::_get(int64_t timestamp) { +std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_get(int64_t timestamp) { if (timestamp <= last_frame_) { throw FTL_Error("Frameset already completed: " << timestamp); } @@ -79,12 +105,14 @@ std::shared_ptr<ftl::data::FrameSet> Builder::_get(int64_t timestamp) { return fs; } -std::shared_ptr<ftl::data::FrameSet> Builder::get(int64_t timestamp, size_t ix) { +LockedFrameSet ForeignBuilder::get(int64_t timestamp, size_t ix) { if (ix == 255) { if (timestamp <= 0) throw FTL_Error("Invalid frame timestamp (" << timestamp << ")"); auto fs = _get(timestamp); if (!fs) throw FTL_Error("No frameset for time " << timestamp); - return fs; + + LockedFrameSet lfs(fs.get(), fs->smtx); + return lfs; } else { if (timestamp <= 0 || ix >= 32) throw FTL_Error("Invalid frame timestamp or index (" << timestamp << ", " << ix << ")"); @@ -105,11 +133,21 @@ std::shared_ptr<ftl::data::FrameSet> Builder::get(int64_t timestamp, size_t ix) } } - return fs; + LockedFrameSet lfs(fs.get(), fs->smtx, [this](ftl::data::FrameSet *fs) { + // TODO: schedule completed + if (fs->isComplete()) { + if (bufferSize_ == 0 && !fs->test(ftl::data::FSFlag::STALE)) { + UNIQUE_LOCK(mutex_, lk); + _schedule(); + } + } + }); + + return lfs; } } -void Builder::completed(int64_t ts, size_t ix) { +/*void Builder::completed(int64_t ts, size_t ix) { std::shared_ptr<ftl::data::FrameSet> fs; { @@ -145,9 +183,9 @@ void Builder::completed(int64_t ts, size_t ix) { } else { LOG(ERROR) << "Completing frame that does not exist: " << ts << ":" << ix; } -} +}*/ -void Builder::markPartial(int64_t ts) { +/*void Builder::markPartial(int64_t ts) { std::shared_ptr<ftl::data::FrameSet> fs; { @@ -155,9 +193,9 @@ void Builder::markPartial(int64_t ts) { fs = _findFrameset(ts); if (fs) fs->set(ftl::data::FSFlag::PARTIAL); } -} +}*/ -void Builder::_schedule() { +void ForeignBuilder::_schedule() { if (size_ == 0) return; std::shared_ptr<ftl::data::FrameSet> fs; @@ -169,6 +207,9 @@ void Builder::_schedule() { // We have a frameset so create a thread job to call the onFrameset callback if (fs) { + // Lock to force completion of on going construction first + UNIQUE_LOCK(fs->smtx, slk); + jobs_++; ftl::pool.push([this,fs](int) { @@ -208,23 +249,14 @@ static void mergeFrameset(ftl::data::FrameSet &f1, ftl::data::FrameSet &f2) { }*/ } -void Builder::_recordStats(float fps, float latency) { +/*void Builder::_recordStats(float fps, float latency) { UNIQUE_LOCK(msg_mutex__, lk); latency__ += latency; fps__ += fps; ++stats_count__; +}*/ - /*if (fps_/float(stats_count_) <= float(stats_count_)) { - fps_ /= float(stats_count_); - latency_ /= float(stats_count_); - LOG(INFO) << name_ << ": fps = " << fps_ << ", latency = " << latency_; - fps_ = 0.0f; - latency_ = 0.0f; - stats_count_ = 0; - }*/ -} - -std::pair<float,float> Builder::getStatistics() { +/*std::pair<float,float> Builder::getStatistics() { UNIQUE_LOCK(msg_mutex__, lk); if (stats_count__ == 0.0f) return {0.0f,0.0f}; fps__ /= float(stats_count__); @@ -236,9 +268,9 @@ std::pair<float,float> Builder::getStatistics() { latency__ = 0.0f; stats_count__ = 0; return {fps,latency}; -} +}*/ -std::shared_ptr<ftl::data::FrameSet> Builder::_findFrameset(int64_t ts) { +std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_findFrameset(int64_t ts) { // Search backwards to find match for (auto f : framesets_) { if (f->timestamp() == ts) { @@ -255,7 +287,7 @@ std::shared_ptr<ftl::data::FrameSet> Builder::_findFrameset(int64_t ts) { * Get the most recent completed frameset that isn't stale. * Note: Must occur inside a mutex lock. */ -std::shared_ptr<ftl::data::FrameSet> Builder::_getFrameset() { +std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_getFrameset() { //LOG(INFO) << "BUF SIZE = " << framesets_.size(); auto i = framesets_.begin(); @@ -297,7 +329,7 @@ std::shared_ptr<ftl::data::FrameSet> Builder::_getFrameset() { int64_t now = ftl::timer::get_time(); float framerate = 1000.0f / float(now - last_ts_); - _recordStats(framerate, now - f->timestamp()); + //_recordStats(framerate, now - f->timestamp()); last_ts_ = now; return f; //} @@ -306,7 +338,7 @@ std::shared_ptr<ftl::data::FrameSet> Builder::_getFrameset() { return nullptr; } -std::shared_ptr<ftl::data::FrameSet> Builder::_addFrameset(int64_t timestamp) { +std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_addFrameset(int64_t timestamp) { if (framesets_.size() >= 32) { LOG(WARNING) << "Frameset buffer full, resetting: " << timestamp; framesets_.clear(); @@ -338,6 +370,6 @@ std::shared_ptr<ftl::data::FrameSet> Builder::_addFrameset(int64_t timestamp) { return newf; } -void Builder::setName(const std::string &name) { +/*void Builder::setName(const std::string &name) { name_ = name; -} +}*/ diff --git a/components/streams/src/receiver.cpp b/components/streams/src/receiver.cpp index 792b3d7a961a552bf9e797caaed10f60e7c4223e..782c42e82034455ee400ad26d748d63103ed47d9 100644 --- a/components/streams/src/receiver.cpp +++ b/components/streams/src/receiver.cpp @@ -39,7 +39,7 @@ Receiver::Receiver(nlohmann::json &config, ftl::data::Pool *p) : ftl::Configurab on("frameset_buffer_size", [this](const ftl::config::Event &e) { size_t bsize = value("frameset_buffer_size", 3); for (auto &i : builders_) { - i.second.setBufferSize(bsize); + //i.second.setBufferSize(bsize); } }); @@ -56,20 +56,22 @@ Receiver::~Receiver() { //builder_[0].onFrameSet(nullptr); } -ftl::streams::Builder &Receiver::builder(uint32_t id) { +ftl::streams::BaseBuilder &Receiver::builder(uint32_t id) { auto i = builders_.find(id); if (i == builders_.end()) { + auto fb = new ftl::streams::ForeignBuilder(); + builders_[id] = std::shared_ptr<ftl::streams::BaseBuilder>(fb); auto &b = builders_[id]; - b.setID(id); - b.setPool(pool_); - b.setBufferSize(value("frameset_buffer_size", 3)); - handles_.push_back(std::move(b.onFrameSet([this](const ftl::data::FrameSetPtr& fs) { + b->setID(id); + b->setPool(pool_); + fb->setBufferSize(value("frameset_buffer_size", 3)); + handles_.push_back(std::move(fb->onFrameSet([this](const ftl::data::FrameSetPtr& fs) { callback_.trigger(fs); return true; }))); - return b; + return *b; } else { - return i->second; + return *(i->second); } } @@ -167,14 +169,14 @@ void Receiver::_processState(const StreamPacket &spkt, const Packet &pkt) { void Receiver::_processData(const StreamPacket &spkt, const Packet &pkt) { auto fs = builder(spkt.streamID).get(spkt.timestamp, spkt.frame_number); - auto &f = (spkt.frame_number == 255) ? *fs : fs->frames[spkt.frame_number]; + auto &f = (spkt.frame_number == 255) ? **fs : fs->frames[spkt.frame_number]; f.informChange(spkt.channel, ftl::data::ChangeType::FOREIGN, pkt); const auto &sel = stream_->selected(spkt.frameSetID()); // & cs->available(spkt.frameSetID()); if (f.hasAll(sel)) { timestamp_ = spkt.timestamp; - builder(spkt.streamID).completed(spkt.timestamp, spkt.frame_number); + fs->completed(spkt.frame_number); } } @@ -311,7 +313,7 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { // Mark a frameset as being partial if (pkt.flags & ftl::codecs::kFlagPartial) { - builder(spkt.streamID).markPartial(spkt.timestamp); + fs->markPartial(); } // Now split the tiles from surface into frames, doing colour conversions @@ -375,7 +377,7 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { // TODO: Have multiple builders for different framesets. //builder_.push(frame.timestamp, spkt.frameNumber()+i, frame.frame); - builder(spkt.streamID).completed(spkt.timestamp, spkt.frame_number+i); + fs->completed(spkt.frame_number+i); // Check for any state changes and send them back //if (vidstate.state.hasChanged(Channel::Pose)) injectPose(stream_, frame, spkt.timestamp, spkt.frameNumber()+i); diff --git a/components/streams/src/renderer.cpp b/components/streams/src/renderer.cpp index e181439071978d96e0c1e2fc78cd07db44d429db..59f13b95befe4448ce83e3c35c43c8abfb38cc37 100644 --- a/components/streams/src/renderer.cpp +++ b/components/streams/src/renderer.cpp @@ -14,11 +14,12 @@ Renderer::Renderer(nlohmann::json &config, ftl::data::Pool *p, uint32_t fsid) : ftl::create<ftl::render::CUDARender>(this, std::string("vcam")+std::to_string(vcamcount++)) ); - builder_.setID(fsid); - builder_.setPool(p); - builder_.setBufferSize(3); // TODO + builder_ = new ftl::streams::LocalBuilder; - builder_handle_ = builder_.onFrameSet([this](const ftl::data::FrameSetPtr &fs){ + builder_->setID(fsid); + builder_->setPool(p); + + /*builder_handle_ = builder_.onFrameSet([this](const ftl::data::FrameSetPtr &fs){ auto &frame_out = builder_.get(fs->timestamp(), 0)->cast<ftl::rgbd::Frame>(); auto input = input_; @@ -31,12 +32,13 @@ Renderer::Renderer(nlohmann::json &config, ftl::data::Pool *p, uint32_t fsid) : builder_.get(fs->timestamp(), 0); return true; - }); + });*/ } void Renderer::submit(const ftl::data::FrameSetPtr &fs) { input_ = fs; - auto &frame_out = builder_.get(fs->timestamp(), 0)->cast<ftl::rgbd::Frame>(); + // FIXME: Need to keep the frameset shared_ptr around or it will be deleted. + auto &frame_out = builder_->getNextFrameSet(fs->timestamp())->frames[0].cast<ftl::rgbd::Frame>(); auto input = input_; renderer_->begin(frame_out, ftl::codecs::Channel::Left); renderer_->submit( @@ -46,9 +48,10 @@ void Renderer::submit(const ftl::data::FrameSetPtr &fs) { } Renderer::~Renderer() { + delete builder_; } ftl::Handle Renderer::onFrameSet(const std::function<bool(const ftl::data::FrameSetPtr&)> &cb) { //return callback_.on(cb); - return builder_.onFrameSet(cb); + //return builder_.onFrameSet(cb); } diff --git a/components/streams/test/builder_unit.cpp b/components/streams/test/builder_unit.cpp index 345f1885b3610586834572179787eceda5091cfc..1c044d3c51707ac717e3d599b2ab4a3d0e9f7c38 100644 --- a/components/streams/test/builder_unit.cpp +++ b/components/streams/test/builder_unit.cpp @@ -5,12 +5,12 @@ using ftl::data::Pool; using ftl::data::Frame; using ftl::data::FrameSet; -using ftl::streams::Builder; +using ftl::streams::ForeignBuilder; -TEST_CASE("ftl::streams::Builder can obtain a frameset", "[]") { +TEST_CASE("ftl::streams::ForeignBuilder can obtain a frameset", "[]") { SECTION("with one frame allocated") { Pool pool(2,5); - Builder builder(&pool, 44); + ForeignBuilder builder(&pool, 44); builder.get(100, 0); auto fs = builder.get(100); @@ -26,7 +26,7 @@ TEST_CASE("ftl::streams::Builder can obtain a frameset", "[]") { SECTION("with five frames allocated") { Pool pool(2,5); - Builder builder(&pool, 44); + ForeignBuilder builder(&pool, 44); builder.get(100, 4); builder.get(100, 0); @@ -41,16 +41,16 @@ TEST_CASE("ftl::streams::Builder can obtain a frameset", "[]") { } } -TEST_CASE("ftl::streams::Builder can complete a frame", "[]") { +TEST_CASE("ftl::streams::ForeignBuilder can complete a frame", "[]") { SECTION("with two frames allocated") { Pool pool(2,5); - Builder builder(&pool, 44); + ForeignBuilder builder(&pool, 44); builder.get(100, 1); builder.get(100, 0); auto fs = builder.get(100); - builder.completed(100, 0); + fs->completed(0); REQUIRE( fs->frameset() == 44 ); REQUIRE( fs->timestamp() == 100 ); @@ -61,10 +61,10 @@ TEST_CASE("ftl::streams::Builder can complete a frame", "[]") { } } -TEST_CASE("ftl::streams::Builder can complete a frameset", "[]") { +TEST_CASE("ftl::streams::ForeignBuilder can complete a frameset", "[]") { SECTION("with one frame allocated and no buffering") { Pool pool(2,5); - Builder builder(&pool, 44); + ForeignBuilder builder(&pool, 44); builder.setBufferSize(0); @@ -78,7 +78,7 @@ TEST_CASE("ftl::streams::Builder can complete a frameset", "[]") { return false; }); - builder.completed(100, 0); + fs->completed(0); // TODO: Find better way to wait... std::this_thread::sleep_for(std::chrono::milliseconds(10)); @@ -87,7 +87,7 @@ TEST_CASE("ftl::streams::Builder can complete a frameset", "[]") { SECTION("with two frames allocated and no buffering") { Pool pool(2,5); - Builder builder(&pool, 44); + ForeignBuilder builder(&pool, 44); builder.setBufferSize(0); @@ -101,8 +101,8 @@ TEST_CASE("ftl::streams::Builder can complete a frameset", "[]") { return false; }); - builder.completed(100, 0); - builder.completed(100, 1); + fs->completed(0); + fs->completed(1); // TODO: Find better way to wait... std::this_thread::sleep_for(std::chrono::milliseconds(10)); @@ -111,7 +111,7 @@ TEST_CASE("ftl::streams::Builder can complete a frameset", "[]") { SECTION("does not complete a partial") { Pool pool(2,5); - Builder builder(&pool, 44); + ForeignBuilder builder(&pool, 44); builder.setBufferSize(0); @@ -125,7 +125,7 @@ TEST_CASE("ftl::streams::Builder can complete a frameset", "[]") { return false; }); - builder.completed(100, 1); + fs->completed(1); // TODO: Find better way to wait... std::this_thread::sleep_for(std::chrono::milliseconds(10)); diff --git a/components/structures/include/ftl/data/new_frameset.hpp b/components/structures/include/ftl/data/new_frameset.hpp index 1f461e9d8d7b2b38b7729c848eaded9c7176a33d..63d85f6bf33c03f2b86c8fe2e57485a79126dde2 100644 --- a/components/structures/include/ftl/data/new_frameset.hpp +++ b/components/structures/include/ftl/data/new_frameset.hpp @@ -42,7 +42,7 @@ class FrameSet : public ftl::data::Frame { std::atomic<int> count; // Number of valid frames std::atomic<unsigned int> mask; // Mask of all sources that contributed //bool stale; // True if buffers have been invalidated - //SHARED_MUTEX mtx; + SHARED_MUTEX smtx; Eigen::Matrix4d pose; // Set to identity by default. @@ -62,6 +62,20 @@ class FrameSet : public ftl::data::Frame { */ void moveTo(ftl::data::FrameSet &); + /** + * Mark a frame as being completed. This modifies the mask and count + * members. + */ + void completed(size_t ix); + + inline void markPartial() { + set(ftl::data::FSFlag::PARTIAL); + } + + /** + * Are all frames complete within this frameset? + */ + inline bool isComplete() { return count == frames.size(); } /** * Check that a given frame is valid in this frameset. @@ -91,6 +105,14 @@ class FrameSet : public ftl::data::Frame { void resize(size_t s); + /** + * Force a change to all frame timestamps. This is generally used internally + * to allow frameset buffering in advance of knowing an exact timestamp. + * The method will update the timestamps of all contained frames and the + * frameset itself. + */ + void changeTimestamp(int64_t ts); + /** * Make a frameset from a single frame. It borrows the pool, id and * timestamp from the frame and creates a wrapping frameset instance. diff --git a/components/structures/src/frameset.cpp b/components/structures/src/frameset.cpp index bf2ceeb15291fe069304eb9012c6b425496d6c72..5f7c736225866f7ae61174f13dc5d69f30e9fb91 100644 --- a/components/structures/src/frameset.cpp +++ b/components/structures/src/frameset.cpp @@ -8,6 +8,20 @@ FrameSet::FrameSet(Pool *ppool, FrameID pid, int64_t ts) : Frame(ppool->allocate } +void ftl::data::FrameSet::completed(size_t ix) { + if (ix == 255) { + + } else if (ix < frames.size()) { + // If already completed for given frame, then skip + if (mask & (1 << ix)) return; + + mask |= (1 << ix); + ++count; + } else { + LOG(ERROR) << "Completing frame that does not exist: " << timestamp() << ":" << ix; + } +} + void ftl::data::FrameSet::resize(size_t s) { while (frames.size() < s) { frames.push_back(std::move(pool()->allocate(FrameID(frameset(), frames.size()), timestamp())));