From fd7625e6ac9cd0b0bb00e93a78eef2fb4d180d01 Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nwpope@utu.fi> Date: Tue, 16 Jun 2020 22:24:41 +0300 Subject: [PATCH] WIP encode data channels --- .../streams/include/ftl/streams/sender.hpp | 4 + components/streams/src/sender.cpp | 48 ++++++++- components/streams/test/sender_unit.cpp | 99 +++++++++++++++++++ .../structures/include/ftl/data/channels.hpp | 10 ++ .../structures/include/ftl/data/new_frame.hpp | 27 +++++ components/structures/src/new_frame.cpp | 17 ++++ 6 files changed, 203 insertions(+), 2 deletions(-) diff --git a/components/streams/include/ftl/streams/sender.hpp b/components/streams/include/ftl/streams/sender.hpp index 73891734a..d542fc748 100644 --- a/components/streams/include/ftl/streams/sender.hpp +++ b/components/streams/include/ftl/streams/sender.hpp @@ -65,6 +65,10 @@ class Sender : public ftl::Configurable { //ftl::codecs::Encoder *_getEncoder(int fsid, int fid, ftl::codecs::Channel c); void _encodeChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset); + void _encodeVideoChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset); + void _encodeAudioChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset); + void _encodeDataChannel(ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, bool reset); + int _generateTiles(const ftl::rgbd::FrameSet &fs, int offset, ftl::codecs::Channel c, cv::cuda::Stream &stream, bool, bool); EncodingState &_getTile(int fsid, ftl::codecs::Channel c); cv::Rect _generateROI(const ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, int offset, bool stereo); diff --git a/components/streams/src/sender.cpp b/components/streams/src/sender.cpp index 246b1587f..4fb7ab1c8 100644 --- a/components/streams/src/sender.cpp +++ b/components/streams/src/sender.cpp @@ -267,7 +267,7 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c) { StreamPacket spkt; spkt.version = 4; spkt.timestamp = fs.timestamp(); - spkt.streamID = 0; // FIXME: fs.id; + spkt.streamID = fs.frameset(); spkt.frame_number = 255; spkt.channel = c; @@ -286,7 +286,7 @@ void Sender::post(ftl::data::FrameSet &fs, ftl::codecs::Channel c) { //do_inject_ = false; } -void Sender::_encodeChannel(ftl::data::FrameSet &fs, Channel c, bool reset) { +void Sender::_encodeVideoChannel(ftl::data::FrameSet &fs, Channel c, bool reset) { bool lossless = value("lossless", false); int max_bitrate = std::max(0, std::min(255, value("max_bitrate", 255))); //int min_bitrate = std::max(0, std::min(255, value("min_bitrate", 0))); // TODO: Use this @@ -388,6 +388,50 @@ void Sender::_encodeChannel(ftl::data::FrameSet &fs, Channel c, bool reset) { } } +void Sender::_encodeAudioChannel(ftl::data::FrameSet &fs, Channel c, bool reset) { + +} + +void Sender::_encodeDataChannel(ftl::data::FrameSet &fs, Channel c, bool reset) { + int i=0; + + for (auto &f : fs.frames) { + StreamPacket spkt; + spkt.version = 4; + spkt.timestamp = fs.timestamp(); + spkt.streamID = fs.frameset(); + spkt.frame_number = i++; + spkt.channel = c; + + ftl::codecs::Packet pkt; + pkt.frame_count = 1; + pkt.codec = codec_t::MSGPACK; + pkt.bitrate = 255; + pkt.flags = 0; + + auto encoder = ftl::data::getTypeEncoder(f.type(c)); + if (encoder) { + if (encoder(f, c, pkt.data)) { + stream_->post(spkt, pkt); + } + } else { + LOG(WARNING) << "Missing msgpack encoder"; + } + } +} + +void Sender::_encodeChannel(ftl::data::FrameSet &fs, Channel c, bool reset) { + int ic = int(c); + + if (ic < 32) { + _encodeVideoChannel(fs, c, reset); + } else if (ic < 64) { + _encodeAudioChannel(fs, c, reset); + } else { + _encodeDataChannel(fs, c, reset); + } +} + cv::Rect Sender::_generateROI(const ftl::rgbd::FrameSet &fs, ftl::codecs::Channel c, int offset, bool stereo) { const ftl::data::Frame &cframe = fs.firstFrame(); int rwidth = cframe.get<cv::cuda::GpuMat>(c).cols; diff --git a/components/streams/test/sender_unit.cpp b/components/streams/test/sender_unit.cpp index 32239dee0..5b4a9ce55 100644 --- a/components/streams/test/sender_unit.cpp +++ b/components/streams/test/sender_unit.cpp @@ -318,3 +318,102 @@ TEST_CASE( "Sender request to control encoding" ) { REQUIRE( ftl::codecs::hevc::validNAL(pkt.data.data(), pkt.data.size()) ); } } + +TEST_CASE( "Sender::post() data channels" ) { + json_t global = json_t{{"$id","ftl://test"}}; + ftl::config::configure(global); + + json_t cfg = json_t{ + {"$id","ftl://test/1"} + }; + auto *sender = ftl::create<Sender>(cfg); + + ftl::data::Pool pool(4,6); + Frame f = pool.allocate(ftl::data::FrameID(0,0), 1000); + f.store(); + auto fsptr = FrameSet::fromFrame(f); + FrameSet &fs = *fsptr; + + json_t cfg2 = json_t{ + {"$id","ftl://test/2"} + }; + TestStream stream(cfg2); + sender->setStream(&stream); + + ftl::codecs::StreamPacket spkt; + ftl::codecs::Packet pkt; + int count = 0; + + stream.onIntercept([&count,&spkt,&pkt](const ftl::codecs::StreamPacket &pspkt, const ftl::codecs::Packet &ppkt) { + spkt = pspkt; + pkt = ppkt; + ++count; + }); + + SECTION("a single calibration channel") { + stream.select(0, {Channel::Calibration}, true); + + fs.count = 1; + fs.mask = 1; + auto &calib = fs.frames[0].create<ftl::rgbd::Camera>(Channel::Calibration); + calib.width = 1024; + + fs.frames[0].flush(); + sender->post(fs, Channel::Calibration); + + REQUIRE( count == 1 ); + REQUIRE( spkt.version == 4 ); + REQUIRE( spkt.timestamp == 1000 ); + REQUIRE( (int)spkt.frame_number == 0 ); + REQUIRE( spkt.streamID == 0 ); + REQUIRE( spkt.channel == Channel::Calibration ); + REQUIRE( pkt.codec == codec_t::MSGPACK ); + REQUIRE( pkt.data.size() > 0 ); + REQUIRE( pkt.frame_count == 1 ); + } + + SECTION("a single pose channel") { + stream.select(0, {Channel::Pose}, true); + + fs.count = 1; + fs.mask = 1; + fs.frames[0].create<Eigen::Matrix4d>(Channel::Pose); + + fs.frames[0].flush(); + sender->post(fs, Channel::Pose); + + REQUIRE( count == 1 ); + REQUIRE( spkt.version == 4 ); + REQUIRE( spkt.timestamp == 1000 ); + REQUIRE( (int)spkt.frame_number == 0 ); + REQUIRE( spkt.streamID == 0 ); + REQUIRE( spkt.channel == Channel::Pose ); + REQUIRE( pkt.codec == codec_t::MSGPACK ); + REQUIRE( pkt.data.size() > 0 ); + REQUIRE( pkt.frame_count == 1 ); + } + + SECTION("a single custom channel") { + stream.select(0, {Channel::Data}, true); + + fs.count = 1; + fs.mask = 1; + auto &vf = fs.frames[0].create<std::vector<float>>(Channel::Data); + vf.push_back(5.0f); + vf.push_back(33.0f); + + fs.frames[0].flush(); + sender->post(fs, Channel::Data); + + REQUIRE( count == 1 ); + REQUIRE( spkt.version == 4 ); + REQUIRE( spkt.timestamp == 1000 ); + REQUIRE( (int)spkt.frame_number == 0 ); + REQUIRE( spkt.streamID == 0 ); + REQUIRE( spkt.channel == Channel::Data ); + REQUIRE( pkt.codec == codec_t::MSGPACK ); + REQUIRE( pkt.data.size() > 0 ); + REQUIRE( pkt.frame_count == 1 ); + // TODO: Check decodes correctly. + } +} diff --git a/components/structures/include/ftl/data/channels.hpp b/components/structures/include/ftl/data/channels.hpp index ab041c0ec..a130975c7 100644 --- a/components/structures/include/ftl/data/channels.hpp +++ b/components/structures/include/ftl/data/channels.hpp @@ -4,10 +4,12 @@ #include <string> #include <ftl/codecs/channels.hpp> #include <ftl/exception.hpp> +#include <ftl/utility/vectorbuffer.hpp> namespace ftl { namespace data { +class Frame; /** Kind of channel in terms of data persistence */ enum class StorageMode { @@ -78,6 +80,14 @@ std::string getChannelName(ftl::codecs::Channel); /** Unsupported */ ftl::codecs::Channel getChannelByName(const std::string &name); +/** + * Attempts to get a msgpack encoder for this channel. Such encoders are + * registered by typeid basis when creating channels. + */ +std::function<bool(const ftl::data::Frame &, ftl::codecs::Channel, std::vector<uint8_t> &)> getTypeEncoder(size_t type); + +void setTypeEncoder(size_t type, const std::function<bool(const ftl::data::Frame &, ftl::codecs::Channel, std::vector<uint8_t> &)> &e); + /** * Helper to register a channel using a template specified type. */ diff --git a/components/structures/include/ftl/data/new_frame.hpp b/components/structures/include/ftl/data/new_frame.hpp index 4bc435277..ac392a480 100644 --- a/components/structures/include/ftl/data/new_frame.hpp +++ b/components/structures/include/ftl/data/new_frame.hpp @@ -145,6 +145,8 @@ class Frame { const std::any &getAny(ftl::codecs::Channel c) const; + inline size_t type(ftl::codecs::Channel c) const { return getAny(c).type().hash_code(); } + std::any &getAnyMutable(ftl::codecs::Channel c); template <typename T> @@ -357,6 +359,26 @@ class Session : public Frame { MUTEX mutex_; }; +template <typename T, std::enable_if_t<std::is_invocable<decltype(msgpack::pack<ftl::util::FTLVectorBuffer,T>), ftl::util::FTLVectorBuffer, T>::type,int> = 0> +bool make_type() { + //static_assert(false, "YAY"); + setTypeEncoder(typeid(T).hash_code(), [](const ftl::data::Frame &f, ftl::codecs::Channel c, std::vector<uint8_t> &data) { + data.resize(0); + ftl::util::FTLVectorBuffer buf(data); + msgpack::pack(buf, f.get<T>(c)); + return true; + }); + return true; +} + +template <typename T> +bool make_type() { + T t; + (void)t; + setTypeEncoder(typeid(T).hash_code(), nullptr); + return false; +} + } } @@ -447,6 +469,7 @@ T &ftl::data::Frame::create(ftl::codecs::Channel c) { if (isAggregate(c)) throw FTL_Error("Aggregate channels must be of list type"); ftl::data::verifyChannelType<T>(c); + ftl::data::make_type<T>(); std::any &a = createAny(c); if (!isType<T>(c)) return a.emplace<T>(); @@ -457,6 +480,7 @@ T &ftl::data::Frame::create(ftl::codecs::Channel c) { template <typename T, std::enable_if_t<is_list<T>::value,int> = 0> ftl::data::Aggregator<T> ftl::data::Frame::create(ftl::codecs::Channel c) { ftl::data::verifyChannelType<T>(c); + ftl::data::make_type<T>(); std::any &a = createAny(c); if (!isType<T>(c)) a.emplace<T>(); @@ -468,6 +492,7 @@ T &ftl::data::Frame::createChange(ftl::codecs::Channel c, ftl::data::ChangeType if (!bool(is_list<T>{}) && isAggregate(c)) throw FTL_Error("Aggregate channels must be of list type"); ftl::data::verifyChannelType<T>(c); + ftl::data::make_type<T>(); std::any &a = createAnyChange(c, type, data); if (!isType<T>(c)) return a.emplace<T>(); @@ -480,6 +505,7 @@ T &ftl::data::Frame::createChange(ftl::codecs::Channel c, ftl::data::ChangeType if (isAggregate(c)) throw FTL_Error("Aggregate channels must be of list type"); ftl::data::verifyChannelType<T>(c); + ftl::data::make_type<T>(); std::any &a = createAnyChange(c, type); if (!isType<T>(c)) return a.emplace<T>(); @@ -490,6 +516,7 @@ T &ftl::data::Frame::createChange(ftl::codecs::Channel c, ftl::data::ChangeType template <typename T, std::enable_if_t<is_list<T>::value,int> = 0> ftl::data::Aggregator<T> ftl::data::Frame::createChange(ftl::codecs::Channel c, ftl::data::ChangeType type) { ftl::data::verifyChannelType<T>(c); + ftl::data::make_type<T>(); std::any &a = createAnyChange(c, type); if (!isType<T>(c)) a.emplace<T>(); diff --git a/components/structures/src/new_frame.cpp b/components/structures/src/new_frame.cpp index 6d0da508e..4a728c6ad 100644 --- a/components/structures/src/new_frame.cpp +++ b/components/structures/src/new_frame.cpp @@ -12,6 +12,7 @@ using ftl::data::FrameStatus; #include <loguru.hpp> static std::unordered_map<ftl::codecs::Channel, ChannelConfig> reg_channels; +static std::unordered_map<size_t, std::function<bool(const ftl::data::Frame &, ftl::codecs::Channel, std::vector<uint8_t> &)>> encoders; void ftl::data::registerChannel(ftl::codecs::Channel c, const ChannelConfig &config) { auto i = reg_channels.find(c); @@ -53,6 +54,17 @@ ftl::codecs::Channel ftl::data::getChannelByName(const std::string &name) { return ftl::codecs::Channel::Colour; } +std::function<bool(const ftl::data::Frame &, ftl::codecs::Channel, std::vector<uint8_t> &)> ftl::data::getTypeEncoder(size_t type) { + const auto &i = encoders.find(type); + if (i != encoders.end()) return i->second; + else return nullptr; +} + +void ftl::data::setTypeEncoder(size_t type, const std::function<bool(const ftl::data::Frame &, ftl::codecs::Channel, std::vector<uint8_t> &)> &e) { + encoders[type] = e; + LOG(INFO) << "Create msgpack encoder: " << type << "(" << ((e) ? "true" : "false") << ")"; +} + //============================================================================== Frame::~Frame() { @@ -164,6 +176,11 @@ std::any &Frame::getAnyMutable(ftl::codecs::Channel c) { return d.data; } +const std::any &Frame::getAny(ftl::codecs::Channel c) const { + auto &d = _getData(c); + return d.data; +} + const std::list<ftl::codecs::Packet> &ftl::data::Frame::getEncoded(ftl::codecs::Channel c) const { const auto &d = _getData(c); if (d.status != ftl::data::ChannelStatus::INVALID) { -- GitLab