diff --git a/components/codecs/include/ftl/codecs/reader.hpp b/components/codecs/include/ftl/codecs/reader.hpp index 949f037dd18ce136317481bd2f657c5be24359f1..65be39eaa31f31f485883378ed9b782418d1fbc7 100644 --- a/components/codecs/include/ftl/codecs/reader.hpp +++ b/components/codecs/include/ftl/codecs/reader.hpp @@ -7,6 +7,7 @@ #include <functional> #include <ftl/codecs/packet.hpp> +#include <ftl/threads.hpp> namespace ftl { namespace codecs { @@ -24,7 +25,7 @@ class Reader { * and the timestamps stored in the file are aligned to the time when open * was called. */ - bool read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &); + bool read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, ftl::codecs::Packet &)> &); /** * An alternative version of read where packet events are generated for @@ -34,7 +35,7 @@ class Reader { */ bool read(int64_t ts); - void onPacket(int streamID, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &); + void onPacket(int streamID, const std::function<void(const ftl::codecs::StreamPacket &, ftl::codecs::Packet &)> &); bool begin(); bool end(); @@ -49,7 +50,9 @@ class Reader { int64_t timestart_; bool playing_; - std::vector<std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)>> handlers_; + MUTEX mtx_; + + std::vector<std::function<void(const ftl::codecs::StreamPacket &, ftl::codecs::Packet &)>> handlers_; }; } diff --git a/components/codecs/src/reader.cpp b/components/codecs/src/reader.cpp index 5ab809b8fb4beb44a0a41151d551a8cdfe3263d6..96002aeeea252413f7af641cf006836fcf79b384 100644 --- a/components/codecs/src/reader.cpp +++ b/components/codecs/src/reader.cpp @@ -23,13 +23,17 @@ bool Reader::begin() { if (h.magic[0] != 'F' || h.magic[1] != 'T' || h.magic[2] != 'L' || h.magic[3] != 'F') return false; // Capture current time to adjust timestamps - timestart_ = ftl::timer::get_time(); + timestart_ = (ftl::timer::get_time() / ftl::timer::getInterval()) * ftl::timer::getInterval(); playing_ = true; return true; } -bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &f) { +bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, ftl::codecs::Packet &)> &f) { + //UNIQUE_LOCK(mtx_, lk); + std::unique_lock<std::mutex> lk(mtx_, std::defer_lock); + if (!lk.try_lock()) return true; + if (has_data_ && get<0>(data_).timestamp <= ts) { f(get<0>(data_), get<1>(data_)); has_data_ = false; @@ -85,14 +89,14 @@ bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::Stream } bool Reader::read(int64_t ts) { - return read(ts, [this](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + return read(ts, [this](const ftl::codecs::StreamPacket &spkt, ftl::codecs::Packet &pkt) { if (handlers_.size() > spkt.streamID && (bool)handlers_[spkt.streamID]) { handlers_[spkt.streamID](spkt, pkt); } }); } -void Reader::onPacket(int streamID, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &f) { +void Reader::onPacket(int streamID, const std::function<void(const ftl::codecs::StreamPacket &, ftl::codecs::Packet &)> &f) { if (streamID >= handlers_.size()) handlers_.resize(streamID+1); handlers_[streamID] = f; } diff --git a/components/rgbd-sources/src/file_source.cpp b/components/rgbd-sources/src/file_source.cpp index a7fe61169c0cdb626341756eaa18d8e2f0126a2d..f5e0e7b30e1ff75b8458129cca6a0f81975c0fdd 100644 --- a/components/rgbd-sources/src/file_source.cpp +++ b/components/rgbd-sources/src/file_source.cpp @@ -1,11 +1,46 @@ #include "file_source.hpp" using ftl::rgbd::detail::FileSource; +using ftl::codecs::codec_t; + +void FileSource::_createDecoder(int ix, const ftl::codecs::Packet &pkt) { + if (decoders_[ix]) { + if (!decoders_[ix]->accepts(pkt)) { + ftl::codecs::free(decoders_[ix]); + } else { + return; + } + } + + LOG(INFO) << "Create a decoder: " << ix; + decoders_[ix] = ftl::codecs::allocateDecoder(pkt); +} FileSource::FileSource(ftl::rgbd::Source *s, ftl::codecs::Reader *r, int sid) : ftl::rgbd::detail::Source(s) { reader_ = r; - r->onPacket(sid, [this](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { - LOG(INFO) << "PACKET RECEIVED " << spkt.streamID; + has_calibration_ = false; + decoders_[0] = nullptr; + decoders_[1] = nullptr; + cache_read_ = -1; + cache_write_ = 0; + + r->onPacket(sid, [this](const ftl::codecs::StreamPacket &spkt, ftl::codecs::Packet &pkt) { + if (pkt.codec == codec_t::POSE) { + Eigen::Matrix4d p = Eigen::Map<Eigen::Matrix4d>((double*)pkt.data.data()); + host_->setPose(p); + } else if (pkt.codec == codec_t::CALIBRATION) { + ftl::rgbd::Camera *camera = (ftl::rgbd::Camera*)pkt.data.data(); + LOG(INFO) << "Have calibration: " << camera->fx; + params_ = *camera; + has_calibration_ = true; + } else { + cache_[cache_write_].emplace_back(); + auto &c = cache_[cache_write_].back(); + + // TODO: Attempt to avoid this copy operation + c.spkt = spkt; + c.pkt = pkt; + } }); } @@ -14,15 +49,49 @@ FileSource::~FileSource() { } bool FileSource::capture(int64_t ts) { - reader_->read(ts); + timestamp_ = ts; return true; } bool FileSource::retrieve() { + if (!reader_->read(timestamp_)) { + cache_write_ = -1; + } return true; } +void FileSource::swap() { + cache_read_ = cache_write_; + cache_write_ = (cache_write_ == 0) ? 1 : 0; +} + bool FileSource::compute(int n, int b) { + if (cache_read_ < 0) return false; + + for (auto i=cache_[cache_read_].begin(); i!=cache_[cache_read_].end(); ++i) { + auto &c = *i; + + if (c.spkt.channel == 0) { + rgb_.create(cv::Size(ftl::codecs::getWidth(c.pkt.definition),ftl::codecs::getHeight(c.pkt.definition)), CV_8UC3); + } else { + depth_.create(cv::Size(ftl::codecs::getWidth(c.pkt.definition),ftl::codecs::getHeight(c.pkt.definition)), CV_32F); + } + + _createDecoder(c.spkt.channel, c.pkt); + + try { + decoders_[c.spkt.channel]->decode(c.pkt, (c.spkt.channel == 0) ? rgb_ : depth_); + } catch (std::exception &e) { + LOG(INFO) << "Decoder exception: " << e.what(); + } + } + + cache_[cache_read_].clear(); + + if (rgb_.empty() || depth_.empty()) return false; + + auto cb = host_->callback(); + if (cb) cb(timestamp_, rgb_, depth_); return true; } diff --git a/components/rgbd-sources/src/file_source.hpp b/components/rgbd-sources/src/file_source.hpp index f69ec8321e381eb749484beed2c34db5cbba2d74..06f8248846225654c35b70814ae379d4057b707e 100644 --- a/components/rgbd-sources/src/file_source.hpp +++ b/components/rgbd-sources/src/file_source.hpp @@ -6,6 +6,9 @@ #include <ftl/rgbd/source.hpp> #include <ftl/codecs/reader.hpp> +#include <ftl/codecs/decoder.hpp> + +#include <list> namespace ftl { namespace rgbd { @@ -20,10 +23,25 @@ class FileSource : public detail::Source { bool retrieve(); bool compute(int n, int b); bool isReady(); + void swap(); //void reset(); private: ftl::codecs::Reader *reader_; + bool has_calibration_; + + struct PacketPair { + ftl::codecs::StreamPacket spkt; + ftl::codecs::Packet pkt; + }; + + std::list<PacketPair> cache_[2]; + int cache_read_; + int cache_write_; + + ftl::codecs::Decoder *decoders_[2]; + + void _createDecoder(int ix, const ftl::codecs::Packet &pkt); }; } diff --git a/components/rgbd-sources/src/source.cpp b/components/rgbd-sources/src/source.cpp index 96335342eb1ba013fbc79acb921df340ab2c1312..32cff11c40de330caf788c2a0a7b54817a141995 100644 --- a/components/rgbd-sources/src/source.cpp +++ b/components/rgbd-sources/src/source.cpp @@ -121,6 +121,7 @@ ftl::rgbd::detail::Source *Source::_createFileImpl(const ftl::URI &uri) { if (ext == "ftl") { ftl::codecs::Reader *reader = __createReader(path); + LOG(INFO) << "Playing track: " << uri.getFragment(); return new FileSource(this, reader, std::stoi(uri.getFragment())); } else if (ext == "png" || ext == "jpg") { return new ImageSource(this, path);