Skip to content
Snippets Groups Projects
Commit d2977c26 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

WIP delayed msgpack decode

parent dd62c2ca
No related branches found
No related tags found
1 merge request!316Resolves #343 GUI and Frame Refactor
......@@ -27,4 +27,9 @@ inline bool ftl::data::make_type<std::list<ftl::audio::Audio>>() {
return false;
}
template <>
inline bool ftl::data::decode_type<std::list<ftl::audio::Audio>>(std::any &a, const std::vector<uint8_t> &data) {
return false;
}
#endif // _FTL_AUDIO_AUDIO_HPP_
......@@ -129,7 +129,7 @@ void Speaker::_open(int fsize, int sample, int channels) {
}
void Speaker::queue(int64_t ts, ftl::audio::Frame &frame) {
auto &audio = frame.get<ftl::audio::Audio>((frame.hasChannel(Channel::AudioStereo)) ? Channel::AudioStereo : Channel::AudioMono);
const auto &audio = frame.get<std::list<ftl::audio::Audio>>((frame.hasChannel(Channel::AudioStereo)) ? Channel::AudioStereo : Channel::AudioMono);
if (!buffer_) {
_open(960, 48000, (frame.hasChannel(Channel::AudioStereo)) ? 2 : 1);
......@@ -137,7 +137,9 @@ void Speaker::queue(int64_t ts, ftl::audio::Frame &frame) {
if (!buffer_) return;
//LOG(INFO) << "Buffer Fullness (" << ts << "): " << buffer_->size() << " - " << audio.size();
buffer_->write(audio.data());
for (const auto &d : audio) {
buffer_->write(d.data());
}
//LOG(INFO) << "Audio delay: " << buffer_.delay() << "s";
}
......
......@@ -193,4 +193,9 @@ inline bool ftl::data::make_type<ftl::rgbd::VideoFrame>() {
return false;
}
template <>
inline bool ftl::data::decode_type<ftl::rgbd::VideoFrame>(std::any &a, const std::vector<uint8_t> &data) {
return false;
}
#endif // _FTL_RGBD_FRAME_HPP_
\ No newline at end of file
......@@ -422,3 +422,64 @@ TEST_CASE( "Receiver non zero buffer" ) {
delete receiver;
//ftl::data::clearRegistry();
}
TEST_CASE( "Receiver for data channels" ) {
//ftl::data::make_channel<ftl::rgbd::Camera>(Channel::Calibration, "calibration", ftl::data::StorageMode::PERSISTENT);
json_t global = json_t{{"$id","ftl://test"}};
ftl::config::configure(global);
ftl::data::Pool pool(5,7);
json_t cfg = json_t{
{"$id","ftl://test/1"}
};
auto *receiver = ftl::create<Receiver>(cfg, &pool);
json_t cfg2 = json_t{
{"$id","ftl://test/2"}
};
TestStream stream(cfg2);
receiver->setStream(&stream);
receiver->set("frameset_buffer_size", 0);
ftl::codecs::Packet pkt;
pkt.codec = codec_t::Any;
pkt.bitrate = 255;
pkt.flags = 0;
pkt.frame_count = 1;
ftl::codecs::StreamPacket spkt;
spkt.version = 4;
spkt.timestamp = 10;
spkt.frame_number = 0;
spkt.channel = Channel::Data;
spkt.streamID = 0;
ftl::timer::start(false);
SECTION("a single data packet") {
pkt.data.resize(0);
ftl::util::FTLVectorBuffer buf(pkt.data);
msgpack::pack(buf, 5.0f);
stream.post(spkt, pkt);
int count = 0;
auto h = receiver->onFrameSet([&count](const std::shared_ptr<ftl::data::FrameSet>& fs) {
++count;
REQUIRE( fs->timestamp() == 10 );
REQUIRE( fs->frames.size() == 1 );
REQUIRE( fs->frames[0].hasChannel(Channel::Data) );
REQUIRE( fs->frames[0].get<float>(Channel::Data) == 5.0f );
return true;
});
int i=10;
while (i-- > 0 && count < 1) std::this_thread::sleep_for(std::chrono::milliseconds(10));
REQUIRE( count == 1 );
}
}
......@@ -32,7 +32,8 @@ enum class ChannelStatus {
INVALID, // Any data is stale and should not be referenced
VALID, // Contains currently valid data
FLUSHED, // Has already been transmitted, now read-only
DISPATCHED // Externally received, can't be flushed but can be modified locally
DISPATCHED, // Externally received, can't be flushed but can be modified locally
ENCODED // Still in an encoded form
};
/* Internal structure for channel configurations. */
......
......@@ -200,6 +200,9 @@ class Frame {
template <typename T>
T &createChange(ftl::codecs::Channel c, ftl::data::ChangeType t, const ftl::codecs::Packet &data);
template <typename T>
void informChange(ftl::codecs::Channel c, ftl::data::ChangeType t, const ftl::codecs::Packet &data);
const std::list<ftl::codecs::Packet> &getEncoded(ftl::codecs::Channel c) const;
template <typename T, typename ...ARGS>
......@@ -308,8 +311,8 @@ class Frame {
private:
struct ChannelData {
ChannelStatus status=ChannelStatus::INVALID;
std::any data;
mutable ChannelStatus status=ChannelStatus::INVALID;
mutable std::any data;
std::list<ftl::codecs::Packet> encoded={};
};
......@@ -380,6 +383,14 @@ bool make_type() {
return true;
}
template <typename T>
bool decode_type(std::any &a, const std::vector<uint8_t> &data) {
auto unpacked = msgpack::unpack((const char*)data.data(), data.size());
T &t = a.emplace<T>();
unpacked.get().convert(t);
return true;
}
}
}
......@@ -457,6 +468,13 @@ const T *ftl::data::Frame::getPtr(ftl::codecs::Channel c) const noexcept {
template <typename T>
const T &ftl::data::Frame::get(ftl::codecs::Channel c) const {
const auto &d = _getData(c);
if (d.status == ftl::data::ChannelStatus::ENCODED) {
// Do a decode now and change the status
d.status = ftl::data::ChannelStatus::DISPATCHED;
decode_type<T>(d.data, d.encoded.front().data);
}
if (d.status != ftl::data::ChannelStatus::INVALID) {
auto *p = std::any_cast<T>(&d.data);
if (!p) throw FTL_Error("'get' wrong type for channel (" << static_cast<unsigned int>(c) << ")");
......@@ -493,13 +511,22 @@ T &ftl::data::Frame::createChange(ftl::codecs::Channel c, ftl::data::ChangeType
if (!bool(is_list<T>{}) && isAggregate(c)) throw FTL_Error("Aggregate channels must be of list type");
ftl::data::verifyChannelType<T>(c);
ftl::data::make_type<T>();
//ftl::data::make_type<T>();
std::any &a = createAnyChange(c, type, data);
if (!isType<T>(c)) return a.emplace<T>();
else return *std::any_cast<T>(&a);
}
template <typename T>
void ftl::data::Frame::informChange(ftl::codecs::Channel c, ftl::data::ChangeType type, const ftl::codecs::Packet &data) {
if (!bool(is_list<T>{}) && isAggregate(c)) throw FTL_Error("Aggregate channels must be of list type");
if (data.codec != ftl::codecs::codec_t::MSGPACK) throw FTL_Error("Can only inform of msgpack changes");
ftl::data::verifyChannelType<T>(c);
createAnyChange(c, type, data);
}
// Non-list version
template <typename T, std::enable_if_t<!is_list<T>::value,int> = 0>
T &ftl::data::Frame::createChange(ftl::codecs::Channel c, ftl::data::ChangeType type) {
......
......@@ -147,7 +147,7 @@ std::any &Frame::createAnyChange(ftl::codecs::Channel c, ftl::data::ChangeType t
}
if (d->status != ftl::data::ChannelStatus::FLUSHED) {
d->status = ftl::data::ChannelStatus::DISPATCHED;
d->status = (data.codec == ftl::codecs::codec_t::MSGPACK) ? ftl::data::ChannelStatus::ENCODED : ftl::data::ChannelStatus::DISPATCHED;
d->encoded.push_back(data);
return d->data;
} else {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment