Skip to content
Snippets Groups Projects
Commit e47aa6e7 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'feature/198/playback' into 'master'

Implements #198 stream playback from file

Closes #198

See merge request nicolas.pope/ftl!128
parents e15e48db 90198906
No related branches found
No related tags found
1 merge request!128Implements #198 stream playback from file
Pipeline #15330 passed
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include <functional> #include <functional>
#include <ftl/codecs/packet.hpp> #include <ftl/codecs/packet.hpp>
#include <ftl/threads.hpp>
namespace ftl { namespace ftl {
namespace codecs { namespace codecs {
...@@ -24,7 +25,7 @@ class Reader { ...@@ -24,7 +25,7 @@ class Reader {
* and the timestamps stored in the file are aligned to the time when open * and the timestamps stored in the file are aligned to the time when open
* was called. * was called.
*/ */
bool read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &); bool read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, ftl::codecs::Packet &)> &);
/** /**
* An alternative version of read where packet events are generated for * An alternative version of read where packet events are generated for
...@@ -34,7 +35,7 @@ class Reader { ...@@ -34,7 +35,7 @@ class Reader {
*/ */
bool read(int64_t ts); bool read(int64_t ts);
void onPacket(int streamID, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &); void onPacket(int streamID, const std::function<void(const ftl::codecs::StreamPacket &, ftl::codecs::Packet &)> &);
bool begin(); bool begin();
bool end(); bool end();
...@@ -49,7 +50,9 @@ class Reader { ...@@ -49,7 +50,9 @@ class Reader {
int64_t timestart_; int64_t timestart_;
bool playing_; bool playing_;
std::vector<std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)>> handlers_; MUTEX mtx_;
std::vector<std::function<void(const ftl::codecs::StreamPacket &, ftl::codecs::Packet &)>> handlers_;
}; };
} }
......
...@@ -23,13 +23,17 @@ bool Reader::begin() { ...@@ -23,13 +23,17 @@ bool Reader::begin() {
if (h.magic[0] != 'F' || h.magic[1] != 'T' || h.magic[2] != 'L' || h.magic[3] != 'F') return false; if (h.magic[0] != 'F' || h.magic[1] != 'T' || h.magic[2] != 'L' || h.magic[3] != 'F') return false;
// Capture current time to adjust timestamps // Capture current time to adjust timestamps
timestart_ = ftl::timer::get_time(); timestart_ = (ftl::timer::get_time() / ftl::timer::getInterval()) * ftl::timer::getInterval();
playing_ = true; playing_ = true;
return true; return true;
} }
bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &f) { bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, ftl::codecs::Packet &)> &f) {
//UNIQUE_LOCK(mtx_, lk);
std::unique_lock<std::mutex> lk(mtx_, std::defer_lock);
if (!lk.try_lock()) return true;
if (has_data_ && get<0>(data_).timestamp <= ts) { if (has_data_ && get<0>(data_).timestamp <= ts) {
f(get<0>(data_), get<1>(data_)); f(get<0>(data_), get<1>(data_));
has_data_ = false; has_data_ = false;
...@@ -85,14 +89,14 @@ bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::Stream ...@@ -85,14 +89,14 @@ bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::Stream
} }
bool Reader::read(int64_t ts) { bool Reader::read(int64_t ts) {
return read(ts, [this](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { return read(ts, [this](const ftl::codecs::StreamPacket &spkt, ftl::codecs::Packet &pkt) {
if (handlers_.size() > spkt.streamID && (bool)handlers_[spkt.streamID]) { if (handlers_.size() > spkt.streamID && (bool)handlers_[spkt.streamID]) {
handlers_[spkt.streamID](spkt, pkt); handlers_[spkt.streamID](spkt, pkt);
} }
}); });
} }
void Reader::onPacket(int streamID, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &f) { void Reader::onPacket(int streamID, const std::function<void(const ftl::codecs::StreamPacket &, ftl::codecs::Packet &)> &f) {
if (streamID >= handlers_.size()) handlers_.resize(streamID+1); if (streamID >= handlers_.size()) handlers_.resize(streamID+1);
handlers_[streamID] = f; handlers_[streamID] = f;
} }
......
#include "file_source.hpp" #include "file_source.hpp"
using ftl::rgbd::detail::FileSource; using ftl::rgbd::detail::FileSource;
using ftl::codecs::codec_t;
void FileSource::_createDecoder(int ix, const ftl::codecs::Packet &pkt) {
if (decoders_[ix]) {
if (!decoders_[ix]->accepts(pkt)) {
ftl::codecs::free(decoders_[ix]);
} else {
return;
}
}
LOG(INFO) << "Create a decoder: " << ix;
decoders_[ix] = ftl::codecs::allocateDecoder(pkt);
}
FileSource::FileSource(ftl::rgbd::Source *s, ftl::codecs::Reader *r, int sid) : ftl::rgbd::detail::Source(s) { FileSource::FileSource(ftl::rgbd::Source *s, ftl::codecs::Reader *r, int sid) : ftl::rgbd::detail::Source(s) {
reader_ = r; reader_ = r;
r->onPacket(sid, [this](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { has_calibration_ = false;
LOG(INFO) << "PACKET RECEIVED " << spkt.streamID; decoders_[0] = nullptr;
decoders_[1] = nullptr;
cache_read_ = -1;
cache_write_ = 0;
r->onPacket(sid, [this](const ftl::codecs::StreamPacket &spkt, ftl::codecs::Packet &pkt) {
if (pkt.codec == codec_t::POSE) {
Eigen::Matrix4d p = Eigen::Map<Eigen::Matrix4d>((double*)pkt.data.data());
host_->setPose(p);
} else if (pkt.codec == codec_t::CALIBRATION) {
ftl::rgbd::Camera *camera = (ftl::rgbd::Camera*)pkt.data.data();
LOG(INFO) << "Have calibration: " << camera->fx;
params_ = *camera;
has_calibration_ = true;
} else {
cache_[cache_write_].emplace_back();
auto &c = cache_[cache_write_].back();
// TODO: Attempt to avoid this copy operation
c.spkt = spkt;
c.pkt = pkt;
}
}); });
} }
...@@ -14,15 +49,49 @@ FileSource::~FileSource() { ...@@ -14,15 +49,49 @@ FileSource::~FileSource() {
} }
bool FileSource::capture(int64_t ts) { bool FileSource::capture(int64_t ts) {
reader_->read(ts); timestamp_ = ts;
return true; return true;
} }
bool FileSource::retrieve() { bool FileSource::retrieve() {
if (!reader_->read(timestamp_)) {
cache_write_ = -1;
}
return true; return true;
} }
void FileSource::swap() {
cache_read_ = cache_write_;
cache_write_ = (cache_write_ == 0) ? 1 : 0;
}
bool FileSource::compute(int n, int b) { bool FileSource::compute(int n, int b) {
if (cache_read_ < 0) return false;
for (auto i=cache_[cache_read_].begin(); i!=cache_[cache_read_].end(); ++i) {
auto &c = *i;
if (c.spkt.channel == 0) {
rgb_.create(cv::Size(ftl::codecs::getWidth(c.pkt.definition),ftl::codecs::getHeight(c.pkt.definition)), CV_8UC3);
} else {
depth_.create(cv::Size(ftl::codecs::getWidth(c.pkt.definition),ftl::codecs::getHeight(c.pkt.definition)), CV_32F);
}
_createDecoder(c.spkt.channel, c.pkt);
try {
decoders_[c.spkt.channel]->decode(c.pkt, (c.spkt.channel == 0) ? rgb_ : depth_);
} catch (std::exception &e) {
LOG(INFO) << "Decoder exception: " << e.what();
}
}
cache_[cache_read_].clear();
if (rgb_.empty() || depth_.empty()) return false;
auto cb = host_->callback();
if (cb) cb(timestamp_, rgb_, depth_);
return true; return true;
} }
......
...@@ -6,6 +6,9 @@ ...@@ -6,6 +6,9 @@
#include <ftl/rgbd/source.hpp> #include <ftl/rgbd/source.hpp>
#include <ftl/codecs/reader.hpp> #include <ftl/codecs/reader.hpp>
#include <ftl/codecs/decoder.hpp>
#include <list>
namespace ftl { namespace ftl {
namespace rgbd { namespace rgbd {
...@@ -20,10 +23,25 @@ class FileSource : public detail::Source { ...@@ -20,10 +23,25 @@ class FileSource : public detail::Source {
bool retrieve(); bool retrieve();
bool compute(int n, int b); bool compute(int n, int b);
bool isReady(); bool isReady();
void swap();
//void reset(); //void reset();
private: private:
ftl::codecs::Reader *reader_; ftl::codecs::Reader *reader_;
bool has_calibration_;
struct PacketPair {
ftl::codecs::StreamPacket spkt;
ftl::codecs::Packet pkt;
};
std::list<PacketPair> cache_[2];
int cache_read_;
int cache_write_;
ftl::codecs::Decoder *decoders_[2];
void _createDecoder(int ix, const ftl::codecs::Packet &pkt);
}; };
} }
......
...@@ -121,6 +121,7 @@ ftl::rgbd::detail::Source *Source::_createFileImpl(const ftl::URI &uri) { ...@@ -121,6 +121,7 @@ ftl::rgbd::detail::Source *Source::_createFileImpl(const ftl::URI &uri) {
if (ext == "ftl") { if (ext == "ftl") {
ftl::codecs::Reader *reader = __createReader(path); ftl::codecs::Reader *reader = __createReader(path);
LOG(INFO) << "Playing track: " << uri.getFragment();
return new FileSource(this, reader, std::stoi(uri.getFragment())); return new FileSource(this, reader, std::stoi(uri.getFragment()));
} else if (ext == "png" || ext == "jpg") { } else if (ext == "png" || ext == "jpg") {
return new ImageSource(this, path); return new ImageSource(this, path);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment