#include <fstream> #include <ftl/streams/filestream.hpp> #include <ftl/timer.hpp> #define LOGURU_REPLACE_GLOG 1 #include <loguru.hpp> using ftl::stream::File; using ftl::codecs::StreamPacket; using ftl::codecs::Packet; using std::get; using ftl::codecs::Channel; File::File(nlohmann::json &config) : Stream(config), ostream_(nullptr), istream_(nullptr), active_(false) { mode_ = Mode::Read; jobs_ = 0; checked_ = false; save_data_ = value("save_data", false); on("save_data", [this]() { save_data_ = value("save_data", false); }); } File::File(nlohmann::json &config, std::ifstream *is) : Stream(config), ostream_(nullptr), istream_(is), active_(false) { mode_ = Mode::Read; jobs_ = 0; checked_ = false; save_data_ = false; } File::File(nlohmann::json &config, std::ofstream *os) : Stream(config), ostream_(os), istream_(nullptr), active_(false) { mode_ = Mode::Write; jobs_ = 0; checked_ = false; save_data_ = value("save_data", false); on("save_data", [this]() { save_data_ = value("save_data", false); }); } File::~File() { end(); } bool File::_checkFile() { if (!_open()) return false; LOG(INFO) << "FTL format version " << version_; // Read some packets to identify frame rate. int count = 10; int64_t ts = -1000; int min_ts_diff = 1000; first_ts_ = 10000000000000ll; while (count > 0) { std::tuple<ftl::codecs::StreamPacket,ftl::codecs::Packet> data; if (!readPacket(data)) { break; } auto &spkt = std::get<0>(data); //auto &pkt = std::get<1>(data); if (spkt.timestamp < first_ts_) first_ts_ = spkt.timestamp; //LOG(INFO) << "TIMESTAMP: " << spkt.timestamp; if (spkt.timestamp > 0 && int(spkt.channel) < 32) { if (spkt.timestamp > ts) { --count; auto d = spkt.timestamp - ts; if (d < min_ts_diff && d > 0) { min_ts_diff = d; } ts = spkt.timestamp; } } } buffer_in_.reset(); buffer_in_.remove_nonparsed_buffer(); checked_ = true; is_video_ = count < 9; LOG(INFO) << " -- Frame rate = " << (1000 / min_ts_diff); if (!is_video_) LOG(INFO) << " -- Static image"; interval_ = min_ts_diff; return true; } bool File::post(const ftl::codecs::StreamPacket &s, const ftl::codecs::Packet &p) { if (!active_) return false; if (mode_ != Mode::Write) { LOG(WARNING) << "Cannot write to read-only ftl file"; return false; } //LOG(INFO) << "WRITE: " << s.timestamp << " " << (int)s.channel << " " << p.data.size(); // Don't write dummy packets to files. if (p.data.size() == 0) return true; available(s.streamID) += s.channel; // Discard all data channel packets for now // TODO: Allow saving of data channels once formats have solidified. if (!save_data_ && static_cast<int>(s.channel) >= static_cast<int>(ftl::codecs::Channel::Data)) return true; ftl::codecs::StreamPacket s2 = s; // Adjust timestamp relative to start of file. //s2.timestamp -= timestart_; auto data = std::make_tuple(s2,p); msgpack::sbuffer buffer; msgpack::pack(buffer, data); UNIQUE_LOCK(mutex_, lk); ostream_->write(buffer.data(), buffer.size()); return ostream_->good(); } bool File::readPacket(std::tuple<ftl::codecs::StreamPacket,ftl::codecs::Packet> &data) { bool partial = false; while ((istream_->good()) || buffer_in_.nonparsed_size() > 0u) { if (buffer_in_.nonparsed_size() == 0 || (partial && buffer_in_.nonparsed_size() < 10000000)) { buffer_in_.reserve_buffer(10000000); istream_->read(buffer_in_.buffer(), buffer_in_.buffer_capacity()); //if (stream_->bad()) return false; int bytes = istream_->gcount(); if (bytes == 0) return false; buffer_in_.buffer_consumed(bytes); partial = false; } msgpack::object_handle msg; if (!buffer_in_.next(msg)) { partial = true; continue; } msgpack::object obj = msg.get(); try { // Older versions have a different SPKT structure. if (version_ < 5) { std::tuple<ftl::codecs::StreamPacketV4, ftl::codecs::Packet> datav4; obj.convert(datav4); auto &spkt = std::get<0>(data); auto &spktv4 = std::get<0>(datav4); spkt.streamID = spktv4.streamID; spkt.channel = spktv4.channel; spkt.frame_number = spktv4.frame_number; spkt.timestamp = spktv4.timestamp; spkt.flags = 0; std::get<1>(data) = std::move(std::get<1>(datav4)); } else { obj.convert(data); } } catch (std::exception &e) { LOG(INFO) << "Corrupt message: " << buffer_in_.nonparsed_size() << " - " << e.what(); //active_ = false; return false; } // Correct for older version differences. _patchPackets(std::get<0>(data), std::get<1>(data)); return true; } return false; } void File::_patchPackets(ftl::codecs::StreamPacket &spkt, ftl::codecs::Packet &pkt) { // Fix to clear flags for version 2. if (version_ <= 2) { pkt.flags = 0; } if (version_ < 4) { spkt.frame_number = spkt.streamID; spkt.streamID = 0; if (isFloatChannel(spkt.channel)) pkt.flags |= ftl::codecs::kFlagFloat; auto codec = pkt.codec; if (codec == ftl::codecs::codec_t::HEVC) pkt.codec = ftl::codecs::codec_t::HEVC_LOSSLESS; } spkt.version = 5; // Fix for flags corruption if (pkt.data.size() == 0) { pkt.flags = 0; } } bool File::tick(int64_t ts) { if (!active_) return false; if (mode_ != Mode::Read) { LOG(ERROR) << "Cannot read from a write only file"; return false; } #ifdef DEBUG_MUTEX UNIQUE_LOCK(mutex_, lk); #else std::unique_lock<std::mutex> lk(mutex_, std::defer_lock); if (!lk.try_lock()) return true; #endif if (jobs_ > 0) { //LOG(ERROR) << "STILL HAS JOBS"; return true; } // Check buffer first for frames already read { UNIQUE_LOCK(data_mutex_, dlk); for (auto i = data_.begin(); i != data_.end(); ++i) { if (std::get<0>(*i).timestamp <= timestamp_) { ++jobs_; std::get<0>(*i).timestamp = ts; ftl::pool.push([this,i](int id) { auto &spkt = std::get<0>(*i); auto &pkt = std::get<1>(*i); try { cb_.trigger(spkt, pkt); } catch (const ftl::exception &e) { LOG(ERROR) << "Exception in packet callback: " << e.what() << e.trace(); } catch (std::exception &e) { LOG(ERROR) << "Exception in packet callback: " << e.what(); } //LOG(INFO) << "ERASE: " << spkt.timestamp << ", " << spkt.frameNumber() << ", " << (int)spkt.channel; UNIQUE_LOCK(data_mutex_, dlk); data_.erase(i); --jobs_; }); } } } int64_t extended_ts = timestamp_ + 200; // Buffer 200ms ahead while ((active_ && istream_->good()) || buffer_in_.nonparsed_size() > 0u) { UNIQUE_LOCK(data_mutex_, dlk); auto *lastData = (data_.size() > 0) ? &data_.back() : nullptr; auto &data = data_.emplace_back(); dlk.unlock(); bool res = readPacket(data); if (!res) { UNIQUE_LOCK(data_mutex_, dlk); data_.pop_back(); break; } // Adjust timestamp // FIXME: A potential bug where multiple times are merged into one? std::get<0>(data).timestamp = (((std::get<0>(data).timestamp) - first_ts_) / interval_) * interval_ + timestart_; std::get<0>(data).hint_capability = ((is_video_) ? 0 : ftl::codecs::kStreamCap_Static) | ftl::codecs::kStreamCap_Recorded; // Maintain availability of channels. available(0) += std::get<0>(data).channel; // This should only occur for first few frames, generally otherwise // the data buffer is already several frames ahead so is processed // above. Hence, no need to bother parallelising this bit. if (std::get<0>(data).timestamp <= timestamp_) { std::get<0>(data).timestamp = ts; //if (cb_) { dlk.lock(); try { cb_.trigger(std::get<0>(data),std::get<1>(data)); } catch (std::exception &e) { LOG(ERROR) << "Exception in packet callback: " << e.what(); } data_.pop_back(); //} } else if (version_ < 5 && lastData) { // For versions < 5, add completed flag to previous data std::get<0>(*lastData).flags |= ftl::codecs::kFlagCompleted; } if (std::get<0>(data).timestamp > extended_ts) { break; } } timestamp_ += interval_; if (data_.size() == 0 && value("looping", true)) { buffer_in_.reset(); buffer_in_.remove_nonparsed_buffer(); _open(); timestart_ = (ftl::timer::get_time() / ftl::timer::getInterval()) * ftl::timer::getInterval(); timestamp_ = timestart_; return true; } return data_.size() > 0; } bool File::_open() { if (istream_ && istream_->is_open()) { istream_->clear(); istream_->seekg(0); } else { if (!istream_) istream_ = new std::ifstream; istream_->open(*get<std::string>("filename"), std::ifstream::in | std::ifstream::binary); if (!istream_->good()) { LOG(ERROR) << "Could not open file: " << *get<std::string>("filename"); return false; } } ftl::codecs::Header h; (*istream_).read((char*)&h, sizeof(h)); if (h.magic[0] != 'F' || h.magic[1] != 'T' || h.magic[2] != 'L' || h.magic[3] != 'F') return false; if (h.version >= 2) { ftl::codecs::IndexHeader ih; (*istream_).read((char*)&ih, sizeof(ih)); } version_ = h.version; return true; } bool File::run() { timer_ = ftl::timer::add(ftl::timer::kTimerMain, [this](int64_t ts) { tick(ts); return active_; }); return true; } bool File::begin(bool dorun) { if (active_) return true; if (mode_ == Mode::Read) { if (!checked_) _checkFile(); _open(); // Capture current time to adjust timestamps timestart_ = (ftl::timer::get_time() / ftl::timer::getInterval()) * ftl::timer::getInterval(); active_ = true; //interval_ = 40; timestamp_ = timestart_; tick(timestart_); // Do some now! if (dorun) run(); } else if (mode_ == Mode::Write) { if (!ostream_) ostream_ = new std::ofstream; ostream_->open(*get<std::string>("filename"), std::ifstream::out | std::ifstream::binary); if (!ostream_->good()) { LOG(ERROR) << "Could not open file: '" << *get<std::string>("filename") << "'"; return false; } ftl::codecs::Header h; //h.version = 2; (*ostream_).write((const char*)&h, sizeof(h)); ftl::codecs::IndexHeader ih; ih.reserved[0] = -1; (*ostream_).write((const char*)&ih, sizeof(ih)); // Capture current time to adjust timestamps timestart_ = ftl::timer::get_time(); active_ = true; interval_ = ftl::timer::getInterval(); timestamp_ = timestart_; } return true; } bool File::end() { UNIQUE_LOCK(mutex_, lk); if (!active_) return false; active_ = false; timer_.cancel(); if (mode_ == Mode::Read) { if (istream_) { istream_->close(); delete istream_; istream_ = nullptr; } } else if (mode_ == Mode::Write) { if (ostream_) { ostream_->close(); delete ostream_; ostream_ = nullptr; } } return true; } void File::reset() { UNIQUE_LOCK(mutex_, lk); // TODO: Find a better solution while (jobs_ > 0) std::this_thread::sleep_for(std::chrono::milliseconds(2)); data_.clear(); buffer_in_.reset(); buffer_in_.remove_nonparsed_buffer(); _open(); timestart_ = (ftl::timer::get_time() / ftl::timer::getInterval()) * ftl::timer::getInterval(); timestamp_ = timestart_; } bool File::active() { return active_; }