diff --git a/components/codecs/include/ftl/codecs/reader.hpp b/components/codecs/include/ftl/codecs/reader.hpp index 530c098904023f041376c88128ddc34c4034eb9d..5492c3d501a97756925fa46fa17eba58e22846a9 100644 --- a/components/codecs/include/ftl/codecs/reader.hpp +++ b/components/codecs/include/ftl/codecs/reader.hpp @@ -32,6 +32,8 @@ class Reader { private: std::istream *stream_; msgpack::unpacker buffer_; + std::tuple<StreamPacket,Packet> data_; + bool has_data_; }; } diff --git a/components/codecs/src/reader.cpp b/components/codecs/src/reader.cpp index 1f5b7f2aabe0682d535e83ac3447a26bcee3319d..1cd570143374ebf8bada2d209dcdae7a2a6c194d 100644 --- a/components/codecs/src/reader.cpp +++ b/components/codecs/src/reader.cpp @@ -7,7 +7,7 @@ using ftl::codecs::StreamPacket; using ftl::codecs::Packet; using std::get; -Reader::Reader(std::istream &s) : stream_(&s) { +Reader::Reader(std::istream &s) : stream_(&s), has_data_(false) { } @@ -23,7 +23,14 @@ bool Reader::begin() { } bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &f) { - while (stream_->good()) { + if (has_data_ && get<0>(data_).timestamp <= ts) { + f(get<0>(data_), get<1>(data_)); + has_data_ = false; + } else if (has_data_) { + return false; + } + + while (stream_->good() || buffer_.nonparsed_size() > 0) { if (buffer_.nonparsed_size() == 0) { buffer_.reserve_buffer(100000); stream_->read(buffer_.buffer(), 100000); @@ -37,11 +44,16 @@ bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::Stream msgpack::object_handle msg; if (!buffer_.next(msg)) continue; - msgpack::object obj = msg.get(); std::tuple<StreamPacket,Packet> data; + msgpack::object obj = msg.get(); obj.convert(data); - f(get<0>(data),get<1>(data)); + if (get<0>(data).timestamp <= ts) { + f(get<0>(data),get<1>(data)); + } else { + data_ = data; + has_data_ = true; + } } return false; diff --git a/components/codecs/test/readwrite_test.cpp b/components/codecs/test/readwrite_test.cpp index e929ebaf909befd37dd3b6f18700fa4f4944cf49..6258661aee8e992a0708edb5b8972e428970cb3a 100644 --- a/components/codecs/test/readwrite_test.cpp +++ b/components/codecs/test/readwrite_test.cpp @@ -50,3 +50,156 @@ TEST_CASE( "Write and read - Single frame" ) { REQUIRE( n == 1 ); REQUIRE( !res ); } + +TEST_CASE( "Write and read - Multiple frames" ) { + std::stringstream s; + Writer w(s); + + StreamPacket spkt; + Packet pkt; + + spkt.channel = 0; + spkt.timestamp = 0; + spkt.streamID = 0; + + pkt.codec = codec_t::JSON; + pkt.definition = definition_t::Any; + pkt.block_number = 0; + pkt.block_total = 1; + pkt.flags = 0; + pkt.data = {44,44,44}; + + w.begin(); + w.write(spkt, pkt); + spkt.timestamp = 50; + pkt.data = {55,55,55}; + w.write(spkt, pkt); + spkt.timestamp = 100; + pkt.data = {66,66,66}; + w.write(spkt, pkt); + w.end(); + + REQUIRE( s.str().size() > 0 ); + + int n = 0; + + Reader r(s); + r.begin(); + bool res = r.read(100, [&n](const StreamPacket &rspkt, const Packet &rpkt) { + ++n; + REQUIRE(rpkt.codec == codec_t::JSON); + REQUIRE(rpkt.data.size() == 3); + REQUIRE(rpkt.data[0] == ((n == 1) ? 44 : (n == 2) ? 55 : 66)); + REQUIRE(rspkt.channel == 0); + }); + r.end(); + + REQUIRE( n == 3 ); + REQUIRE( !res ); +} + +TEST_CASE( "Write and read - Multiple frames with limit" ) { + std::stringstream s; + Writer w(s); + + StreamPacket spkt; + Packet pkt; + + spkt.channel = 0; + spkt.timestamp = 0; + spkt.streamID = 0; + + pkt.codec = codec_t::JSON; + pkt.definition = definition_t::Any; + pkt.block_number = 0; + pkt.block_total = 1; + pkt.flags = 0; + pkt.data = {44,44,44}; + + w.begin(); + w.write(spkt, pkt); + spkt.timestamp = 50; + pkt.data = {55,55,55}; + w.write(spkt, pkt); + spkt.timestamp = 100; + pkt.data = {66,66,66}; + w.write(spkt, pkt); + w.end(); + + REQUIRE( s.str().size() > 0 ); + + int n = 0; + + Reader r(s); + r.begin(); + bool res = r.read(50, [&n](const StreamPacket &rspkt, const Packet &rpkt) { + ++n; + REQUIRE(rpkt.codec == codec_t::JSON); + REQUIRE(rpkt.data.size() == 3); + REQUIRE(rpkt.data[0] == ((n == 1) ? 44 : (n == 2) ? 55 : 66)); + REQUIRE(rspkt.channel == 0); + }); + r.end(); + + REQUIRE( n == 2 ); + REQUIRE( !res ); +} + +TEST_CASE( "Write and read - Multiple reads" ) { + std::stringstream s; + Writer w(s); + + StreamPacket spkt; + Packet pkt; + + spkt.channel = 0; + spkt.timestamp = 0; + spkt.streamID = 0; + + pkt.codec = codec_t::JSON; + pkt.definition = definition_t::Any; + pkt.block_number = 0; + pkt.block_total = 1; + pkt.flags = 0; + pkt.data = {44,44,44}; + + w.begin(); + w.write(spkt, pkt); + spkt.timestamp = 50; + pkt.data = {55,55,55}; + w.write(spkt, pkt); + spkt.timestamp = 100; + pkt.data = {66,66,66}; + w.write(spkt, pkt); + w.end(); + + REQUIRE( s.str().size() > 0 ); + + int n = 0; + + Reader r(s); + r.begin(); + bool res = r.read(50, [&n](const StreamPacket &rspkt, const Packet &rpkt) { + ++n; + REQUIRE(rpkt.codec == codec_t::JSON); + REQUIRE(rpkt.data.size() == 3); + REQUIRE(rpkt.data[0] == ((n == 1) ? 44 : (n == 2) ? 55 : 66)); + REQUIRE(rspkt.channel == 0); + }); + + REQUIRE( n == 2 ); + REQUIRE( !res ); + + n = 0; + res = r.read(100, [&n](const StreamPacket &rspkt, const Packet &rpkt) { + ++n; + REQUIRE(rpkt.codec == codec_t::JSON); + REQUIRE(rpkt.data.size() == 3); + REQUIRE(rpkt.data[0] == 66 ); + REQUIRE(rspkt.channel == 0); + }); + r.end(); + + REQUIRE( n == 1 ); + REQUIRE( !res ); +}