diff --git a/components/streams/include/ftl/streams/filestream.hpp b/components/streams/include/ftl/streams/filestream.hpp index e1677b413c271554a1a99b0d908497a15583d2e6..be415989073ba51345f13e0117f17320192e911b 100644 --- a/components/streams/include/ftl/streams/filestream.hpp +++ b/components/streams/include/ftl/streams/filestream.hpp @@ -39,6 +39,12 @@ class File : public Stream { */ bool tick(int64_t); + /** + * Directly read a packet. Returns false if no more packets exist, true + * otherwise. The callback is called when a packet is read. + */ + bool readPacket(std::tuple<ftl::codecs::StreamPacket,ftl::codecs::Packet> &); + enum class Mode { Read, Write, @@ -52,6 +58,7 @@ class File : public Stream { std::ofstream *ostream_; std::ifstream *istream_; + bool checked_; Mode mode_; msgpack::sbuffer buffer_out_; msgpack::unpacker buffer_in_; @@ -67,6 +74,9 @@ class File : public Stream { MUTEX mutex_; MUTEX data_mutex_; std::atomic<int> jobs_; + + bool _open(); + bool _checkFile(); }; } diff --git a/components/streams/src/filestream.cpp b/components/streams/src/filestream.cpp index 3d96b6659086ab3ed56897317d07f2c2a04852d6..f8ee805b04d3335d94b16c50d4c6539f9e88c44a 100644 --- a/components/streams/src/filestream.cpp +++ b/components/streams/src/filestream.cpp @@ -8,26 +8,71 @@ using ftl::stream::File; using ftl::codecs::StreamPacket; using ftl::codecs::Packet; using std::get; +using ftl::codecs::Channel; File::File(nlohmann::json &config) : Stream(config), ostream_(nullptr), istream_(nullptr), active_(false) { mode_ = Mode::Read; jobs_ = 0; + checked_ = false; } File::File(nlohmann::json &config, std::ifstream *is) : Stream(config), ostream_(nullptr), istream_(is), active_(false) { mode_ = Mode::Read; jobs_ = 0; + checked_ = false; } File::File(nlohmann::json &config, std::ofstream *os) : Stream(config), ostream_(os), istream_(nullptr), active_(false) { mode_ = Mode::Write; jobs_ = 0; + checked_ = false; } File::~File() { end(); } +bool File::_checkFile() { + if (!_open()) return false; + + LOG(INFO) << "FTL format version " << version_; + + // Read some packets to identify frame rate. + int count = 10; + int64_t ts = -1000; + int min_ts_diff = 1000; + + while (count > 0) { + std::tuple<ftl::codecs::StreamPacket,ftl::codecs::Packet> data; + if (!readPacket(data)) { + break; + } + + auto &spkt = std::get<0>(data); + auto &pkt = std::get<1>(data); + + if (spkt.timestamp > 0 && int(spkt.channel) < 32) { + if (spkt.timestamp > ts) { + --count; + auto d = spkt.timestamp - ts; + if (d < min_ts_diff && d > 0) { + min_ts_diff = d; + } + ts = spkt.timestamp; + } + } + } + + buffer_in_.reset(); + buffer_in_.remove_nonparsed_buffer(); + + checked_ = true; + + LOG(INFO) << " -- Frame rate = " << (1000 / min_ts_diff); + interval_ = min_ts_diff; + return true; +} + bool File::onPacket(const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &f) { cb_ = f; return true; @@ -62,6 +107,57 @@ bool File::post(const ftl::codecs::StreamPacket &s, const ftl::codecs::Packet &p return ostream_->good(); } +bool File::readPacket(std::tuple<ftl::codecs::StreamPacket,ftl::codecs::Packet> &data) { + bool partial = false; + + while ((istream_->good()) || buffer_in_.nonparsed_size() > 0u) { + if (buffer_in_.nonparsed_size() == 0 || (partial && buffer_in_.nonparsed_size() < 10000000)) { + buffer_in_.reserve_buffer(10000000); + istream_->read(buffer_in_.buffer(), buffer_in_.buffer_capacity()); + //if (stream_->bad()) return false; + + int bytes = istream_->gcount(); + if (bytes == 0) return false; + buffer_in_.buffer_consumed(bytes); + partial = false; + } + + msgpack::object_handle msg; + if (!buffer_in_.next(msg)) { + partial = true; + continue; + } + + msgpack::object obj = msg.get(); + + try { + obj.convert(data); + } catch (std::exception &e) { + LOG(INFO) << "Corrupt message: " << buffer_in_.nonparsed_size() << " - " << e.what(); + active_ = false; + return false; + } + + // Fix to clear flags for version 2. + if (version_ <= 2) { + std::get<1>(data).flags = 0; + } + if (version_ < 4) { + std::get<0>(data).frame_number = std::get<0>(data).streamID; + std::get<0>(data).streamID = 0; + if (isFloatChannel(std::get<0>(data).channel)) std::get<1>(data).flags |= ftl::codecs::kFlagFloat; + + auto codec = std::get<1>(data).codec; + if (codec == ftl::codecs::codec_t::HEVC) std::get<1>(data).codec = ftl::codecs::codec_t::HEVC_LOSSLESS; + } + std::get<0>(data).version = 4; + + return true; + } + + return false; +} + bool File::tick(int64_t ts) { if (!active_) return false; if (mode_ != Mode::Read) { @@ -110,54 +206,20 @@ bool File::tick(int64_t ts) { int64_t extended_ts = timestamp_ + 200; // Buffer 200ms ahead while ((active_ && istream_->good()) || buffer_in_.nonparsed_size() > 0u) { - if (buffer_in_.nonparsed_size() == 0 || (partial && buffer_in_.nonparsed_size() < 10000000)) { - buffer_in_.reserve_buffer(10000000); - istream_->read(buffer_in_.buffer(), buffer_in_.buffer_capacity()); - //if (stream_->bad()) return false; - - int bytes = istream_->gcount(); - if (bytes == 0) break; - buffer_in_.buffer_consumed(bytes); - partial = false; - } - - msgpack::object_handle msg; - if (!buffer_in_.next(msg)) { - partial = true; - continue; - } - - msgpack::object obj = msg.get(); - UNIQUE_LOCK(data_mutex_, dlk); auto &data = data_.emplace_back(); dlk.unlock(); - try { - obj.convert(data); - } catch (std::exception &e) { - LOG(INFO) << "Corrupt message: " << buffer_in_.nonparsed_size() << " - " << e.what(); - active_ = false; - return false; + bool res = readPacket(data); + if (!res) { + UNIQUE_LOCK(data_mutex_, dlk); + data_.pop_back(); + break; } // Adjust timestamp + // FIXME: A potential bug where multiple times are merged into one? std::get<0>(data).timestamp = ((std::get<0>(data).timestamp) / interval_) * interval_ + timestart_; - //std::get<0>(data).timestamp = std::get<0>(data).timestamp + timestart_; - - // Fix to clear flags for version 2. - if (version_ <= 2) { - std::get<1>(data).flags = 0; - } - if (version_ < 4) { - std::get<0>(data).frame_number = std::get<0>(data).streamID; - std::get<0>(data).streamID = 0; - if (isFloatChannel(std::get<0>(data).channel)) std::get<1>(data).flags |= ftl::codecs::kFlagFloat; - - auto codec = std::get<1>(data).codec; - if (codec == ftl::codecs::codec_t::HEVC) std::get<1>(data).codec = ftl::codecs::codec_t::HEVC_LOSSLESS; - } - std::get<0>(data).version = 4; // Maintain availability of channels. available(0) += std::get<0>(data).channel; @@ -184,15 +246,7 @@ bool File::tick(int64_t ts) { timestamp_ += interval_; if (data_.size() == 0 && value("looping", true)) { - istream_->clear(); - istream_->seekg(0); - - ftl::codecs::Header h; - (*istream_).read((char*)&h, sizeof(h)); - if (h.version >= 2) { - ftl::codecs::IndexHeader ih; - (*istream_).read((char*)&ih, sizeof(ih)); - } + _open(); timestart_ = (ftl::timer::get_time() / ftl::timer::getInterval()) * ftl::timer::getInterval(); timestamp_ = timestart_; @@ -202,16 +256,11 @@ bool File::tick(int64_t ts) { return data_.size() > 0; } -bool File::run() { - timer_ = ftl::timer::add(ftl::timer::kTimerMain, [this](int64_t ts) { - tick(ts); - return active_; - }); - return true; -} - -bool File::begin(bool dorun) { - if (mode_ == Mode::Read) { +bool File::_open() { + if (istream_ && istream_->is_open()) { + istream_->clear(); + istream_->seekg(0); + } else { if (!istream_) istream_ = new std::ifstream; istream_->open(*get<std::string>("filename")); @@ -219,23 +268,38 @@ bool File::begin(bool dorun) { LOG(ERROR) << "Could not open file: " << *get<std::string>("filename"); return false; } + } - ftl::codecs::Header h; - (*istream_).read((char*)&h, sizeof(h)); - if (h.magic[0] != 'F' || h.magic[1] != 'T' || h.magic[2] != 'L' || h.magic[3] != 'F') return false; + ftl::codecs::Header h; + (*istream_).read((char*)&h, sizeof(h)); + if (h.magic[0] != 'F' || h.magic[1] != 'T' || h.magic[2] != 'L' || h.magic[3] != 'F') return false; - if (h.version >= 2) { - ftl::codecs::IndexHeader ih; - (*istream_).read((char*)&ih, sizeof(ih)); - } + if (h.version >= 2) { + ftl::codecs::IndexHeader ih; + (*istream_).read((char*)&ih, sizeof(ih)); + } - version_ = h.version; - LOG(INFO) << "FTL format version " << version_; + version_ = h.version; + return true; +} + +bool File::run() { + timer_ = ftl::timer::add(ftl::timer::kTimerMain, [this](int64_t ts) { + tick(ts); + return active_; + }); + return true; +} + +bool File::begin(bool dorun) { + if (mode_ == Mode::Read) { + if (!checked_) _checkFile(); + _open(); // Capture current time to adjust timestamps timestart_ = (ftl::timer::get_time() / ftl::timer::getInterval()) * ftl::timer::getInterval(); active_ = true; - interval_ = 50; //ftl::timer::getInterval(); // FIXME: Use correct interval!! + //interval_ = 40; timestamp_ = timestart_; tick(timestart_); // Do some now!