#include <loguru.hpp> #include <ftl/codecs/reader.hpp> #include <tuple> using ftl::codecs::Reader; using ftl::codecs::StreamPacket; using ftl::codecs::Packet; using std::get; Reader::Reader(std::istream &s) : stream_(&s), has_data_(false) { } Reader::~Reader() { } bool Reader::begin() { ftl::codecs::Header h; (*stream_).read((char*)&h, sizeof(h)); if (h.magic[0] != 'F' || h.magic[1] != 'T' || h.magic[2] != 'L' || h.magic[3] != 'F') return false; return true; } bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &f) { if (has_data_ && get<0>(data_).timestamp <= ts) { f(get<0>(data_), get<1>(data_)); has_data_ = false; } else if (has_data_) { return false; } bool partial = false; while (stream_->good() || buffer_.nonparsed_size() > 0) { if (buffer_.nonparsed_size() == 0 || partial) { buffer_.reserve_buffer(10000000); stream_->read(buffer_.buffer(), 10000000); //if (stream_->bad()) return false; int bytes = stream_->gcount(); if (bytes == 0) return false; buffer_.buffer_consumed(bytes); partial = false; } msgpack::object_handle msg; if (!buffer_.next(msg)) { LOG(INFO) << "NO Message: " << buffer_.nonparsed_size(); partial = true; continue; } std::tuple<StreamPacket,Packet> data; msgpack::object obj = msg.get(); obj.convert(data); if (get<0>(data).timestamp <= ts) { f(get<0>(data),get<1>(data)); } else { data_ = data; has_data_ = true; return true; } } return false; } bool Reader::end() { return true; }