diff --git a/components/streams/src/builder.cpp b/components/streams/src/builder.cpp index c41f4d2f2a815c1c4b4b06db76798104cd333275..20106e841bb4db67cc8dbe9806d9a939af427ee1 100644 --- a/components/streams/src/builder.cpp +++ b/components/streams/src/builder.cpp @@ -195,7 +195,6 @@ ForeignBuilder::ForeignBuilder(ftl::data::Pool *pool, int id) : BaseBuilder(pool last_frame_ = 0; mspf_ = ftl::timer::getInterval(); - name_ = "NoName"; } ForeignBuilder::ForeignBuilder() : BaseBuilder(), head_(0) { @@ -205,7 +204,6 @@ ForeignBuilder::ForeignBuilder() : BaseBuilder(), head_(0) { last_frame_ = 0; mspf_ = ftl::timer::getInterval(); - name_ = "NoName"; } ForeignBuilder::~ForeignBuilder() { @@ -278,16 +276,14 @@ LockedFrameSet ForeignBuilder::get(int64_t timestamp, size_t ix) { auto fs = _get(timestamp); if (ix >= fs->frames.size()) { - //throw FTL_Error("Frame index out-of-bounds - " << ix << "(" << fs->frames.size() << ")"); - - // FIXME: This is really dangerous + // FIXME: Check that no access to frames can occur without lock + UNIQUE_LOCK(fs->smtx, flk); while (fs->frames.size() < size_) { fs->frames.push_back(std::move(pool_->allocate(ftl::data::FrameID(fs->frameset(), + fs->frames.size()), fs->timestamp()))); } } LockedFrameSet lfs(fs.get(), fs->smtx, [this,fs](ftl::data::FrameSet *d) { - // TODO: schedule completed if (fs->isComplete()) { if (bufferSize_ == 0 && !fs->test(ftl::data::FSFlag::STALE)) { UNIQUE_LOCK(mutex_, lk); @@ -300,54 +296,6 @@ LockedFrameSet ForeignBuilder::get(int64_t timestamp, size_t ix) { } } -/*void Builder::completed(int64_t ts, size_t ix) { - std::shared_ptr<ftl::data::FrameSet> fs; - - { - UNIQUE_LOCK(mutex_, lk); - fs = _findFrameset(ts); - } - - if (fs && ix == 255) { - // Note: Frameset can't complete without frames - //if (bufferSize_ == 0 && !fs->test(ftl::data::FSFlag::STALE) && static_cast<unsigned int>(fs->count) >= size_) { - // UNIQUE_LOCK(mutex_, lk); - // _schedule(); - //} - } else if (fs && ix < fs->frames.size()) { - { - UNIQUE_LOCK(fs->mutex(), lk2); - - // If already completed for given frame, then skip - if (fs->mask & (1 << ix)) return; - - //states_[ix] = fs->frames[ix].origin(); - fs->mask |= (1 << ix); - ++fs->count; - } - - //LOG(INFO) << "COMPLETE FRAME : " << fs->timestamp() << " " << fs->count << "(" << size_ << ")"; - - // No buffering, so do a schedule here for immediate effect - if (bufferSize_ == 0 && !fs->test(ftl::data::FSFlag::STALE) && static_cast<unsigned int>(fs->count) >= size_) { - UNIQUE_LOCK(mutex_, lk); - _schedule(); - } - } else { - LOG(ERROR) << "Completing frame that does not exist: " << ts << ":" << ix; - } -}*/ - -/*void Builder::markPartial(int64_t ts) { - std::shared_ptr<ftl::data::FrameSet> fs; - - { - UNIQUE_LOCK(mutex_, lk); - fs = _findFrameset(ts); - if (fs) fs->set(ftl::data::FSFlag::PARTIAL); - } -}*/ - void ForeignBuilder::_schedule() { if (size_ == 0) return; std::shared_ptr<ftl::data::FrameSet> fs; @@ -366,7 +314,6 @@ void ForeignBuilder::_schedule() { // Calling onFrameset but without all frames so mark as partial if (static_cast<size_t>(fs->count) < fs->frames.size()) fs->set(ftl::data::FSFlag::PARTIAL); - //for (auto &f : fs->frames) f.store(); fs->store(); //UNIQUE_LOCK(fs->mutex(), lk2); @@ -391,32 +338,7 @@ void ForeignBuilder::_schedule() { } -static void mergeFrameset(ftl::data::FrameSet &f1, ftl::data::FrameSet &f2) { - // Prepend all frame encodings in f2 into corresponding frame in f1. - /*for (size_t i=0; i<f1.frames.size(); ++i) { - if (f2.frames.size() <= i) break; - f1.frames[i].mergeEncoding(f2.frames[i]); - }*/ -} - -/*void Builder::_recordStats(float fps, float latency) { - UNIQUE_LOCK(msg_mutex__, lk); - latency__ += latency; - fps__ += fps; - ++stats_count__; -}*/ - std::pair<float,float> BaseBuilder::getStatistics() { - /*UNIQUE_LOCK(msg_mutex__, lk); - if (stats_count__ == 0.0f) return {0.0f,0.0f}; - fps__ /= float(stats_count__); - latency__ /= float(stats_count__); - float fps = fps__; - float latency = latency__; - //LOG(INFO) << name_ << ": fps = " << fps_ << ", latency = " << latency_; - fps__ = 0.0f; - latency__ = 0.0f; - stats_count__ = 0;*/ return {-1.0f, -1.0f}; } @@ -438,8 +360,6 @@ std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_findFrameset(int64_t ts) { * Note: Must occur inside a mutex lock. */ std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_getFrameset() { - //LOG(INFO) << "BUF SIZE = " << framesets_.size(); - auto i = framesets_.begin(); int N = bufferSize_; @@ -453,17 +373,17 @@ std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_getFrameset() { if (i != framesets_.end()) { auto f = *i; - last_frame_ = f->timestamp(); - + // Lock to force completion of on going construction first UNIQUE_LOCK(f->smtx, slk); + last_frame_ = f->timestamp(); auto j = framesets_.erase(i); f->set(ftl::data::FSFlag::STALE); slk.unlock(); int count = 0; - // Merge all previous frames - // TODO: Remove? + // Remove all previous framesets + // FIXME: Should do this in reverse order. while (j!=framesets_.end()) { ++count; @@ -474,18 +394,8 @@ std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_getFrameset() { UNIQUE_LOCK(f2->smtx, lk); j = framesets_.erase(j); } - mergeFrameset(*f,*f2); - - //LOG(INFO) << "EXTRA REMOVED"; - //_freeFrameset(f2); } - //if (count > 0) LOG(INFO) << "COUNT = " << count; - - int64_t now = ftl::timer::get_time(); - //float framerate = 1000.0f / float(now - last_ts_); - //_recordStats(framerate, now - f->timestamp()); - last_ts_ = now; return f; } @@ -498,7 +408,7 @@ std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_addFrameset(int64_t timest framesets_.clear(); } - auto newf = std::make_shared<FrameSet>(pool_, ftl::data::FrameID(id_,255), timestamp); + auto newf = std::make_shared<FrameSet>(pool_, ftl::data::FrameID(id_,255), timestamp, size_); for (size_t i=0; i<size_; ++i) { newf->frames.push_back(std::move(pool_->allocate(ftl::data::FrameID(id_, i), timestamp))); } @@ -506,9 +416,7 @@ std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_addFrameset(int64_t timest newf->count = 0; newf->mask = 0; newf->clearFlags(); - //newf->frames.resize(size_); - newf->pose.setIdentity(); - //newf->clearData(); + newf->pose.setIdentity(); // Deprecated? // Insertion sort by timestamp for (auto i=framesets_.begin(); i!=framesets_.end(); i++) { @@ -523,7 +431,3 @@ std::shared_ptr<ftl::data::FrameSet> ForeignBuilder::_addFrameset(int64_t timest framesets_.push_back(newf); return newf; } - -/*void Builder::setName(const std::string &name) { - name_ = name; -}*/ diff --git a/components/structures/include/ftl/data/new_frameset.hpp b/components/structures/include/ftl/data/new_frameset.hpp index 93b6d4ee4003ab59068c0ddc1770561e4cf0fc35..368456f2057fd9a94853677a8dbc6340e7162eb3 100644 --- a/components/structures/include/ftl/data/new_frameset.hpp +++ b/components/structures/include/ftl/data/new_frameset.hpp @@ -33,7 +33,7 @@ class FrameSet : public ftl::data::Frame { public: - FrameSet(Pool *ppool, FrameID pid, int64_t ts); + FrameSet(Pool *ppool, FrameID pid, int64_t ts, size_t psize=1); ~FrameSet(); //int id=0; diff --git a/components/structures/src/frameset.cpp b/components/structures/src/frameset.cpp index 330ccf6592025a6c36b3f36da4d1b3e31b6fffb8..d804c68d0b96bc84857d6cf9e5e3dba7a22b0e38 100644 --- a/components/structures/src/frameset.cpp +++ b/components/structures/src/frameset.cpp @@ -1,15 +1,14 @@ #include <ftl/data/new_frameset.hpp> #include <ftl/data/framepool.hpp> -#include <loguru.hpp> - using ftl::data::Frame; using ftl::data::FrameSet; -FrameSet::FrameSet(Pool *ppool, FrameID pid, int64_t ts) : +FrameSet::FrameSet(Pool *ppool, FrameID pid, int64_t ts, size_t psize) : Frame(ppool->allocate(FrameID(pid.frameset(),255), ts)), mask(0) { flush_count = 0; // Reset flush on store... + frames.reserve(psize); } FrameSet::~FrameSet() { @@ -27,7 +26,7 @@ void ftl::data::FrameSet::completed(size_t ix) { mask |= (1 << ix); ++count; } else { - LOG(ERROR) << "Completing frame that does not exist: " << timestamp() << ":" << ix; + throw FTL_Error("Completing frame that does not exist: " << timestamp() << ":" << ix); } } @@ -47,25 +46,15 @@ void ftl::data::FrameSet::resize(size_t s) { void ftl::data::FrameSet::moveTo(ftl::data::FrameSet &fs) { UNIQUE_LOCK(fs.mutex(), lk); - //std::unique_lock<std::mutex> lk(fs.mutex()); // FIXME: was a shared mutex - - //if (fs.frames.size() != frames.size()) { - // Assume "this" is correct and "fs" is not. - //fs.frames.resize(frames.size()); - //} - Frame::moveTo(fs); fs.count = static_cast<int>(count); fs.flags_ = (int)flags_; fs.mask = static_cast<unsigned int>(mask); fs.pose = pose; - - /*for (size_t i=0; i<frames.size(); ++i) { - frames[i].moveTo(fs.frames[i]); - }*/ - fs.frames = std::move(frames); + count = 0; + mask = 0; set(ftl::data::FSFlag::STALE); } @@ -95,7 +84,6 @@ void FrameSet::store() { { //UNIQUE_LOCK(smtx, lk); - for (auto &f : frames) if (f.status() == ftl::data::FrameStatus::CREATED) f.store(); ftl::data::Frame::store(); } diff --git a/components/structures/src/new_frame.cpp b/components/structures/src/new_frame.cpp index d2b319d5969c9b66ae74932fd4218ab8fdbcedd0..594124c58bd7cd2d66f3f0ff0c65cfa44394075b 100644 --- a/components/structures/src/new_frame.cpp +++ b/components/structures/src/new_frame.cpp @@ -261,7 +261,7 @@ void Frame::store() { if (!parent_) return; - //UNIQUE_LOCK(parent_->mutex(), lk); + UNIQUE_LOCK(parent_->mutex(), lk); for (auto c : changed_) { if (ftl::data::isPersistent(c.first) && hasOwn(c.first)) {