diff --git a/components/codecs/include/ftl/codecs/reader.hpp b/components/codecs/include/ftl/codecs/reader.hpp index 5492c3d501a97756925fa46fa17eba58e22846a9..6ec39f6b0a66a3010d05c1203ce4fd26fcb9d5c1 100644 --- a/components/codecs/include/ftl/codecs/reader.hpp +++ b/components/codecs/include/ftl/codecs/reader.hpp @@ -26,6 +26,16 @@ class Reader { */ bool read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &); + /** + * An alternative version of read where packet events are generated for + * specific streams, allowing different handlers for different streams. + * This allows demuxing and is used by player sources. Each source can call + * this read, only the first one will generate the data packets. + */ + bool read(int64_t ts); + + void onPacket(int streamID, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &); + bool begin(); bool end(); @@ -34,6 +44,9 @@ class Reader { msgpack::unpacker buffer_; std::tuple<StreamPacket,Packet> data_; bool has_data_; + int64_t timestart_; + + std::vector<std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)>> handlers_; }; } diff --git a/components/codecs/include/ftl/codecs/writer.hpp b/components/codecs/include/ftl/codecs/writer.hpp index 6ff3c8aeb7769756c721afdbd1f6181d9f7dd7d0..abdbdb3db6fafc8d1a65652b44941bb6ce1c44ab 100644 --- a/components/codecs/include/ftl/codecs/writer.hpp +++ b/components/codecs/include/ftl/codecs/writer.hpp @@ -22,6 +22,7 @@ class Writer { private: std::ostream *stream_; msgpack::sbuffer buffer_; + int64_t timestart_; }; } diff --git a/components/codecs/src/reader.cpp b/components/codecs/src/reader.cpp index 6280d93918abfd1f7abe248f9e95b1d9f5c9c4b1..571feec84ec549284a3fd9752407b4db123c44e0 100644 --- a/components/codecs/src/reader.cpp +++ b/components/codecs/src/reader.cpp @@ -1,5 +1,6 @@ #include <loguru.hpp> #include <ftl/codecs/reader.hpp> +#include <ftl/timer.hpp> #include <tuple> @@ -20,6 +21,10 @@ bool Reader::begin() { ftl::codecs::Header h; (*stream_).read((char*)&h, sizeof(h)); if (h.magic[0] != 'F' || h.magic[1] != 'T' || h.magic[2] != 'L' || h.magic[3] != 'F') return false; + + // Capture current time to adjust timestamps + timestart_ = ftl::timer::get_time(); + return true; } @@ -56,6 +61,9 @@ bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::Stream msgpack::object obj = msg.get(); obj.convert(data); + // Adjust timestamp + get<0>(data).timestamp += timestart_; + if (get<0>(data).timestamp <= ts) { f(get<0>(data),get<1>(data)); } else { @@ -68,6 +76,19 @@ bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::Stream return false; } +bool Reader::read(int64_t ts) { + return read(ts, [this](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + if (handlers_.size() > spkt.streamID && (bool)handlers_[spkt.streamID]) { + handlers_[spkt.streamID](spkt, pkt); + } + }); +} + +void Reader::onPacket(int streamID, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &f) { + if (streamID >= handlers_.size()) handlers_.resize(streamID+1); + handlers_[streamID] = f; +} + bool Reader::end() { return true; } diff --git a/components/codecs/src/writer.cpp b/components/codecs/src/writer.cpp index 2022265e0f72a1f6cd7b5f3e4bd88dae46f95be5..70c8647074144798917f7ee645fe020fbc84c1b3 100644 --- a/components/codecs/src/writer.cpp +++ b/components/codecs/src/writer.cpp @@ -1,4 +1,5 @@ #include <ftl/codecs/writer.hpp> +#include <ftl/timer.hpp> #include <tuple> @@ -14,6 +15,10 @@ bool Writer::begin() { ftl::codecs::Header h; h.version = 0; (*stream_).write((const char*)&h, sizeof(h)); + + // Capture current time to adjust timestamps + timestart_ = ftl::timer::get_time(); + return true; } @@ -22,7 +27,11 @@ bool Writer::end() { } bool Writer::write(const ftl::codecs::StreamPacket &s, const ftl::codecs::Packet &p) { - auto data = std::make_tuple(s,p); + ftl::codecs::StreamPacket s2 = s; + // Adjust timestamp relative to start of file. + s2.timestamp -= timestart_; + + auto data = std::make_tuple(s2,p); msgpack::pack(buffer_, data); (*stream_).write(buffer_.data(), buffer_.size()); buffer_.clear(); diff --git a/components/codecs/test/readwrite_test.cpp b/components/codecs/test/readwrite_test.cpp index 7f776fdf7ca0da9b0a5cb98bec1ed22f874f117b..03a918ebdf9fe9985e3f35a4e9cd6f1448ab46ef 100644 --- a/components/codecs/test/readwrite_test.cpp +++ b/components/codecs/test/readwrite_test.cpp @@ -2,6 +2,7 @@ #include <ftl/codecs/writer.hpp> #include <ftl/codecs/reader.hpp> +#include <ftl/timer.hpp> #include <sstream> @@ -20,7 +21,7 @@ TEST_CASE( "Write and read - Single frame" ) { Packet pkt; spkt.channel = 0; - spkt.timestamp = 0; + spkt.timestamp = ftl::timer::get_time(); spkt.streamID = 0; pkt.codec = codec_t::JSON; @@ -40,7 +41,7 @@ TEST_CASE( "Write and read - Single frame" ) { Reader r(s); r.begin(); - bool res = r.read(10, [&n](const StreamPacket &rspkt, const Packet &rpkt) { + bool res = r.read(ftl::timer::get_time()+10, [&n](const StreamPacket &rspkt, const Packet &rpkt) { ++n; REQUIRE(rpkt.codec == codec_t::JSON); REQUIRE(rpkt.data.size() == 3); @@ -61,7 +62,7 @@ TEST_CASE( "Write and read - Multiple frames" ) { Packet pkt; spkt.channel = 0; - spkt.timestamp = 0; + spkt.timestamp = ftl::timer::get_time(); spkt.streamID = 0; pkt.codec = codec_t::JSON; @@ -73,10 +74,10 @@ TEST_CASE( "Write and read - Multiple frames" ) { w.begin(); w.write(spkt, pkt); - spkt.timestamp = 50; + spkt.timestamp += 50; pkt.data = {55,55,55}; w.write(spkt, pkt); - spkt.timestamp = 100; + spkt.timestamp += 50; pkt.data = {66,66,66}; w.write(spkt, pkt); w.end(); @@ -87,7 +88,7 @@ TEST_CASE( "Write and read - Multiple frames" ) { Reader r(s); r.begin(); - bool res = r.read(100, [&n](const StreamPacket &rspkt, const Packet &rpkt) { + bool res = r.read(ftl::timer::get_time()+100, [&n](const StreamPacket &rspkt, const Packet &rpkt) { ++n; REQUIRE(rpkt.codec == codec_t::JSON); REQUIRE(rpkt.data.size() == 3); @@ -100,6 +101,61 @@ TEST_CASE( "Write and read - Multiple frames" ) { REQUIRE( !res ); } +TEST_CASE( "Write and read - Multiple streams" ) { + std::stringstream s; + Writer w(s); + + StreamPacket spkt; + Packet pkt; + + spkt.channel = 0; + spkt.timestamp = ftl::timer::get_time(); + spkt.streamID = 0; + + pkt.codec = codec_t::JSON; + pkt.definition = definition_t::Any; + pkt.block_number = 0; + pkt.block_total = 1; + pkt.flags = 0; + pkt.data = {44,44,44}; + + w.begin(); + w.write(spkt, pkt); + spkt.streamID = 1; + pkt.data = {55,55,55}; + w.write(spkt, pkt); + w.end(); + + REQUIRE( s.str().size() > 0 ); + + int n1 = 0; + int n2 = 0; + + Reader r(s); + + r.onPacket(0, [&n1](const StreamPacket &rspkt, const Packet &rpkt) { + ++n1; + REQUIRE(rspkt.streamID == 0); + REQUIRE(rpkt.data.size() == 3); + REQUIRE(rpkt.data[0] == 44); + }); + + r.onPacket(1, [&n2](const StreamPacket &rspkt, const Packet &rpkt) { + ++n2; + REQUIRE(rspkt.streamID == 1); + REQUIRE(rpkt.data.size() == 3); + REQUIRE(rpkt.data[0] == 55); + }); + + r.begin(); + bool res = r.read(ftl::timer::get_time()+100); + r.end(); + + REQUIRE( n1 == 1 ); + REQUIRE( n2 == 1 ); + REQUIRE( !res ); +} + TEST_CASE( "Write and read - Multiple frames with limit" ) { std::stringstream s; Writer w(s); @@ -108,7 +164,7 @@ TEST_CASE( "Write and read - Multiple frames with limit" ) { Packet pkt; spkt.channel = 0; - spkt.timestamp = 0; + spkt.timestamp = ftl::timer::get_time(); spkt.streamID = 0; pkt.codec = codec_t::JSON; @@ -120,10 +176,10 @@ TEST_CASE( "Write and read - Multiple frames with limit" ) { w.begin(); w.write(spkt, pkt); - spkt.timestamp = 50; + spkt.timestamp += 50; pkt.data = {55,55,55}; w.write(spkt, pkt); - spkt.timestamp = 100; + spkt.timestamp += 50; pkt.data = {66,66,66}; w.write(spkt, pkt); w.end(); @@ -134,7 +190,7 @@ TEST_CASE( "Write and read - Multiple frames with limit" ) { Reader r(s); r.begin(); - bool res = r.read(50, [&n](const StreamPacket &rspkt, const Packet &rpkt) { + bool res = r.read(ftl::timer::get_time()+50, [&n](const StreamPacket &rspkt, const Packet &rpkt) { ++n; REQUIRE(rpkt.codec == codec_t::JSON); REQUIRE(rpkt.data.size() == 3); @@ -155,7 +211,7 @@ TEST_CASE( "Write and read - Multiple reads" ) { Packet pkt; spkt.channel = 0; - spkt.timestamp = 0; + spkt.timestamp = ftl::timer::get_time(); spkt.streamID = 0; pkt.codec = codec_t::JSON; @@ -167,10 +223,10 @@ TEST_CASE( "Write and read - Multiple reads" ) { w.begin(); w.write(spkt, pkt); - spkt.timestamp = 50; + spkt.timestamp += 50; pkt.data = {55,55,55}; w.write(spkt, pkt); - spkt.timestamp = 100; + spkt.timestamp += 50; pkt.data = {66,66,66}; w.write(spkt, pkt); w.end(); @@ -181,7 +237,7 @@ TEST_CASE( "Write and read - Multiple reads" ) { Reader r(s); r.begin(); - bool res = r.read(50, [&n](const StreamPacket &rspkt, const Packet &rpkt) { + bool res = r.read(ftl::timer::get_time()+50, [&n](const StreamPacket &rspkt, const Packet &rpkt) { ++n; REQUIRE(rpkt.codec == codec_t::JSON); REQUIRE(rpkt.data.size() == 3); @@ -193,7 +249,7 @@ TEST_CASE( "Write and read - Multiple reads" ) { REQUIRE( res ); n = 0; - res = r.read(100, [&n](const StreamPacket &rspkt, const Packet &rpkt) { + res = r.read(ftl::timer::get_time()+100, [&n](const StreamPacket &rspkt, const Packet &rpkt) { ++n; REQUIRE(rpkt.codec == codec_t::JSON); REQUIRE(rpkt.data.size() == 3); diff --git a/components/rgbd-sources/src/source.cpp b/components/rgbd-sources/src/source.cpp index 4ec34a5e5685cc76d4356ac3766a5a97fa067679..dcab5bd24adc81f4cdb9c80e20ce0279c29b7af4 100644 --- a/components/rgbd-sources/src/source.cpp +++ b/components/rgbd-sources/src/source.cpp @@ -114,7 +114,10 @@ ftl::rgbd::detail::Source *Source::_createFileImpl(const ftl::URI &uri) { } else if (ftl::is_file(path)) { string ext = path.substr(eix+1); - if (ext == "png" || ext == "jpg") { + if (ext == "ftl") { + //auto *reader = _createReader(path); + //return new PlayerSource(player, std::stoi(uri.getFragment())); + } else if (ext == "png" || ext == "jpg") { return new ImageSource(this, path); } else if (ext == "mp4") { return new StereoVideoSource(this, path);