Skip to content
Snippets Groups Projects
Commit af8f28bf authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Working read up to timestamp

parent ec8de648
No related branches found
No related tags found
1 merge request!127Implements #196 stream capturing
...@@ -32,6 +32,8 @@ class Reader { ...@@ -32,6 +32,8 @@ class Reader {
private: private:
std::istream *stream_; std::istream *stream_;
msgpack::unpacker buffer_; msgpack::unpacker buffer_;
std::tuple<StreamPacket,Packet> data_;
bool has_data_;
}; };
} }
......
...@@ -7,7 +7,7 @@ using ftl::codecs::StreamPacket; ...@@ -7,7 +7,7 @@ using ftl::codecs::StreamPacket;
using ftl::codecs::Packet; using ftl::codecs::Packet;
using std::get; using std::get;
Reader::Reader(std::istream &s) : stream_(&s) { Reader::Reader(std::istream &s) : stream_(&s), has_data_(false) {
} }
...@@ -23,7 +23,14 @@ bool Reader::begin() { ...@@ -23,7 +23,14 @@ bool Reader::begin() {
} }
bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &f) { bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &f) {
while (stream_->good()) { if (has_data_ && get<0>(data_).timestamp <= ts) {
f(get<0>(data_), get<1>(data_));
has_data_ = false;
} else if (has_data_) {
return false;
}
while (stream_->good() || buffer_.nonparsed_size() > 0) {
if (buffer_.nonparsed_size() == 0) { if (buffer_.nonparsed_size() == 0) {
buffer_.reserve_buffer(100000); buffer_.reserve_buffer(100000);
stream_->read(buffer_.buffer(), 100000); stream_->read(buffer_.buffer(), 100000);
...@@ -37,11 +44,16 @@ bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::Stream ...@@ -37,11 +44,16 @@ bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::Stream
msgpack::object_handle msg; msgpack::object_handle msg;
if (!buffer_.next(msg)) continue; if (!buffer_.next(msg)) continue;
msgpack::object obj = msg.get();
std::tuple<StreamPacket,Packet> data; std::tuple<StreamPacket,Packet> data;
msgpack::object obj = msg.get();
obj.convert(data); obj.convert(data);
if (get<0>(data).timestamp <= ts) {
f(get<0>(data),get<1>(data)); f(get<0>(data),get<1>(data));
} else {
data_ = data;
has_data_ = true;
}
} }
return false; return false;
......
...@@ -50,3 +50,156 @@ TEST_CASE( "Write and read - Single frame" ) { ...@@ -50,3 +50,156 @@ TEST_CASE( "Write and read - Single frame" ) {
REQUIRE( n == 1 ); REQUIRE( n == 1 );
REQUIRE( !res ); REQUIRE( !res );
} }
TEST_CASE( "Write and read - Multiple frames" ) {
std::stringstream s;
Writer w(s);
StreamPacket spkt;
Packet pkt;
spkt.channel = 0;
spkt.timestamp = 0;
spkt.streamID = 0;
pkt.codec = codec_t::JSON;
pkt.definition = definition_t::Any;
pkt.block_number = 0;
pkt.block_total = 1;
pkt.flags = 0;
pkt.data = {44,44,44};
w.begin();
w.write(spkt, pkt);
spkt.timestamp = 50;
pkt.data = {55,55,55};
w.write(spkt, pkt);
spkt.timestamp = 100;
pkt.data = {66,66,66};
w.write(spkt, pkt);
w.end();
REQUIRE( s.str().size() > 0 );
int n = 0;
Reader r(s);
r.begin();
bool res = r.read(100, [&n](const StreamPacket &rspkt, const Packet &rpkt) {
++n;
REQUIRE(rpkt.codec == codec_t::JSON);
REQUIRE(rpkt.data.size() == 3);
REQUIRE(rpkt.data[0] == ((n == 1) ? 44 : (n == 2) ? 55 : 66));
REQUIRE(rspkt.channel == 0);
});
r.end();
REQUIRE( n == 3 );
REQUIRE( !res );
}
TEST_CASE( "Write and read - Multiple frames with limit" ) {
std::stringstream s;
Writer w(s);
StreamPacket spkt;
Packet pkt;
spkt.channel = 0;
spkt.timestamp = 0;
spkt.streamID = 0;
pkt.codec = codec_t::JSON;
pkt.definition = definition_t::Any;
pkt.block_number = 0;
pkt.block_total = 1;
pkt.flags = 0;
pkt.data = {44,44,44};
w.begin();
w.write(spkt, pkt);
spkt.timestamp = 50;
pkt.data = {55,55,55};
w.write(spkt, pkt);
spkt.timestamp = 100;
pkt.data = {66,66,66};
w.write(spkt, pkt);
w.end();
REQUIRE( s.str().size() > 0 );
int n = 0;
Reader r(s);
r.begin();
bool res = r.read(50, [&n](const StreamPacket &rspkt, const Packet &rpkt) {
++n;
REQUIRE(rpkt.codec == codec_t::JSON);
REQUIRE(rpkt.data.size() == 3);
REQUIRE(rpkt.data[0] == ((n == 1) ? 44 : (n == 2) ? 55 : 66));
REQUIRE(rspkt.channel == 0);
});
r.end();
REQUIRE( n == 2 );
REQUIRE( !res );
}
TEST_CASE( "Write and read - Multiple reads" ) {
std::stringstream s;
Writer w(s);
StreamPacket spkt;
Packet pkt;
spkt.channel = 0;
spkt.timestamp = 0;
spkt.streamID = 0;
pkt.codec = codec_t::JSON;
pkt.definition = definition_t::Any;
pkt.block_number = 0;
pkt.block_total = 1;
pkt.flags = 0;
pkt.data = {44,44,44};
w.begin();
w.write(spkt, pkt);
spkt.timestamp = 50;
pkt.data = {55,55,55};
w.write(spkt, pkt);
spkt.timestamp = 100;
pkt.data = {66,66,66};
w.write(spkt, pkt);
w.end();
REQUIRE( s.str().size() > 0 );
int n = 0;
Reader r(s);
r.begin();
bool res = r.read(50, [&n](const StreamPacket &rspkt, const Packet &rpkt) {
++n;
REQUIRE(rpkt.codec == codec_t::JSON);
REQUIRE(rpkt.data.size() == 3);
REQUIRE(rpkt.data[0] == ((n == 1) ? 44 : (n == 2) ? 55 : 66));
REQUIRE(rspkt.channel == 0);
});
REQUIRE( n == 2 );
REQUIRE( !res );
n = 0;
res = r.read(100, [&n](const StreamPacket &rspkt, const Packet &rpkt) {
++n;
REQUIRE(rpkt.codec == codec_t::JSON);
REQUIRE(rpkt.data.size() == 3);
REQUIRE(rpkt.data[0] == 66 );
REQUIRE(rspkt.channel == 0);
});
r.end();
REQUIRE( n == 1 );
REQUIRE( !res );
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment