Skip to content
Snippets Groups Projects
Commit 309d9632 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'bug/213/recorder' into 'master'

Resolves #213 and resolves #214

Closes #213 and #214

See merge request nicolas.pope/ftl!148
parents 08cd039d 6cddc425
No related branches found
No related tags found
1 merge request!148Resolves #213 and resolves #214
Pipeline #15980 passed
...@@ -66,34 +66,6 @@ static Eigen::Affine3d create_rotation_matrix(float ax, float ay, float az) { ...@@ -66,34 +66,6 @@ static Eigen::Affine3d create_rotation_matrix(float ax, float ay, float az) {
return rz * rx * ry; return rz * rx * ry;
} }
static void writeSourceProperties(ftl::codecs::Writer &writer, int id, ftl::rgbd::Source *src) {
ftl::codecs::StreamPacket spkt;
ftl::codecs::Packet pkt;
spkt.timestamp = 0;
spkt.streamID = id;
spkt.channel = Channel::Calibration;
spkt.channel_count = 1;
pkt.codec = ftl::codecs::codec_t::CALIBRATION;
pkt.definition = ftl::codecs::definition_t::Any;
pkt.block_number = 0;
pkt.block_total = 1;
pkt.flags = 0;
pkt.data = std::move(std::vector<uint8_t>((uint8_t*)&src->parameters(), (uint8_t*)&src->parameters() + sizeof(ftl::rgbd::Camera)));
writer.write(spkt, pkt);
spkt.channel = Channel::Pose;
pkt.codec = ftl::codecs::codec_t::POSE;
pkt.definition = ftl::codecs::definition_t::Any;
pkt.block_number = 0;
pkt.block_total = 1;
pkt.flags = 0;
pkt.data = std::move(std::vector<uint8_t>((uint8_t*)src->getPose().data(), (uint8_t*)src->getPose().data() + 4*4*sizeof(double)));
writer.write(spkt, pkt);
}
static void run(ftl::Configurable *root) { static void run(ftl::Configurable *root) {
Universe *net = ftl::create<Universe>(root, "net"); Universe *net = ftl::create<Universe>(root, "net");
ftl::ctrl::Slave slave(net, root); ftl::ctrl::Slave slave(net, root);
...@@ -128,7 +100,7 @@ static void run(ftl::Configurable *root) { ...@@ -128,7 +100,7 @@ static void run(ftl::Configurable *root) {
LOG(INFO) << "Found " << (max_stream+1) << " sources in " << path; LOG(INFO) << "Found " << (max_stream+1) << " sources in " << path;
// For each stream found, add a source object // For each stream found, add a source object
for (int i=0; i<max_stream; ++i) { for (int i=0; i<=max_stream; ++i) {
root->getConfig()["sources"].push_back(nlohmann::json{{"uri",std::string("file://") + path + std::string("#") + std::to_string(i)}}); root->getConfig()["sources"].push_back(nlohmann::json{{"uri",std::string("file://") + path + std::string("#") + std::to_string(i)}});
} }
} }
...@@ -246,14 +218,15 @@ static void run(ftl::Configurable *root) { ...@@ -246,14 +218,15 @@ static void run(ftl::Configurable *root) {
fileout.open(std::string(timestamp) + ".ftl"); fileout.open(std::string(timestamp) + ".ftl");
writer.begin(); writer.begin();
group->addRawCallback(std::function(recorder));
// TODO: Write pose+calibration+config packets // TODO: Write pose+calibration+config packets
auto sources = group->sources(); auto sources = group->sources();
for (int i=0; i<sources.size(); ++i) { for (int i=0; i<sources.size(); ++i) {
writeSourceProperties(writer, i, sources[i]); //writeSourceProperties(writer, i, sources[i]);
sources[i]->inject(Channel::Calibration, sources[i]->parameters(), Channel::Left, sources[i]->getCapabilities());
sources[i]->inject(sources[i]->getPose());
} }
group->addRawCallback(std::function(recorder));
} else { } else {
group->removeRawCallback(recorder); group->removeRawCallback(recorder);
writer.end(); writer.end();
......
...@@ -62,34 +62,6 @@ static Eigen::Affine3d create_rotation_matrix(float ax, float ay, float az) { ...@@ -62,34 +62,6 @@ static Eigen::Affine3d create_rotation_matrix(float ax, float ay, float az) {
return rz * rx * ry; return rz * rx * ry;
} }
static void writeSourceProperties(ftl::codecs::Writer &writer, int id, ftl::rgbd::Source *src) {
ftl::codecs::StreamPacket spkt;
ftl::codecs::Packet pkt;
spkt.timestamp = 0;
spkt.streamID = id;
spkt.channel = Channel::Calibration;
spkt.channel_count = 1;
pkt.codec = ftl::codecs::codec_t::CALIBRATION;
pkt.definition = ftl::codecs::definition_t::Any;
pkt.block_number = 0;
pkt.block_total = 1;
pkt.flags = 0;
pkt.data = std::move(std::vector<uint8_t>((uint8_t*)&src->parameters(), (uint8_t*)&src->parameters() + sizeof(ftl::rgbd::Camera)));
writer.write(spkt, pkt);
spkt.channel = Channel::Pose;
pkt.codec = ftl::codecs::codec_t::POSE;
pkt.definition = ftl::codecs::definition_t::Any;
pkt.block_number = 0;
pkt.block_total = 1;
pkt.flags = 0;
pkt.data = std::move(std::vector<uint8_t>((uint8_t*)src->getPose().data(), (uint8_t*)src->getPose().data() + 4*4*sizeof(double)));
writer.write(spkt, pkt);
}
static void run(ftl::Configurable *root) { static void run(ftl::Configurable *root) {
Universe *net = ftl::create<Universe>(root, "net"); Universe *net = ftl::create<Universe>(root, "net");
ftl::ctrl::Slave slave(net, root); ftl::ctrl::Slave slave(net, root);
...@@ -199,7 +171,9 @@ static void run(ftl::Configurable *root) { ...@@ -199,7 +171,9 @@ static void run(ftl::Configurable *root) {
// TODO: Write pose+calibration+config packets // TODO: Write pose+calibration+config packets
auto sources = group->sources(); auto sources = group->sources();
for (int i=0; i<sources.size(); ++i) { for (int i=0; i<sources.size(); ++i) {
writeSourceProperties(writer, i, sources[i]); //writeSourceProperties(writer, i, sources[i]);
sources[i]->inject(Channel::Calibration, sources[i]->parameters(), Channel::Left, sources[i]->getCapabilities());
sources[i]->inject(sources[i]->getPose());
} }
} else { } else {
//group->removeRawCallback(recorder); //group->removeRawCallback(recorder);
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
#include <msgpack.hpp> #include <msgpack.hpp>
//#include <Eigen/Eigen> //#include <Eigen/Eigen>
#include <ftl/threads.hpp>
#include <ftl/codecs/packet.hpp> #include <ftl/codecs/packet.hpp>
namespace ftl { namespace ftl {
...@@ -24,6 +25,7 @@ class Writer { ...@@ -24,6 +25,7 @@ class Writer {
msgpack::sbuffer buffer_; msgpack::sbuffer buffer_;
int64_t timestart_; int64_t timestart_;
bool active_; bool active_;
MUTEX mutex_;
}; };
} }
......
...@@ -37,6 +37,22 @@ bool Reader::begin() { ...@@ -37,6 +37,22 @@ bool Reader::begin() {
return true; return true;
} }
/*static void printMsgpack(msgpack::object &obj) {
switch (obj.type) {
case msgpack::type::NIL: return;
case msgpack::type::BOOLEAN: LOG(INFO) << "BOOL " << obj.as<bool>(); return;
case msgpack::type::POSITIVE_INTEGER:
case msgpack::type::NEGATIVE_INTEGER: LOG(INFO) << "INT " << obj.as<int>(); return;
case msgpack::type::FLOAT32: LOG(INFO) << "FLOAT " << obj.as<float>(); return;
case msgpack::type::FLOAT64: LOG(INFO) << "DOUBLE " << obj.as<double>(); return;
case msgpack::type::STR: LOG(INFO) << "STRING " << obj.as<std::string>(); return;
case msgpack::type::BIN: return;
case msgpack::type::EXT: return;
case msgpack::type::ARRAY: LOG(INFO) << "ARRAY: "; return;
case msgpack::type::MAP: LOG(INFO) << "MAP: "; return;
}
}*/
bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, ftl::codecs::Packet &)> &f) { bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, ftl::codecs::Packet &)> &f) {
//UNIQUE_LOCK(mtx_, lk); //UNIQUE_LOCK(mtx_, lk);
std::unique_lock<std::mutex> lk(mtx_, std::defer_lock); std::unique_lock<std::mutex> lk(mtx_, std::defer_lock);
...@@ -75,10 +91,13 @@ bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::Stream ...@@ -75,10 +91,13 @@ bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::Stream
//std::tuple<StreamPacket,Packet> data; //std::tuple<StreamPacket,Packet> data;
msgpack::object obj = msg.get(); msgpack::object obj = msg.get();
//printMsgpack(obj);
try { try {
obj.convert(data_.emplace_back()); obj.convert(data_.emplace_back());
} catch (std::exception &e) { } catch (std::exception &e) {
LOG(INFO) << "Corrupt message: " << buffer_.nonparsed_size(); LOG(INFO) << "Corrupt message: " << buffer_.nonparsed_size() << " - " << e.what();
//partial = true; //partial = true;
//continue; //continue;
return false; return false;
......
...@@ -41,7 +41,8 @@ bool Writer::write(const ftl::codecs::StreamPacket &s, const ftl::codecs::Packet ...@@ -41,7 +41,8 @@ bool Writer::write(const ftl::codecs::StreamPacket &s, const ftl::codecs::Packet
auto data = std::make_tuple(s2,p); auto data = std::make_tuple(s2,p);
msgpack::sbuffer buffer; msgpack::sbuffer buffer;
msgpack::pack(buffer, data); msgpack::pack(buffer, data);
UNIQUE_LOCK(mutex_, lk);
(*stream_).write(buffer.data(), buffer.size()); (*stream_).write(buffer.data(), buffer.size());
//buffer_.clear();
return true; return true;
} }
...@@ -216,6 +216,8 @@ class Source : public ftl::Configurable { ...@@ -216,6 +216,8 @@ class Source : public ftl::Configurable {
template <typename... ARGS> template <typename... ARGS>
void inject(ftl::codecs::Channel c, ARGS... args); void inject(ftl::codecs::Channel c, ARGS... args);
void inject(const Eigen::Matrix4d &pose);
protected: protected:
detail::Source *impl_; detail::Source *impl_;
Eigen::Matrix4d pose_; Eigen::Matrix4d pose_;
......
...@@ -323,3 +323,21 @@ void Source::notify(int64_t ts, cv::Mat &c1, cv::Mat &c2) { ...@@ -323,3 +323,21 @@ void Source::notify(int64_t ts, cv::Mat &c1, cv::Mat &c2) {
if (callback_) callback_(ts, c1, c2); if (callback_) callback_(ts, c1, c2);
} }
void Source::inject(const Eigen::Matrix4d &pose) {
ftl::codecs::StreamPacket spkt;
ftl::codecs::Packet pkt;
spkt.timestamp = impl_->timestamp_;
spkt.channel_count = 0;
spkt.channel = Channel::Pose;
spkt.streamID = 0;
pkt.codec = ftl::codecs::codec_t::POSE;
pkt.definition = ftl::codecs::definition_t::Any;
pkt.block_number = 0;
pkt.block_total = 1;
pkt.flags = 0;
pkt.data = std::move(std::vector<uint8_t>((uint8_t*)pose.data(), (uint8_t*)pose.data() + 4*4*sizeof(double)));
notifyRaw(spkt, pkt);
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment