From af8f28bfc33baece48c792a2033c9667170e2d71 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nwpope@utu.fi>
Date: Wed, 9 Oct 2019 23:13:37 +0300
Subject: [PATCH] Working read up to timestamp

---
 .../codecs/include/ftl/codecs/reader.hpp      |   2 +
 components/codecs/src/reader.cpp              |  20 ++-
 components/codecs/test/readwrite_test.cpp     | 153 ++++++++++++++++++
 3 files changed, 171 insertions(+), 4 deletions(-)

diff --git a/components/codecs/include/ftl/codecs/reader.hpp b/components/codecs/include/ftl/codecs/reader.hpp
index 530c09890..5492c3d50 100644
--- a/components/codecs/include/ftl/codecs/reader.hpp
+++ b/components/codecs/include/ftl/codecs/reader.hpp
@@ -32,6 +32,8 @@ class Reader {
 	private:
 	std::istream *stream_;
 	msgpack::unpacker buffer_;
+	std::tuple<StreamPacket,Packet> data_;
+	bool has_data_;
 };
 
 }
diff --git a/components/codecs/src/reader.cpp b/components/codecs/src/reader.cpp
index 1f5b7f2aa..1cd570143 100644
--- a/components/codecs/src/reader.cpp
+++ b/components/codecs/src/reader.cpp
@@ -7,7 +7,7 @@ using ftl::codecs::StreamPacket;
 using ftl::codecs::Packet;
 using std::get;
 
-Reader::Reader(std::istream &s) : stream_(&s) {
+Reader::Reader(std::istream &s) : stream_(&s), has_data_(false) {
 
 }
 
@@ -23,7 +23,14 @@ bool Reader::begin() {
 }
 
 bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &f) {
-	while (stream_->good()) {
+	if (has_data_ && get<0>(data_).timestamp <= ts) {
+		f(get<0>(data_), get<1>(data_));
+		has_data_ = false;
+	} else if (has_data_) {
+		return false;
+	}
+
+	while (stream_->good() || buffer_.nonparsed_size() > 0) {
 		if (buffer_.nonparsed_size() == 0) {
 			buffer_.reserve_buffer(100000);
 			stream_->read(buffer_.buffer(), 100000);
@@ -37,11 +44,16 @@ bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::Stream
 		msgpack::object_handle msg;
 		if (!buffer_.next(msg)) continue;
 
-		msgpack::object obj = msg.get();
 		std::tuple<StreamPacket,Packet> data;
+		msgpack::object obj = msg.get();
 		obj.convert(data);
 
-		f(get<0>(data),get<1>(data));
+		if (get<0>(data).timestamp <= ts) {
+			f(get<0>(data),get<1>(data));
+		} else {
+			data_ = data;
+			has_data_ = true;
+		}
 	}
 
 	return false;
diff --git a/components/codecs/test/readwrite_test.cpp b/components/codecs/test/readwrite_test.cpp
index e929ebaf9..6258661ae 100644
--- a/components/codecs/test/readwrite_test.cpp
+++ b/components/codecs/test/readwrite_test.cpp
@@ -50,3 +50,156 @@ TEST_CASE( "Write and read - Single frame" ) {
 	REQUIRE( n == 1 );
 	REQUIRE( !res );
 }
+
+TEST_CASE( "Write and read - Multiple frames" ) {
+	std::stringstream s;
+	Writer w(s);
+
+	StreamPacket spkt;
+	Packet pkt;
+
+	spkt.channel = 0;
+	spkt.timestamp = 0;
+	spkt.streamID = 0;
+
+	pkt.codec = codec_t::JSON;
+	pkt.definition = definition_t::Any;
+	pkt.block_number = 0;
+	pkt.block_total = 1;
+	pkt.flags = 0;
+	pkt.data = {44,44,44};
+
+	w.begin();
+	w.write(spkt, pkt);
+	spkt.timestamp = 50;
+	pkt.data = {55,55,55};
+	w.write(spkt, pkt);
+	spkt.timestamp = 100;
+	pkt.data = {66,66,66};
+	w.write(spkt, pkt);
+	w.end();
+
+	REQUIRE( s.str().size() > 0 );
+
+	int n = 0;
+
+	Reader r(s);
+	r.begin();
+	bool res = r.read(100, [&n](const StreamPacket &rspkt, const Packet &rpkt) {
+		++n;
+		REQUIRE(rpkt.codec == codec_t::JSON);
+		REQUIRE(rpkt.data.size() == 3);
+		REQUIRE(rpkt.data[0] == ((n == 1) ? 44 : (n == 2) ? 55 : 66));
+		REQUIRE(rspkt.channel == 0);
+	});
+	r.end();
+
+	REQUIRE( n == 3 );
+	REQUIRE( !res );
+}
+
+TEST_CASE( "Write and read - Multiple frames with limit" ) {
+	std::stringstream s;
+	Writer w(s);
+
+	StreamPacket spkt;
+	Packet pkt;
+
+	spkt.channel = 0;
+	spkt.timestamp = 0;
+	spkt.streamID = 0;
+
+	pkt.codec = codec_t::JSON;
+	pkt.definition = definition_t::Any;
+	pkt.block_number = 0;
+	pkt.block_total = 1;
+	pkt.flags = 0;
+	pkt.data = {44,44,44};
+
+	w.begin();
+	w.write(spkt, pkt);
+	spkt.timestamp = 50;
+	pkt.data = {55,55,55};
+	w.write(spkt, pkt);
+	spkt.timestamp = 100;
+	pkt.data = {66,66,66};
+	w.write(spkt, pkt);
+	w.end();
+
+	REQUIRE( s.str().size() > 0 );
+
+	int n = 0;
+
+	Reader r(s);
+	r.begin();
+	bool res = r.read(50, [&n](const StreamPacket &rspkt, const Packet &rpkt) {
+		++n;
+		REQUIRE(rpkt.codec == codec_t::JSON);
+		REQUIRE(rpkt.data.size() == 3);
+		REQUIRE(rpkt.data[0] == ((n == 1) ? 44 : (n == 2) ? 55 : 66));
+		REQUIRE(rspkt.channel == 0);
+	});
+	r.end();
+
+	REQUIRE( n == 2 );
+	REQUIRE( !res );
+}
+
+TEST_CASE( "Write and read - Multiple reads" ) {
+	std::stringstream s;
+	Writer w(s);
+
+	StreamPacket spkt;
+	Packet pkt;
+
+	spkt.channel = 0;
+	spkt.timestamp = 0;
+	spkt.streamID = 0;
+
+	pkt.codec = codec_t::JSON;
+	pkt.definition = definition_t::Any;
+	pkt.block_number = 0;
+	pkt.block_total = 1;
+	pkt.flags = 0;
+	pkt.data = {44,44,44};
+
+	w.begin();
+	w.write(spkt, pkt);
+	spkt.timestamp = 50;
+	pkt.data = {55,55,55};
+	w.write(spkt, pkt);
+	spkt.timestamp = 100;
+	pkt.data = {66,66,66};
+	w.write(spkt, pkt);
+	w.end();
+
+	REQUIRE( s.str().size() > 0 );
+
+	int n = 0;
+
+	Reader r(s);
+	r.begin();
+	bool res = r.read(50, [&n](const StreamPacket &rspkt, const Packet &rpkt) {
+		++n;
+		REQUIRE(rpkt.codec == codec_t::JSON);
+		REQUIRE(rpkt.data.size() == 3);
+		REQUIRE(rpkt.data[0] == ((n == 1) ? 44 : (n == 2) ? 55 : 66));
+		REQUIRE(rspkt.channel == 0);
+	});
+
+	REQUIRE( n == 2 );
+	REQUIRE( !res );
+
+	n = 0;
+	res = r.read(100, [&n](const StreamPacket &rspkt, const Packet &rpkt) {
+		++n;
+		REQUIRE(rpkt.codec == codec_t::JSON);
+		REQUIRE(rpkt.data.size() == 3);
+		REQUIRE(rpkt.data[0] == 66 );
+		REQUIRE(rspkt.channel == 0);
+	});
+	r.end();
+
+	REQUIRE( n == 1 );
+	REQUIRE( !res );
+}
-- 
GitLab