diff --git a/components/common/cpp/include/ftl/handle.hpp b/components/common/cpp/include/ftl/handle.hpp index 92b99fdcc5f9430c70655696cf794faf089d18b6..dcf694447386578357484af4c25ced7b78cfc5fd 100644 --- a/components/common/cpp/include/ftl/handle.hpp +++ b/components/common/cpp/include/ftl/handle.hpp @@ -15,7 +15,7 @@ struct BaseHandler { inline Handle make_handle(BaseHandler*, int); protected: - MUTEX mutex_; + std::mutex mutex_; int id_=0; }; diff --git a/components/common/cpp/include/ftl/threads.hpp b/components/common/cpp/include/ftl/threads.hpp index 83086135a4e535d7f2c4f8ce03ab07dadbe871e4..6dc0023594ee17ab713f4bddc1f24e90f6a22c55 100644 --- a/components/common/cpp/include/ftl/threads.hpp +++ b/components/common/cpp/include/ftl/threads.hpp @@ -7,7 +7,7 @@ #define POOL_SIZE 10 -//#define DEBUG_MUTEX +#define DEBUG_MUTEX #define MUTEX_TIMEOUT 5 #if defined DEBUG_MUTEX diff --git a/components/streams/src/builder.cpp b/components/streams/src/builder.cpp index c869b882d2ceaf1194c1643a07b8994657f6d4c5..91fa4cfa250df2388fee63cf0e25faff01aebbdd 100644 --- a/components/streams/src/builder.cpp +++ b/components/streams/src/builder.cpp @@ -90,12 +90,12 @@ ftl::data::Frame &Builder::get(int64_t timestamp, size_t ix) { auto fs = _get(timestamp); - //if (ix >= fs->frames.size()) { - //throw FTL_Error("Frame index out-of-bounds - " << ix << "(" << fs->frames.size() << ")"); - while (fs->frames.size() < size_) { - fs->frames.push_back(std::move(pool_->allocate(ftl::data::FrameID(fs->frameset(), + fs->frames.size()), fs->timestamp()))); - } - //} + if (ix >= fs->frames.size()) { + throw FTL_Error("Frame index out-of-bounds - " << ix << "(" << fs->frames.size() << ")"); + //while (fs->frames.size() < size_) { + // fs->frames.push_back(std::move(pool_->allocate(ftl::data::FrameID(fs->frameset(), + fs->frames.size()), fs->timestamp()))); + //} + } //if (fs->frames.size() < size_) fs->frames.resize(size_); @@ -128,7 +128,7 @@ void Builder::completed(int64_t ts, size_t ix) { ++fs->count; } - LOG(INFO) << "COMPLETE FRAME : " << fs->timestamp() << " " << fs->count << "(" << size_ << ")"; + //LOG(INFO) << "COMPLETE FRAME : " << fs->timestamp() << " " << fs->count << "(" << size_ << ")"; // No buffering, so do a schedule here for immediate effect if (bufferSize_ == 0 && !fs->test(ftl::data::FSFlag::STALE) && static_cast<unsigned int>(fs->count) >= size_) { @@ -165,14 +165,14 @@ void Builder::_schedule() { jobs_++; ftl::pool.push([this,fs](int) { - UNIQUE_LOCK(fs->mutex(), lk2); - // Calling onFrameset but without all frames so mark as partial if (static_cast<size_t>(fs->count) < fs->frames.size()) fs->set(ftl::data::FSFlag::PARTIAL); for (auto &f : fs->frames) f.store(); fs->store(); + //UNIQUE_LOCK(fs->mutex(), lk2); + try { cb_.trigger(fs); } catch(const ftl::exception &e) { diff --git a/components/streams/src/filestream.cpp b/components/streams/src/filestream.cpp index a3e74d6ba362dfe9f431cfb74513861300b661c1..7ddbfdb6a0f83cb9db3c53eca92df8ae8afc6e17 100644 --- a/components/streams/src/filestream.cpp +++ b/components/streams/src/filestream.cpp @@ -212,6 +212,8 @@ bool File::tick(int64_t ts) { try { if (cb_) cb_(spkt, pkt); + } catch (const ftl::exception &e) { + LOG(ERROR) << "Exception in packet callback: " << e.what() << e.trace(); } catch (std::exception &e) { LOG(ERROR) << "Exception in packet callback: " << e.what(); } diff --git a/components/streams/src/receiver.cpp b/components/streams/src/receiver.cpp index 78cb826b3db168beda34397e12870339fb51e4ac..5ebab5c28c8ac0e7804f6cb760598d7622764168 100644 --- a/components/streams/src/receiver.cpp +++ b/components/streams/src/receiver.cpp @@ -144,11 +144,11 @@ void Receiver::_processState(const StreamPacket &spkt, const Packet &pkt) { for (int i=0; i<pkt.frame_count; ++i) { InternalVideoStates &frame = _getVideoFrame(spkt,i); - LOG(INFO) << "GOT STATE " << (int)spkt.streamID << "," << (int)spkt.frame_number; - if (spkt.channel == Channel::Calibration) { auto &f = builder(spkt.streamID).get(spkt.timestamp, spkt.frame_number); f.createChange<ftl::rgbd::Camera>(Channel::Calibration, ftl::data::ChangeType::FOREIGN) = parseCalibration(pkt); + + LOG(INFO) << "GOT STATE " << (int)spkt.streamID << "," << (int)spkt.frame_number << " = " << (int)f.status(); } // Deal with the special channels... @@ -349,10 +349,6 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { const auto *cs = stream_; const auto &sel = stream_->selected(spkt.frameSetID()) & cs->available(spkt.frameSetID()); - for (auto &a : sel) { - LOG(INFO) << " -- selected " << (int)a; - } - //frame.create<cv::cuda::GpuMat>(spkt.channel); if (i == 0) { diff --git a/components/structures/include/ftl/data/framepool.hpp b/components/structures/include/ftl/data/framepool.hpp index 83041a5c9f4ec92bf10ad579fda95e19d294a01c..4413dd2368ca59ffd07a2a762c027a14eb6b47a1 100644 --- a/components/structures/include/ftl/data/framepool.hpp +++ b/components/structures/include/ftl/data/framepool.hpp @@ -43,6 +43,8 @@ class Pool { size_t max_n_; size_t ideal_n_; + MUTEX mutex_; + PoolData &_getPool(FrameID); }; diff --git a/components/structures/include/ftl/data/new_frame.hpp b/components/structures/include/ftl/data/new_frame.hpp index 51820affcfee983e613726f33e7e12bdf0328285..64543d8e3297867d027d0fad0a11db7c75f52f6d 100644 --- a/components/structures/include/ftl/data/new_frame.hpp +++ b/components/structures/include/ftl/data/new_frame.hpp @@ -315,6 +315,11 @@ class Frame { timestamp_ = ts; status_ = FrameStatus::CREATED; } + + /** + * Primary frames also store on flush. + */ + void _primaryStore(); }; class Session : public Frame { @@ -433,7 +438,7 @@ const T &ftl::data::Frame::get(ftl::codecs::Channel c) const { auto *p = std::any_cast<T>(&d.data); if (!p) throw FTL_Error("'get' wrong type for channel (" << static_cast<unsigned int>(c) << ")"); return *p; - } else throw FTL_Error("Missing channel (" << static_cast<unsigned int>(c) << ")"); + } else throw FTL_Error("Missing channel (" << static_cast<unsigned int>(c) << ") for (" << frameset() << "," << source() << ")"); } // Non-list version diff --git a/components/structures/src/frameset.cpp b/components/structures/src/frameset.cpp index 44584d131bccf0ccd7d51bd8c55766f8c6243a72..3c1874f2a665975c946a992ce5b40e64895cb2e2 100644 --- a/components/structures/src/frameset.cpp +++ b/components/structures/src/frameset.cpp @@ -16,8 +16,8 @@ void ftl::data::FrameSet::resize(size_t s) { } void ftl::data::FrameSet::moveTo(ftl::data::FrameSet &fs) { - //UNIQUE_LOCK(fs.mtx, lk); - std::unique_lock<std::mutex> lk(fs.mutex()); // FIXME: was a shared mutex + UNIQUE_LOCK(fs.mutex(), lk); + //std::unique_lock<std::mutex> lk(fs.mutex()); // FIXME: was a shared mutex //if (fs.frames.size() != frames.size()) { // Assume "this" is correct and "fs" is not. diff --git a/components/structures/src/new_frame.cpp b/components/structures/src/new_frame.cpp index e55d5ab939ab22c611d6926570afc7d25d776f4a..fc4e92006ecda3b6953b109b2f7dd8cf6c9ab48c 100644 --- a/components/structures/src/new_frame.cpp +++ b/components/structures/src/new_frame.cpp @@ -80,7 +80,7 @@ bool ftl::data::Frame::has(ftl::codecs::Channel c) const { Frame::ChannelData &Frame::_getData(ftl::codecs::Channel c) { if (status_ == FrameStatus::RELEASED) throw FTL_Error("Reading a released frame"); const auto &i = data_.find(c); - if (i != data_.end()) { + if (i != data_.end() && i->second.status != ChannelStatus::INVALID) { return i->second; } else if (parent_) { return parent_->_getData(c); @@ -90,7 +90,7 @@ Frame::ChannelData &Frame::_getData(ftl::codecs::Channel c) { const Frame::ChannelData &Frame::_getData(ftl::codecs::Channel c) const { if (status_ == FrameStatus::RELEASED) throw FTL_Error("Reading a released frame"); const auto &i = data_.find(c); - if (i != data_.end()) { + if (i != data_.end() && i->second.status != ChannelStatus::INVALID) { return i->second; } else if (parent_) { return parent_->_getData(c); @@ -100,6 +100,8 @@ const Frame::ChannelData &Frame::_getData(ftl::codecs::Channel c) const { std::any &Frame::createAnyChange(ftl::codecs::Channel c, ftl::data::ChangeType t) { if (status_ != FrameStatus::CREATED) throw FTL_Error("Cannot apply change after store " << static_cast<int>(status_)); + //UNIQUE_LOCK(mutex(), lk); + auto &d = data_[c]; if (d.status != ftl::data::ChannelStatus::FLUSHED) { d.status = ftl::data::ChannelStatus::DISPATCHED; @@ -114,6 +116,8 @@ std::any &Frame::createAnyChange(ftl::codecs::Channel c, ftl::data::ChangeType t std::any &Frame::createAnyChange(ftl::codecs::Channel c, ftl::data::ChangeType t, ftl::codecs::Packet &data) { if (status_ != FrameStatus::CREATED) throw FTL_Error("Cannot apply change after store " << static_cast<int>(status_)); + //UNIQUE_LOCK(mutex(), lk); + auto &d = data_[c]; if (d.status != ftl::data::ChannelStatus::FLUSHED) { d.status = ftl::data::ChannelStatus::DISPATCHED; @@ -128,6 +132,8 @@ std::any &Frame::createAnyChange(ftl::codecs::Channel c, ftl::data::ChangeType t std::any &Frame::createAny(ftl::codecs::Channel c) { if (status_ != FrameStatus::STORED) throw FTL_Error("Cannot create before store or after flush"); + //UNIQUE_LOCK(mutex(), lk); + auto &d = data_[c]; if (d.status != ftl::data::ChannelStatus::FLUSHED) { d.status = ftl::data::ChannelStatus::VALID; @@ -162,6 +168,7 @@ bool Frame::flush() { for (auto c : changed_) { _getData(c.first).status = ChannelStatus::FLUSHED; } + _primaryStore(); return true; } @@ -183,6 +190,32 @@ void Frame::store() { if (!parent_) return; + //UNIQUE_LOCK(parent_->mutex(), lk); + + for (auto c : changed_) { + if (ftl::data::isPersistent(c.first) && hasOwn(c.first)) { + auto &d = data_[c.first]; + auto &pd = parent_->data_[c.first]; + pd.data = d.data; + //pd.encoded = std::move(d.encoded); + pd.status = ChannelStatus::VALID; + //data_.erase(c.first); + } + + parent_->change_.trigger(*this, c.first); + uint64_t sig = (uint64_t(id()) << 32) + static_cast<unsigned int>(c.first); + const auto &i = parent_->change_channel_.find(sig); + + if (i != parent_->change_channel_.end()) i->second.trigger(*this, c.first); + } +} + +void Frame::_primaryStore() { + if (mode_ == FrameMode::RESPONSE) return; + if (!parent_) return; + + //UNIQUE_LOCK(parent_->mutex(), lk); + for (auto c : changed_) { if (ftl::data::isPersistent(c.first) && hasOwn(c.first)) { auto &d = data_[c.first]; @@ -196,6 +229,7 @@ void Frame::store() { parent_->change_.trigger(*this, c.first); uint64_t sig = (uint64_t(id()) << 32) + static_cast<unsigned int>(c.first); const auto &i = parent_->change_channel_.find(sig); + if (i != parent_->change_channel_.end()) i->second.trigger(*this, c.first); } } @@ -216,6 +250,7 @@ void Frame::moveTo(Frame &f) { f.id_ = id_; f.timestamp_ = timestamp_; f.status_ = status_; + f.mode_ = mode_; f.parent_ = parent_; f.pool_ = pool_; f.data_ = std::move(data_); @@ -262,6 +297,7 @@ void Frame::reset() { } changed_.clear(); status_ = FrameStatus::CREATED; + mode_ = FrameMode::PRIMARY; } void Frame::hardReset() { @@ -304,7 +340,6 @@ void Session::notifyChanges(Frame &f) { } void Session::flush(Frame &f) { - // TODO: Lock for (auto c : f.changed()) { if (c.second == ftl::data::ChangeType::PRIMARY || c.second == ftl::data::ChangeType::RESPONSE) { auto &d = f._getData(c.first); @@ -323,7 +358,6 @@ void Session::flush(Frame &f) { } void Session::flush(Frame &f, ftl::codecs::Channel c) { - // TODO: Lock auto cc = f.changed_[c]; if (cc == ftl::data::ChangeType::PRIMARY || cc == ftl::data::ChangeType::RESPONSE) { auto &d = f._getData(c); diff --git a/components/structures/src/pool.cpp b/components/structures/src/pool.cpp index 9c1839860bb34fe1eaec5960b50ceab116182751..d022f54746a7af9fab752e1001e22d001f18c236 100644 --- a/components/structures/src/pool.cpp +++ b/components/structures/src/pool.cpp @@ -9,6 +9,7 @@ Pool::Pool(size_t min_n, size_t max_n) : min_n_(min_n), max_n_(max_n) { } Pool::~Pool() { + UNIQUE_LOCK(mutex_, lk); for (auto &p : pool_) { for (auto *f : p.second.pool) { f->status_ = FrameStatus::RELEASED; @@ -18,31 +19,40 @@ Pool::~Pool() { } Frame Pool::allocate(FrameID id, int64_t timestamp) { - auto &pool = _getPool(id); + Frame *f; - if (timestamp < pool.last_timestamp) { - throw FTL_Error("New frame timestamp is older than previous: " << timestamp << " vs " << pool.last_timestamp); - } + { + UNIQUE_LOCK(mutex_, lk); + auto &pool = _getPool(id); - // Add items as required - if (pool.pool.size() < min_n_) { - while (pool.pool.size() < ideal_n_) { - pool.pool.push_back(new Frame(this, &pool.session, id, 0)); + if (timestamp < pool.last_timestamp) { + throw FTL_Error("New frame timestamp is older than previous: " << timestamp << " vs " << pool.last_timestamp); } + + // Add items as required + if (pool.pool.size() < min_n_) { + while (pool.pool.size() < ideal_n_) { + pool.pool.push_back(new Frame(this, &pool.session, id, 0)); + } + } + + f = pool.pool.front(); + pool.pool.pop_front(); + pool.last_timestamp = timestamp; } - Frame *f = pool.pool.front(); Frame ff = std::move(*f); - delete f; ff.restart(timestamp); - pool.pool.pop_front(); - pool.last_timestamp = timestamp; + delete f; + return ff; } void Pool::release(Frame &f) { if (f.status() == FrameStatus::RELEASED) return; f.reset(); + + UNIQUE_LOCK(mutex_, lk); auto &pool = _getPool(f.id()); if (pool.pool.size() < max_n_) { @@ -53,16 +63,19 @@ void Pool::release(Frame &f) { } ftl::data::Session &Pool::session(FrameID id) { + UNIQUE_LOCK(mutex_, lk); auto &pool = _getPool(id); return pool.session; } size_t Pool::size(FrameID id) { + UNIQUE_LOCK(mutex_, lk); auto &pool = _getPool(id); return pool.pool.size(); } size_t Pool::size() { + UNIQUE_LOCK(mutex_, lk); size_t s = 0; for (auto &p : pool_) { s += p.second.pool.size(); diff --git a/components/structures/test/frame_unit.cpp b/components/structures/test/frame_unit.cpp index ffb81142e0b8872b72904081a44fbf50e2db5d12..74647ab4013394d8cb76e9ca9c788a21389fbbca 100644 --- a/components/structures/test/frame_unit.cpp +++ b/components/structures/test/frame_unit.cpp @@ -345,6 +345,17 @@ TEST_CASE("ftl::data::Frame persistent data", "[1.2.5]") { REQUIRE( x == y ); } + SECTION("get from parent not ptr") { + Session p; + Frame f = Feed::make(&p, FrameID(0,0), 0); + f.store(); + + p.create<int>(Channel::Pose) = 55; + + auto x = f.get<int>(Channel::Pose); + REQUIRE( x == 55 ); + } + SECTION("has from parent") { Session p; Frame f = Feed::make(&p, FrameID(0,0), 0); @@ -772,7 +783,7 @@ TEST_CASE("ftl::data::Frame multiple flush", "[Frame]") { TEST_CASE("ftl::data::Frame locality of changes", "[2.2.4]") { ftl::data::make_channel<int>(Channel::Density, "density", ftl::data::StorageMode::PERSISTENT); - SECTION("not persistent after flush only") { + SECTION("persistent after flush only for primary frame") { Session p; Frame f = Feed::make(&p, FrameID(0,0), 0); f.store(); @@ -788,9 +799,30 @@ TEST_CASE("ftl::data::Frame locality of changes", "[2.2.4]") { e.ignore(); err = true; } - REQUIRE( err ); + REQUIRE( !err ); } + // FIXME: Need a way to change frame mode or generate response frame. + /*SECTION("not persistent after flush only for response frame") { + Session p; + Frame ff = Feed::make(&p, FrameID(0,0), 0); + ff.store(); + Frame f = ff.response(); + + f.create<int>(Channel::Density) = 44; + f.flush(); + + bool err=false; + + try { + p.get<int>(Channel::Density); + } catch(const ftl::exception &e) { + e.ignore(); + err = true; + } + REQUIRE( err ); + }*/ + SECTION("not persistent without store") { Session p; Frame f = Feed::make(&p, FrameID(0,0), 0); diff --git a/components/structures/test/pool_unit.cpp b/components/structures/test/pool_unit.cpp index 6476d00c6d2ac91c3592dfbe36bf8d7f63872f37..141acfb0de7af204f71e7ce8aad87c9668982f00 100644 --- a/components/structures/test/pool_unit.cpp +++ b/components/structures/test/pool_unit.cpp @@ -121,3 +121,107 @@ TEST_CASE("ftl::data::Pool excessive allocations", "[5.5]") { } } +TEST_CASE("ftl::data::Pool persistent sessions", "[]") { + SECTION("persistent across timetstamps") { + Pool pool(10,20); + + { + Frame f = pool.allocate(ftl::data::FrameID(0,0), 10); + f.store(); + f.create<int>(Channel::Pose) = 567; + } + + REQUIRE( (pool.session(FrameID(0,0)).get<int>(Channel::Pose) == 567) ); + + { + Frame f = pool.allocate(ftl::data::FrameID(0,0), 20); + f.store(); + REQUIRE( f.get<int>(Channel::Pose) == 567 ); + } + } + + SECTION("persistent across many timetstamps") { + Pool pool(10,20); + + { + Frame f = pool.allocate(ftl::data::FrameID(0,0), 10); + f.store(); + f.create<int>(Channel::Pose) = 567; + } + + REQUIRE( (pool.session(FrameID(0,0)).get<int>(Channel::Pose) == 567) ); + + { + Frame f = pool.allocate(ftl::data::FrameID(0,0), 20); + f.store(); + REQUIRE( f.get<int>(Channel::Pose) == 567 ); + } + + { + Frame f = pool.allocate(ftl::data::FrameID(0,0), 30); + f.store(); + REQUIRE( f.get<int>(Channel::Pose) == 567 ); + } + } + + SECTION("persistent across frames and timetstamps") { + Pool pool(10,20); + + { + Frame f = pool.allocate(ftl::data::FrameID(0,0), 10); + f.store(); + f.create<int>(Channel::Pose) = 567; + } + + { + Frame f = pool.allocate(ftl::data::FrameID(0,1), 10); + f.store(); + f.create<int>(Channel::Pose) = 568; + } + + REQUIRE( (pool.session(FrameID(0,0)).get<int>(Channel::Pose) == 567) ); + + { + Frame f = pool.allocate(ftl::data::FrameID(0,0), 20); + f.store(); + REQUIRE( f.get<int>(Channel::Pose) == 567 ); + } + + { + Frame f = pool.allocate(ftl::data::FrameID(0,1), 20); + f.store(); + REQUIRE( f.get<int>(Channel::Pose) == 568 ); + } + } + + SECTION("persistent across framesets and timetstamps") { + Pool pool(10,20); + + { + Frame f = pool.allocate(ftl::data::FrameID(0,0), 10); + f.store(); + f.create<int>(Channel::Pose) = 567; + } + + { + Frame f = pool.allocate(ftl::data::FrameID(1,0), 10); + f.store(); + f.create<int>(Channel::Pose) = 568; + } + + REQUIRE( (pool.session(FrameID(0,0)).get<int>(Channel::Pose) == 567) ); + + { + Frame f = pool.allocate(ftl::data::FrameID(0,0), 20); + f.store(); + REQUIRE( f.get<int>(Channel::Pose) == 567 ); + } + + { + Frame f = pool.allocate(ftl::data::FrameID(1,0), 20); + f.store(); + REQUIRE( f.get<int>(Channel::Pose) == 568 ); + } + } +} +