Skip to content
Snippets Groups Projects
Commit e8f3e516 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Resolves #323 File FPS detection

parent 3db65899
No related branches found
No related tags found
No related merge requests found
......@@ -39,6 +39,12 @@ class File : public Stream {
*/
bool tick(int64_t);
/**
* Directly read a packet. Returns false if no more packets exist, true
* otherwise. The callback is called when a packet is read.
*/
bool readPacket(std::tuple<ftl::codecs::StreamPacket,ftl::codecs::Packet> &);
enum class Mode {
Read,
Write,
......@@ -52,6 +58,7 @@ class File : public Stream {
std::ofstream *ostream_;
std::ifstream *istream_;
bool checked_;
Mode mode_;
msgpack::sbuffer buffer_out_;
msgpack::unpacker buffer_in_;
......@@ -67,6 +74,9 @@ class File : public Stream {
MUTEX mutex_;
MUTEX data_mutex_;
std::atomic<int> jobs_;
bool _open();
bool _checkFile();
};
}
......
......@@ -8,26 +8,71 @@ using ftl::stream::File;
using ftl::codecs::StreamPacket;
using ftl::codecs::Packet;
using std::get;
using ftl::codecs::Channel;
File::File(nlohmann::json &config) : Stream(config), ostream_(nullptr), istream_(nullptr), active_(false) {
mode_ = Mode::Read;
jobs_ = 0;
checked_ = false;
}
File::File(nlohmann::json &config, std::ifstream *is) : Stream(config), ostream_(nullptr), istream_(is), active_(false) {
mode_ = Mode::Read;
jobs_ = 0;
checked_ = false;
}
File::File(nlohmann::json &config, std::ofstream *os) : Stream(config), ostream_(os), istream_(nullptr), active_(false) {
mode_ = Mode::Write;
jobs_ = 0;
checked_ = false;
}
File::~File() {
end();
}
bool File::_checkFile() {
if (!_open()) return false;
LOG(INFO) << "FTL format version " << version_;
// Read some packets to identify frame rate.
int count = 10;
int64_t ts = -1000;
int min_ts_diff = 1000;
while (count > 0) {
std::tuple<ftl::codecs::StreamPacket,ftl::codecs::Packet> data;
if (!readPacket(data)) {
break;
}
auto &spkt = std::get<0>(data);
auto &pkt = std::get<1>(data);
if (spkt.timestamp > 0 && int(spkt.channel) < 32) {
if (spkt.timestamp > ts) {
--count;
auto d = spkt.timestamp - ts;
if (d < min_ts_diff && d > 0) {
min_ts_diff = d;
}
ts = spkt.timestamp;
}
}
}
buffer_in_.reset();
buffer_in_.remove_nonparsed_buffer();
checked_ = true;
LOG(INFO) << " -- Frame rate = " << (1000 / min_ts_diff);
interval_ = min_ts_diff;
return true;
}
bool File::onPacket(const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &f) {
cb_ = f;
return true;
......@@ -62,6 +107,57 @@ bool File::post(const ftl::codecs::StreamPacket &s, const ftl::codecs::Packet &p
return ostream_->good();
}
bool File::readPacket(std::tuple<ftl::codecs::StreamPacket,ftl::codecs::Packet> &data) {
bool partial = false;
while ((istream_->good()) || buffer_in_.nonparsed_size() > 0u) {
if (buffer_in_.nonparsed_size() == 0 || (partial && buffer_in_.nonparsed_size() < 10000000)) {
buffer_in_.reserve_buffer(10000000);
istream_->read(buffer_in_.buffer(), buffer_in_.buffer_capacity());
//if (stream_->bad()) return false;
int bytes = istream_->gcount();
if (bytes == 0) return false;
buffer_in_.buffer_consumed(bytes);
partial = false;
}
msgpack::object_handle msg;
if (!buffer_in_.next(msg)) {
partial = true;
continue;
}
msgpack::object obj = msg.get();
try {
obj.convert(data);
} catch (std::exception &e) {
LOG(INFO) << "Corrupt message: " << buffer_in_.nonparsed_size() << " - " << e.what();
active_ = false;
return false;
}
// Fix to clear flags for version 2.
if (version_ <= 2) {
std::get<1>(data).flags = 0;
}
if (version_ < 4) {
std::get<0>(data).frame_number = std::get<0>(data).streamID;
std::get<0>(data).streamID = 0;
if (isFloatChannel(std::get<0>(data).channel)) std::get<1>(data).flags |= ftl::codecs::kFlagFloat;
auto codec = std::get<1>(data).codec;
if (codec == ftl::codecs::codec_t::HEVC) std::get<1>(data).codec = ftl::codecs::codec_t::HEVC_LOSSLESS;
}
std::get<0>(data).version = 4;
return true;
}
return false;
}
bool File::tick(int64_t ts) {
if (!active_) return false;
if (mode_ != Mode::Read) {
......@@ -110,54 +206,20 @@ bool File::tick(int64_t ts) {
int64_t extended_ts = timestamp_ + 200; // Buffer 200ms ahead
while ((active_ && istream_->good()) || buffer_in_.nonparsed_size() > 0u) {
if (buffer_in_.nonparsed_size() == 0 || (partial && buffer_in_.nonparsed_size() < 10000000)) {
buffer_in_.reserve_buffer(10000000);
istream_->read(buffer_in_.buffer(), buffer_in_.buffer_capacity());
//if (stream_->bad()) return false;
int bytes = istream_->gcount();
if (bytes == 0) break;
buffer_in_.buffer_consumed(bytes);
partial = false;
}
msgpack::object_handle msg;
if (!buffer_in_.next(msg)) {
partial = true;
continue;
}
msgpack::object obj = msg.get();
UNIQUE_LOCK(data_mutex_, dlk);
auto &data = data_.emplace_back();
dlk.unlock();
try {
obj.convert(data);
} catch (std::exception &e) {
LOG(INFO) << "Corrupt message: " << buffer_in_.nonparsed_size() << " - " << e.what();
active_ = false;
return false;
bool res = readPacket(data);
if (!res) {
UNIQUE_LOCK(data_mutex_, dlk);
data_.pop_back();
break;
}
// Adjust timestamp
// FIXME: A potential bug where multiple times are merged into one?
std::get<0>(data).timestamp = ((std::get<0>(data).timestamp) / interval_) * interval_ + timestart_;
//std::get<0>(data).timestamp = std::get<0>(data).timestamp + timestart_;
// Fix to clear flags for version 2.
if (version_ <= 2) {
std::get<1>(data).flags = 0;
}
if (version_ < 4) {
std::get<0>(data).frame_number = std::get<0>(data).streamID;
std::get<0>(data).streamID = 0;
if (isFloatChannel(std::get<0>(data).channel)) std::get<1>(data).flags |= ftl::codecs::kFlagFloat;
auto codec = std::get<1>(data).codec;
if (codec == ftl::codecs::codec_t::HEVC) std::get<1>(data).codec = ftl::codecs::codec_t::HEVC_LOSSLESS;
}
std::get<0>(data).version = 4;
// Maintain availability of channels.
available(0) += std::get<0>(data).channel;
......@@ -184,15 +246,7 @@ bool File::tick(int64_t ts) {
timestamp_ += interval_;
if (data_.size() == 0 && value("looping", true)) {
istream_->clear();
istream_->seekg(0);
ftl::codecs::Header h;
(*istream_).read((char*)&h, sizeof(h));
if (h.version >= 2) {
ftl::codecs::IndexHeader ih;
(*istream_).read((char*)&ih, sizeof(ih));
}
_open();
timestart_ = (ftl::timer::get_time() / ftl::timer::getInterval()) * ftl::timer::getInterval();
timestamp_ = timestart_;
......@@ -202,16 +256,11 @@ bool File::tick(int64_t ts) {
return data_.size() > 0;
}
bool File::run() {
timer_ = ftl::timer::add(ftl::timer::kTimerMain, [this](int64_t ts) {
tick(ts);
return active_;
});
return true;
}
bool File::begin(bool dorun) {
if (mode_ == Mode::Read) {
bool File::_open() {
if (istream_ && istream_->is_open()) {
istream_->clear();
istream_->seekg(0);
} else {
if (!istream_) istream_ = new std::ifstream;
istream_->open(*get<std::string>("filename"));
......@@ -219,23 +268,38 @@ bool File::begin(bool dorun) {
LOG(ERROR) << "Could not open file: " << *get<std::string>("filename");
return false;
}
}
ftl::codecs::Header h;
(*istream_).read((char*)&h, sizeof(h));
if (h.magic[0] != 'F' || h.magic[1] != 'T' || h.magic[2] != 'L' || h.magic[3] != 'F') return false;
ftl::codecs::Header h;
(*istream_).read((char*)&h, sizeof(h));
if (h.magic[0] != 'F' || h.magic[1] != 'T' || h.magic[2] != 'L' || h.magic[3] != 'F') return false;
if (h.version >= 2) {
ftl::codecs::IndexHeader ih;
(*istream_).read((char*)&ih, sizeof(ih));
}
if (h.version >= 2) {
ftl::codecs::IndexHeader ih;
(*istream_).read((char*)&ih, sizeof(ih));
}
version_ = h.version;
LOG(INFO) << "FTL format version " << version_;
version_ = h.version;
return true;
}
bool File::run() {
timer_ = ftl::timer::add(ftl::timer::kTimerMain, [this](int64_t ts) {
tick(ts);
return active_;
});
return true;
}
bool File::begin(bool dorun) {
if (mode_ == Mode::Read) {
if (!checked_) _checkFile();
_open();
// Capture current time to adjust timestamps
timestart_ = (ftl::timer::get_time() / ftl::timer::getInterval()) * ftl::timer::getInterval();
active_ = true;
interval_ = 50; //ftl::timer::getInterval(); // FIXME: Use correct interval!!
//interval_ = 40;
timestamp_ = timestart_;
tick(timestart_); // Do some now!
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment