From dc28ce01bfc0e63d9302dd59da4b691e8fed41ea Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Tue, 16 Jun 2020 15:05:12 +0300
Subject: [PATCH] Fix for bad get data in frame

---
 components/common/cpp/include/ftl/handle.hpp  |   2 +-
 components/common/cpp/include/ftl/threads.hpp |   2 +-
 components/streams/src/builder.cpp            |  18 +--
 components/streams/src/filestream.cpp         |   2 +
 components/streams/src/receiver.cpp           |   8 +-
 .../structures/include/ftl/data/framepool.hpp |   2 +
 .../structures/include/ftl/data/new_frame.hpp |   7 +-
 components/structures/src/frameset.cpp        |   4 +-
 components/structures/src/new_frame.cpp       |  42 ++++++-
 components/structures/src/pool.cpp            |  37 +++++--
 components/structures/test/frame_unit.cpp     |  36 +++++-
 components/structures/test/pool_unit.cpp      | 104 ++++++++++++++++++
 12 files changed, 226 insertions(+), 38 deletions(-)

diff --git a/components/common/cpp/include/ftl/handle.hpp b/components/common/cpp/include/ftl/handle.hpp
index 92b99fdcc..dcf694447 100644
--- a/components/common/cpp/include/ftl/handle.hpp
+++ b/components/common/cpp/include/ftl/handle.hpp
@@ -15,7 +15,7 @@ struct BaseHandler {
 	inline Handle make_handle(BaseHandler*, int);
 
 	protected:
-	MUTEX mutex_;
+	std::mutex mutex_;
 	int id_=0;
 };
 
diff --git a/components/common/cpp/include/ftl/threads.hpp b/components/common/cpp/include/ftl/threads.hpp
index 83086135a..6dc002359 100644
--- a/components/common/cpp/include/ftl/threads.hpp
+++ b/components/common/cpp/include/ftl/threads.hpp
@@ -7,7 +7,7 @@
 
 #define POOL_SIZE 10
 
-//#define DEBUG_MUTEX
+#define DEBUG_MUTEX
 #define MUTEX_TIMEOUT 5
 
 #if defined DEBUG_MUTEX
diff --git a/components/streams/src/builder.cpp b/components/streams/src/builder.cpp
index c869b882d..91fa4cfa2 100644
--- a/components/streams/src/builder.cpp
+++ b/components/streams/src/builder.cpp
@@ -90,12 +90,12 @@ ftl::data::Frame &Builder::get(int64_t timestamp, size_t ix) {
 
 	auto fs = _get(timestamp);
 
-	//if (ix >= fs->frames.size()) {
-		//throw FTL_Error("Frame index out-of-bounds - " << ix << "(" << fs->frames.size() << ")");
-		while (fs->frames.size() < size_) {
-			fs->frames.push_back(std::move(pool_->allocate(ftl::data::FrameID(fs->frameset(), + fs->frames.size()), fs->timestamp())));
-		}
-	//}
+	if (ix >= fs->frames.size()) {
+		throw FTL_Error("Frame index out-of-bounds - " << ix << "(" << fs->frames.size() << ")");
+		//while (fs->frames.size() < size_) {
+		//	fs->frames.push_back(std::move(pool_->allocate(ftl::data::FrameID(fs->frameset(), + fs->frames.size()), fs->timestamp())));
+		//}
+	}
 
 	//if (fs->frames.size() < size_) fs->frames.resize(size_);
 
@@ -128,7 +128,7 @@ void Builder::completed(int64_t ts, size_t ix) {
 			++fs->count;
 		}
 
-		LOG(INFO) << "COMPLETE FRAME : " << fs->timestamp() << " " << fs->count << "(" << size_ << ")";
+		//LOG(INFO) << "COMPLETE FRAME : " << fs->timestamp() << " " << fs->count << "(" << size_ << ")";
 
 		// No buffering, so do a schedule here for immediate effect
 		if (bufferSize_ == 0 && !fs->test(ftl::data::FSFlag::STALE) && static_cast<unsigned int>(fs->count) >= size_) {
@@ -165,14 +165,14 @@ void Builder::_schedule() {
 		jobs_++;
 
 		ftl::pool.push([this,fs](int) {
-			UNIQUE_LOCK(fs->mutex(), lk2);
-
 			// Calling onFrameset but without all frames so mark as partial
 			if (static_cast<size_t>(fs->count) < fs->frames.size()) fs->set(ftl::data::FSFlag::PARTIAL);
 
 			for (auto &f : fs->frames) f.store();
 			fs->store();
 
+			//UNIQUE_LOCK(fs->mutex(), lk2);
+
 			try {
 				cb_.trigger(fs);
 			} catch(const ftl::exception &e) {
diff --git a/components/streams/src/filestream.cpp b/components/streams/src/filestream.cpp
index a3e74d6ba..7ddbfdb6a 100644
--- a/components/streams/src/filestream.cpp
+++ b/components/streams/src/filestream.cpp
@@ -212,6 +212,8 @@ bool File::tick(int64_t ts) {
 
 					try {
 						if (cb_) cb_(spkt, pkt);
+					} catch (const ftl::exception &e) {
+						LOG(ERROR) << "Exception in packet callback: " << e.what() << e.trace();
 					} catch (std::exception &e) {
 						LOG(ERROR) << "Exception in packet callback: " << e.what();
 					}
diff --git a/components/streams/src/receiver.cpp b/components/streams/src/receiver.cpp
index 78cb826b3..5ebab5c28 100644
--- a/components/streams/src/receiver.cpp
+++ b/components/streams/src/receiver.cpp
@@ -144,11 +144,11 @@ void Receiver::_processState(const StreamPacket &spkt, const Packet &pkt) {
 	for (int i=0; i<pkt.frame_count; ++i) {
 		InternalVideoStates &frame = _getVideoFrame(spkt,i);
 
-		LOG(INFO) << "GOT STATE " << (int)spkt.streamID << "," << (int)spkt.frame_number;
-
 		if (spkt.channel == Channel::Calibration) {
 			auto &f = builder(spkt.streamID).get(spkt.timestamp, spkt.frame_number);
 			f.createChange<ftl::rgbd::Camera>(Channel::Calibration, ftl::data::ChangeType::FOREIGN) = parseCalibration(pkt);
+
+			LOG(INFO) << "GOT STATE " << (int)spkt.streamID << "," << (int)spkt.frame_number << " = " << (int)f.status();
 		}
 
 		// Deal with the special channels...
@@ -349,10 +349,6 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) {
 		const auto *cs = stream_;
 		const auto &sel = stream_->selected(spkt.frameSetID()) & cs->available(spkt.frameSetID());
 
-		for (auto &a : sel) {
-			LOG(INFO) << " -- selected " << (int)a;
-		}
-
 		//frame.create<cv::cuda::GpuMat>(spkt.channel);
 
 		if (i == 0) {
diff --git a/components/structures/include/ftl/data/framepool.hpp b/components/structures/include/ftl/data/framepool.hpp
index 83041a5c9..4413dd236 100644
--- a/components/structures/include/ftl/data/framepool.hpp
+++ b/components/structures/include/ftl/data/framepool.hpp
@@ -43,6 +43,8 @@ class Pool {
 	size_t max_n_;
 	size_t ideal_n_;
 
+	MUTEX mutex_;
+
 	PoolData &_getPool(FrameID);
 };
 
diff --git a/components/structures/include/ftl/data/new_frame.hpp b/components/structures/include/ftl/data/new_frame.hpp
index 51820affc..64543d8e3 100644
--- a/components/structures/include/ftl/data/new_frame.hpp
+++ b/components/structures/include/ftl/data/new_frame.hpp
@@ -315,6 +315,11 @@ class Frame {
 		timestamp_ = ts;
 		status_ = FrameStatus::CREATED;
 	}
+
+	/**
+	 * Primary frames also store on flush.
+	 */
+	void _primaryStore();
 };
 
 class Session : public Frame {
@@ -433,7 +438,7 @@ const T &ftl::data::Frame::get(ftl::codecs::Channel c) const {
 		auto *p = std::any_cast<T>(&d.data);
 		if (!p) throw FTL_Error("'get' wrong type for channel (" << static_cast<unsigned int>(c) << ")");
 		return *p;
-	} else throw FTL_Error("Missing channel (" << static_cast<unsigned int>(c) << ")");
+	} else throw FTL_Error("Missing channel (" << static_cast<unsigned int>(c) << ") for (" << frameset() << "," << source() << ")");
 }
 
 // Non-list version
diff --git a/components/structures/src/frameset.cpp b/components/structures/src/frameset.cpp
index 44584d131..3c1874f2a 100644
--- a/components/structures/src/frameset.cpp
+++ b/components/structures/src/frameset.cpp
@@ -16,8 +16,8 @@ void ftl::data::FrameSet::resize(size_t s) {
 }
 
 void ftl::data::FrameSet::moveTo(ftl::data::FrameSet &fs) {
-	//UNIQUE_LOCK(fs.mtx, lk);
-	std::unique_lock<std::mutex> lk(fs.mutex());  // FIXME: was a shared mutex
+	UNIQUE_LOCK(fs.mutex(), lk);
+	//std::unique_lock<std::mutex> lk(fs.mutex());  // FIXME: was a shared mutex
 
 	//if (fs.frames.size() != frames.size()) {
 		// Assume "this" is correct and "fs" is not.
diff --git a/components/structures/src/new_frame.cpp b/components/structures/src/new_frame.cpp
index e55d5ab93..fc4e92006 100644
--- a/components/structures/src/new_frame.cpp
+++ b/components/structures/src/new_frame.cpp
@@ -80,7 +80,7 @@ bool ftl::data::Frame::has(ftl::codecs::Channel c) const {
 Frame::ChannelData &Frame::_getData(ftl::codecs::Channel c) {
 	if (status_ == FrameStatus::RELEASED) throw FTL_Error("Reading a released frame");
 	const auto &i = data_.find(c);
-	if (i != data_.end()) {
+	if (i != data_.end() && i->second.status != ChannelStatus::INVALID) {
 		return i->second;
 	} else if (parent_) {
 		return parent_->_getData(c);
@@ -90,7 +90,7 @@ Frame::ChannelData &Frame::_getData(ftl::codecs::Channel c) {
 const Frame::ChannelData &Frame::_getData(ftl::codecs::Channel c) const {
 	if (status_ == FrameStatus::RELEASED) throw FTL_Error("Reading a released frame");
 	const auto &i = data_.find(c);
-	if (i != data_.end()) {
+	if (i != data_.end() && i->second.status != ChannelStatus::INVALID) {
 		return i->second;
 	} else if (parent_) {
 		return parent_->_getData(c);
@@ -100,6 +100,8 @@ const Frame::ChannelData &Frame::_getData(ftl::codecs::Channel c) const {
 std::any &Frame::createAnyChange(ftl::codecs::Channel c, ftl::data::ChangeType t) {
 	if (status_ != FrameStatus::CREATED) throw FTL_Error("Cannot apply change after store " << static_cast<int>(status_));
 
+	//UNIQUE_LOCK(mutex(), lk);
+
 	auto &d = data_[c];
 	if (d.status != ftl::data::ChannelStatus::FLUSHED) {
 		d.status = ftl::data::ChannelStatus::DISPATCHED;
@@ -114,6 +116,8 @@ std::any &Frame::createAnyChange(ftl::codecs::Channel c, ftl::data::ChangeType t
 std::any &Frame::createAnyChange(ftl::codecs::Channel c, ftl::data::ChangeType t, ftl::codecs::Packet &data) {
 	if (status_ != FrameStatus::CREATED) throw FTL_Error("Cannot apply change after store " << static_cast<int>(status_));
 
+	//UNIQUE_LOCK(mutex(), lk);
+
 	auto &d = data_[c];
 	if (d.status != ftl::data::ChannelStatus::FLUSHED) {
 		d.status = ftl::data::ChannelStatus::DISPATCHED;
@@ -128,6 +132,8 @@ std::any &Frame::createAnyChange(ftl::codecs::Channel c, ftl::data::ChangeType t
 std::any &Frame::createAny(ftl::codecs::Channel c) {
 	if (status_ != FrameStatus::STORED) throw FTL_Error("Cannot create before store or after flush");
 
+	//UNIQUE_LOCK(mutex(), lk);
+
 	auto &d = data_[c];
 	if (d.status != ftl::data::ChannelStatus::FLUSHED) {
 		d.status = ftl::data::ChannelStatus::VALID;
@@ -162,6 +168,7 @@ bool Frame::flush() {
 	for (auto c : changed_) {
 		_getData(c.first).status = ChannelStatus::FLUSHED;
 	}
+	_primaryStore();
 	return true;
 }
 
@@ -183,6 +190,32 @@ void Frame::store() {
 
 	if (!parent_) return;
 
+	//UNIQUE_LOCK(parent_->mutex(), lk);
+
+	for (auto c : changed_) {
+		if (ftl::data::isPersistent(c.first) && hasOwn(c.first)) {
+			auto &d = data_[c.first];
+			auto &pd = parent_->data_[c.first];
+			pd.data = d.data;
+			//pd.encoded = std::move(d.encoded);
+			pd.status = ChannelStatus::VALID;
+			//data_.erase(c.first);
+		}
+
+		parent_->change_.trigger(*this, c.first);
+		uint64_t sig = (uint64_t(id()) << 32) + static_cast<unsigned int>(c.first);
+		const auto &i = parent_->change_channel_.find(sig);
+
+		if (i != parent_->change_channel_.end()) i->second.trigger(*this, c.first);
+	}
+}
+
+void Frame::_primaryStore() {
+	if (mode_ == FrameMode::RESPONSE) return;
+	if (!parent_) return;
+
+	//UNIQUE_LOCK(parent_->mutex(), lk);
+
 	for (auto c : changed_) {
 		if (ftl::data::isPersistent(c.first) && hasOwn(c.first)) {
 			auto &d = data_[c.first];
@@ -196,6 +229,7 @@ void Frame::store() {
 		parent_->change_.trigger(*this, c.first);
 		uint64_t sig = (uint64_t(id()) << 32) + static_cast<unsigned int>(c.first);
 		const auto &i = parent_->change_channel_.find(sig);
+
 		if (i != parent_->change_channel_.end()) i->second.trigger(*this, c.first);
 	}
 }
@@ -216,6 +250,7 @@ void Frame::moveTo(Frame &f) {
 	f.id_ = id_;
 	f.timestamp_ = timestamp_;
 	f.status_ = status_;
+	f.mode_ = mode_;
 	f.parent_ = parent_;
 	f.pool_ = pool_;
 	f.data_ = std::move(data_);
@@ -262,6 +297,7 @@ void Frame::reset() {
 	}
 	changed_.clear();
 	status_ = FrameStatus::CREATED;
+	mode_ = FrameMode::PRIMARY;
 }
 
 void Frame::hardReset() {
@@ -304,7 +340,6 @@ void Session::notifyChanges(Frame &f) {
 }
 
 void Session::flush(Frame &f) {
-	// TODO: Lock
 	for (auto c : f.changed()) {
 		if (c.second == ftl::data::ChangeType::PRIMARY || c.second == ftl::data::ChangeType::RESPONSE) {
 			auto &d = f._getData(c.first);
@@ -323,7 +358,6 @@ void Session::flush(Frame &f) {
 }
 
 void Session::flush(Frame &f, ftl::codecs::Channel c) {
-	// TODO: Lock
 	auto cc = f.changed_[c];
 	if (cc == ftl::data::ChangeType::PRIMARY || cc == ftl::data::ChangeType::RESPONSE) {
 		auto &d = f._getData(c);
diff --git a/components/structures/src/pool.cpp b/components/structures/src/pool.cpp
index 9c1839860..d022f5474 100644
--- a/components/structures/src/pool.cpp
+++ b/components/structures/src/pool.cpp
@@ -9,6 +9,7 @@ Pool::Pool(size_t min_n, size_t max_n) : min_n_(min_n), max_n_(max_n) {
 }
 
 Pool::~Pool() {
+	UNIQUE_LOCK(mutex_, lk);
 	for (auto &p : pool_) {
 		for (auto *f : p.second.pool) {
 			f->status_ = FrameStatus::RELEASED;
@@ -18,31 +19,40 @@ Pool::~Pool() {
 }
 
 Frame Pool::allocate(FrameID id, int64_t timestamp) {
-	auto &pool = _getPool(id);
+	Frame *f;
 
-	if (timestamp < pool.last_timestamp) {
-		throw FTL_Error("New frame timestamp is older than previous: " << timestamp << " vs " << pool.last_timestamp);
-	}
+	{
+		UNIQUE_LOCK(mutex_, lk);
+		auto &pool = _getPool(id);
 
-	// Add items as required
-	if (pool.pool.size() < min_n_) {
-		while (pool.pool.size() < ideal_n_) {
-			pool.pool.push_back(new Frame(this, &pool.session, id, 0));
+		if (timestamp < pool.last_timestamp) {
+			throw FTL_Error("New frame timestamp is older than previous: " << timestamp << " vs " << pool.last_timestamp);
 		}
+
+		// Add items as required
+		if (pool.pool.size() < min_n_) {
+			while (pool.pool.size() < ideal_n_) {
+				pool.pool.push_back(new Frame(this, &pool.session, id, 0));
+			}
+		}
+
+		f = pool.pool.front();
+		pool.pool.pop_front();
+		pool.last_timestamp = timestamp;
 	}
 
-	Frame *f = pool.pool.front();
 	Frame ff = std::move(*f);
-	delete f;
 	ff.restart(timestamp);
-	pool.pool.pop_front();
-	pool.last_timestamp = timestamp;
+	delete f;
+
 	return ff;
 }
 
 void Pool::release(Frame &f) {
 	if (f.status() == FrameStatus::RELEASED) return;
 	f.reset();
+
+	UNIQUE_LOCK(mutex_, lk);
 	auto &pool = _getPool(f.id());
 
 	if (pool.pool.size() < max_n_) {
@@ -53,16 +63,19 @@ void Pool::release(Frame &f) {
 }
 
 ftl::data::Session &Pool::session(FrameID id) {
+	UNIQUE_LOCK(mutex_, lk);
 	auto &pool = _getPool(id);
 	return pool.session;
 }
 
 size_t Pool::size(FrameID id) {
+	UNIQUE_LOCK(mutex_, lk);
 	auto &pool = _getPool(id);
 	return pool.pool.size();
 }
 
 size_t Pool::size() {
+	UNIQUE_LOCK(mutex_, lk);
 	size_t s = 0;
 	for (auto &p : pool_) {
 		s += p.second.pool.size();
diff --git a/components/structures/test/frame_unit.cpp b/components/structures/test/frame_unit.cpp
index ffb81142e..74647ab40 100644
--- a/components/structures/test/frame_unit.cpp
+++ b/components/structures/test/frame_unit.cpp
@@ -345,6 +345,17 @@ TEST_CASE("ftl::data::Frame persistent data", "[1.2.5]") {
 		REQUIRE( x == y );
 	}
 
+	SECTION("get from parent not ptr") {
+		Session p;
+		Frame f = Feed::make(&p, FrameID(0,0), 0);
+		f.store();
+
+		p.create<int>(Channel::Pose) = 55;
+
+		auto x = f.get<int>(Channel::Pose);
+		REQUIRE( x == 55 );
+	}
+
 	SECTION("has from parent") {
 		Session p;
 		Frame f = Feed::make(&p, FrameID(0,0), 0);
@@ -772,7 +783,7 @@ TEST_CASE("ftl::data::Frame multiple flush", "[Frame]") {
 TEST_CASE("ftl::data::Frame locality of changes", "[2.2.4]") {
 	ftl::data::make_channel<int>(Channel::Density, "density", ftl::data::StorageMode::PERSISTENT);
 
-	SECTION("not persistent after flush only") {
+	SECTION("persistent after flush only for primary frame") {
 		Session p;
 		Frame f = Feed::make(&p, FrameID(0,0), 0);
 		f.store();
@@ -788,9 +799,30 @@ TEST_CASE("ftl::data::Frame locality of changes", "[2.2.4]") {
 			e.ignore();
 			err = true;
 		}
-		REQUIRE( err );
+		REQUIRE( !err );
 	}
 
+	// FIXME: Need a way to change frame mode or generate response frame.
+	/*SECTION("not persistent after flush only for response frame") {
+		Session p;
+		Frame ff = Feed::make(&p, FrameID(0,0), 0);
+		ff.store();
+		Frame f = ff.response();
+
+		f.create<int>(Channel::Density) = 44;
+		f.flush();
+
+		bool err=false;
+
+		try {		
+			p.get<int>(Channel::Density);
+		} catch(const ftl::exception &e) {
+			e.ignore();
+			err = true;
+		}
+		REQUIRE( err );
+	}*/
+
 	SECTION("not persistent without store") {
 		Session p;
 		Frame f = Feed::make(&p, FrameID(0,0), 0);
diff --git a/components/structures/test/pool_unit.cpp b/components/structures/test/pool_unit.cpp
index 6476d00c6..141acfb0d 100644
--- a/components/structures/test/pool_unit.cpp
+++ b/components/structures/test/pool_unit.cpp
@@ -121,3 +121,107 @@ TEST_CASE("ftl::data::Pool excessive allocations", "[5.5]") {
 	}
 }
 
+TEST_CASE("ftl::data::Pool persistent sessions", "[]") {
+	SECTION("persistent across timetstamps") {
+		Pool pool(10,20);
+
+		{
+			Frame f = pool.allocate(ftl::data::FrameID(0,0), 10);
+			f.store();
+			f.create<int>(Channel::Pose) = 567;
+		}
+
+		REQUIRE( (pool.session(FrameID(0,0)).get<int>(Channel::Pose) == 567) );
+
+		{
+			Frame f = pool.allocate(ftl::data::FrameID(0,0), 20);
+			f.store();
+			REQUIRE( f.get<int>(Channel::Pose) == 567 );
+		}
+	}
+
+	SECTION("persistent across many timetstamps") {
+		Pool pool(10,20);
+
+		{
+			Frame f = pool.allocate(ftl::data::FrameID(0,0), 10);
+			f.store();
+			f.create<int>(Channel::Pose) = 567;
+		}
+
+		REQUIRE( (pool.session(FrameID(0,0)).get<int>(Channel::Pose) == 567) );
+
+		{
+			Frame f = pool.allocate(ftl::data::FrameID(0,0), 20);
+			f.store();
+			REQUIRE( f.get<int>(Channel::Pose) == 567 );
+		}
+
+		{
+			Frame f = pool.allocate(ftl::data::FrameID(0,0), 30);
+			f.store();
+			REQUIRE( f.get<int>(Channel::Pose) == 567 );
+		}
+	}
+
+	SECTION("persistent across frames and timetstamps") {
+		Pool pool(10,20);
+
+		{
+			Frame f = pool.allocate(ftl::data::FrameID(0,0), 10);
+			f.store();
+			f.create<int>(Channel::Pose) = 567;
+		}
+
+		{
+			Frame f = pool.allocate(ftl::data::FrameID(0,1), 10);
+			f.store();
+			f.create<int>(Channel::Pose) = 568;
+		}
+
+		REQUIRE( (pool.session(FrameID(0,0)).get<int>(Channel::Pose) == 567) );
+
+		{
+			Frame f = pool.allocate(ftl::data::FrameID(0,0), 20);
+			f.store();
+			REQUIRE( f.get<int>(Channel::Pose) == 567 );
+		}
+
+		{
+			Frame f = pool.allocate(ftl::data::FrameID(0,1), 20);
+			f.store();
+			REQUIRE( f.get<int>(Channel::Pose) == 568 );
+		}
+	}
+
+	SECTION("persistent across framesets and timetstamps") {
+		Pool pool(10,20);
+
+		{
+			Frame f = pool.allocate(ftl::data::FrameID(0,0), 10);
+			f.store();
+			f.create<int>(Channel::Pose) = 567;
+		}
+
+		{
+			Frame f = pool.allocate(ftl::data::FrameID(1,0), 10);
+			f.store();
+			f.create<int>(Channel::Pose) = 568;
+		}
+
+		REQUIRE( (pool.session(FrameID(0,0)).get<int>(Channel::Pose) == 567) );
+
+		{
+			Frame f = pool.allocate(ftl::data::FrameID(0,0), 20);
+			f.store();
+			REQUIRE( f.get<int>(Channel::Pose) == 567 );
+		}
+
+		{
+			Frame f = pool.allocate(ftl::data::FrameID(1,0), 20);
+			f.store();
+			REQUIRE( f.get<int>(Channel::Pose) == 568 );
+		}
+	}	
+}
+
-- 
GitLab