diff --git a/CMakeLists.txt b/CMakeLists.txt index b0aecb25a357a5c978db35a80cb20487adb5d08d..17ef352e58e8191f84a7a9388f6d3e823d5a1535 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -220,6 +220,7 @@ add_subdirectory(components/rgbd-sources) add_subdirectory(components/control/cpp) add_subdirectory(applications/calibration) add_subdirectory(applications/groupview) +add_subdirectory(applications/player) if (BUILD_RENDERER) add_subdirectory(components/renderers) diff --git a/applications/player/CMakeLists.txt b/applications/player/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..c501c882464a5fbd2a13f2d1561de19b74a3cd76 --- /dev/null +++ b/applications/player/CMakeLists.txt @@ -0,0 +1,11 @@ +set(PLAYERSRC + src/main.cpp +) + +add_executable(ftl-player ${PLAYERSRC}) + +target_include_directories(ftl-player PRIVATE src) + +target_link_libraries(ftl-player ftlcommon ftlcodecs ftlrgbd Threads::Threads ${OpenCV_LIBS}) + + diff --git a/applications/player/src/main.cpp b/applications/player/src/main.cpp new file mode 100644 index 0000000000000000000000000000000000000000..dc176ed03cff73170f639961e5d328cdf71fac18 --- /dev/null +++ b/applications/player/src/main.cpp @@ -0,0 +1,110 @@ +#include <loguru.hpp> +#include <ftl/configuration.hpp> +#include <ftl/codecs/reader.hpp> +#include <ftl/codecs/decoder.hpp> +#include <ftl/codecs/packet.hpp> +#include <ftl/rgbd/camera.hpp> + +#include <fstream> + +#include <Eigen/Eigen> + +using ftl::codecs::codec_t; + +static ftl::codecs::Decoder *decoder; + + +static void createDecoder(const ftl::codecs::Packet &pkt) { + if (decoder) { + if (!decoder->accepts(pkt)) { + ftl::codecs::free(decoder); + } else { + return; + } + } + + decoder = ftl::codecs::allocateDecoder(pkt); +} + +static void visualizeDepthMap( const cv::Mat &depth, cv::Mat &out, + const float max_depth) +{ + depth.convertTo(out, CV_8U, 255.0f / max_depth); + out = 255 - out; + //cv::Mat mask = (depth >= 39.0f); // TODO (mask for invalid pixels) + + applyColorMap(out, out, cv::COLORMAP_JET); + //out.setTo(cv::Scalar(255, 255, 255), mask); +} + +int main(int argc, char **argv) { + std::string filename(argv[1]); + LOG(INFO) << "Playing: " << filename; + + auto root = ftl::configure(argc, argv, "player_default"); + + std::ifstream f; + f.open(filename); + if (!f.is_open()) LOG(ERROR) << "Could not open file"; + + ftl::codecs::Reader r(f); + if (!r.begin()) LOG(ERROR) << "Bad ftl file"; + + LOG(INFO) << "Playing..."; + + int current_stream = 0; + int current_channel = 0; + + bool res = r.read(90000000000000, [¤t_stream,¤t_channel,&r](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + if (spkt.channel != current_channel) return; + if (spkt.streamID == current_stream) { + + if (pkt.codec == codec_t::POSE) { + Eigen::Matrix4d p = Eigen::Map<Eigen::Matrix4d>((double*)pkt.data.data()); + LOG(INFO) << "Have pose: " << p; + return; + } + + if (pkt.codec == codec_t::CALIBRATION) { + ftl::rgbd::Camera *camera = (ftl::rgbd::Camera*)pkt.data.data(); + LOG(INFO) << "Have calibration: " << camera->fx; + return; + } + + LOG(INFO) << "Reading packet: (" << (int)spkt.streamID << "," << (int)spkt.channel << ") " << (int)pkt.codec << ", " << (int)pkt.definition; + + cv::Mat frame(cv::Size(ftl::codecs::getWidth(pkt.definition),ftl::codecs::getHeight(pkt.definition)), (spkt.channel == 1) ? CV_32F : CV_8UC3); + createDecoder(pkt); + + try { + decoder->decode(pkt, frame); + } catch (std::exception &e) { + LOG(INFO) << "Decoder exception: " << e.what(); + } + + if (!frame.empty()) { + if (spkt.channel == 1) { + visualizeDepthMap(frame, frame, 8.0f); + } + double time = (double)(spkt.timestamp - r.getStartTime()) / 1000.0; + cv::putText(frame, std::string("Time: ") + std::to_string(time) + std::string("s"), cv::Point(10,20), cv::FONT_HERSHEY_PLAIN, 1, cv::Scalar(0,0,255)); + cv::imshow("Player", frame); + } + int key = cv::waitKey(20); + if (key >= 48 && key <= 57) { + current_stream = key - 48; + } else if (key == 'd') { + current_channel = (current_channel == 0) ? 1 : 0; + } else if (key == 27) { + r.end(); + } + } + }); + + if (!res) LOG(ERROR) << "No frames left"; + + r.end(); + + ftl::running = false; + return 0; +} diff --git a/applications/reconstruct/src/main.cpp b/applications/reconstruct/src/main.cpp index 2ea9408133893ead019b7f769d4c2446c2ac0a4f..7f34ad8b3e4896c768a006d9c537e1c911292cd5 100644 --- a/applications/reconstruct/src/main.cpp +++ b/applications/reconstruct/src/main.cpp @@ -15,6 +15,7 @@ #include <ftl/slave.hpp> #include <ftl/rgbd/group.hpp> #include <ftl/threads.hpp> +#include <ftl/codecs/writer.hpp> #include "ilw/ilw.hpp" #include <ftl/render/splat_render.hpp> @@ -63,6 +64,33 @@ static Eigen::Affine3d create_rotation_matrix(float ax, float ay, float az) { return rz * rx * ry; } +static void writeSourceProperties(ftl::codecs::Writer &writer, int id, ftl::rgbd::Source *src) { + ftl::codecs::StreamPacket spkt; + ftl::codecs::Packet pkt; + + spkt.timestamp = 0; + spkt.streamID = id; + spkt.channel = 0; + spkt.channel_count = 1; + pkt.codec = ftl::codecs::codec_t::CALIBRATION; + pkt.definition = ftl::codecs::definition_t::Any; + pkt.block_number = 0; + pkt.block_total = 1; + pkt.flags = 0; + pkt.data = std::move(std::vector<uint8_t>((uint8_t*)&src->parameters(), (uint8_t*)&src->parameters() + sizeof(ftl::rgbd::Camera))); + + writer.write(spkt, pkt); + + pkt.codec = ftl::codecs::codec_t::POSE; + pkt.definition = ftl::codecs::definition_t::Any; + pkt.block_number = 0; + pkt.block_total = 1; + pkt.flags = 0; + pkt.data = std::move(std::vector<uint8_t>((uint8_t*)src->getPose().data(), (uint8_t*)src->getPose().data() + 4*4*sizeof(double))); + + writer.write(spkt, pkt); +} + static void run(ftl::Configurable *root) { Universe *net = ftl::create<Universe>(root, "net"); ftl::ctrl::Slave slave(net, root); @@ -159,6 +187,46 @@ static void run(ftl::Configurable *root) { group.addSource(in); } + // ---- Recording code ----------------------------------------------------- + + std::ofstream fileout; + ftl::codecs::Writer writer(fileout); + auto recorder = [&writer,&group](ftl::rgbd::Source *src, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + ftl::codecs::StreamPacket s = spkt; + + // Patch stream ID to match order in group + s.streamID = group.streamID(src); + writer.write(s, pkt); + }; + + root->set("record", false); + + // Allow stream recording + root->on("record", [&group,&fileout,&writer,&recorder](const ftl::config::Event &e) { + if (e.entity->value("record", false)) { + char timestamp[18]; + std::time_t t=std::time(NULL); + std::strftime(timestamp, sizeof(timestamp), "%F-%H%M%S", std::localtime(&t)); + fileout.open(std::string(timestamp) + ".ftl"); + + writer.begin(); + + // TODO: Write pose+calibration+config packets + auto sources = group.sources(); + for (int i=0; i<sources.size(); ++i) { + writeSourceProperties(writer, i, sources[i]); + } + + group.addRawCallback(std::function(recorder)); + } else { + group.removeRawCallback(recorder); + writer.end(); + fileout.close(); + } + }); + + // ------------------------------------------------------------------------- + stream->setLatency(5); // FIXME: This depends on source!? stream->add(&group); stream->run(); diff --git a/components/codecs/CMakeLists.txt b/components/codecs/CMakeLists.txt index f951f94a6c10127c989862d154dbe6cee896812a..db41431d49b2ec8672b70dafb7f14ccfb7497d28 100644 --- a/components/codecs/CMakeLists.txt +++ b/components/codecs/CMakeLists.txt @@ -5,6 +5,8 @@ set(CODECSRC src/opencv_encoder.cpp src/opencv_decoder.cpp src/generate.cpp + src/writer.cpp + src/reader.cpp ) if (HAVE_NVPIPE) diff --git a/components/codecs/include/ftl/codecs/bitrates.hpp b/components/codecs/include/ftl/codecs/bitrates.hpp index 19572daa5600fd7b78106e951307e92e17f51e33..8e41b13f7e4d9b77ad38cf23fdb2622d2107a537 100644 --- a/components/codecs/include/ftl/codecs/bitrates.hpp +++ b/components/codecs/include/ftl/codecs/bitrates.hpp @@ -14,7 +14,15 @@ enum struct codec_t : uint8_t { JPG = 0, PNG, H264, - HEVC // H265 + HEVC, // H265 + + // TODO: Add audio codecs + WAV, + + JSON = 100, // A JSON string + CALIBRATION, // Camera parameters object + POSE, // 4x4 eigen matrix + RAW // Some unknown binary format (msgpack?) }; /** @@ -29,6 +37,8 @@ enum struct definition_t : uint8_t { SD480 = 5, LD360 = 6, Any = 7 + + // TODO: Add audio definitions }; /** diff --git a/components/codecs/include/ftl/codecs/packet.hpp b/components/codecs/include/ftl/codecs/packet.hpp index 98e46a60122e6813fd22b0c0be0f09e40fbc96ce..3b8d17151fb492a89c61686b5f9bcb72193dc08d 100644 --- a/components/codecs/include/ftl/codecs/packet.hpp +++ b/components/codecs/include/ftl/codecs/packet.hpp @@ -10,6 +10,14 @@ namespace ftl { namespace codecs { +/** + * First bytes of our file format. + */ +struct Header { + const char magic[4] = {'F','T','L','F'}; + uint8_t version = 1; +}; + /** * A single network packet for the compressed video stream. It includes the raw * data along with any block metadata required to reconstruct. The underlying @@ -21,9 +29,10 @@ struct Packet { ftl::codecs::definition_t definition; uint8_t block_total; // Packets expected per frame uint8_t block_number; // This packets number within a frame + uint8_t flags; // Codec dependent flags (eg. I-Frame or P-Frame) std::vector<uint8_t> data; - MSGPACK_DEFINE(codec, definition, block_total, block_number, data); + MSGPACK_DEFINE(codec, definition, block_total, block_number, flags, data); }; /** @@ -33,9 +42,11 @@ struct Packet { */ struct StreamPacket { int64_t timestamp; - uint8_t channel; // first bit = channel, second bit indicates second channel being sent + uint8_t streamID; // Source number... + uint8_t channel_count; // Number of channels to expect (usually 1 or 2) + uint8_t channel; // Actual channel of this current set of packets - MSGPACK_DEFINE(timestamp, channel); + MSGPACK_DEFINE(timestamp, streamID, channel_count, channel); }; } diff --git a/components/codecs/include/ftl/codecs/reader.hpp b/components/codecs/include/ftl/codecs/reader.hpp new file mode 100644 index 0000000000000000000000000000000000000000..949f037dd18ce136317481bd2f657c5be24359f1 --- /dev/null +++ b/components/codecs/include/ftl/codecs/reader.hpp @@ -0,0 +1,58 @@ +#ifndef _FTL_CODECS_READER_HPP_ +#define _FTL_CODECS_READER_HPP_ + +#include <iostream> +#include <msgpack.hpp> +#include <inttypes.h> +#include <functional> + +#include <ftl/codecs/packet.hpp> + +namespace ftl { +namespace codecs { + +class Reader { + public: + Reader(std::istream &); + ~Reader(); + + /** + * Read packets up to and including requested timestamp. A provided callback + * is called for each packet read, in order stored in file. Returns true if + * there are still more packets available beyond specified timestamp, false + * otherwise (end-of-file). Timestamps are in local (clock adjusted) time + * and the timestamps stored in the file are aligned to the time when open + * was called. + */ + bool read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &); + + /** + * An alternative version of read where packet events are generated for + * specific streams, allowing different handlers for different streams. + * This allows demuxing and is used by player sources. Each source can call + * this read, only the first one will generate the data packets. + */ + bool read(int64_t ts); + + void onPacket(int streamID, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &); + + bool begin(); + bool end(); + + inline int64_t getStartTime() const { return timestart_; }; + + private: + std::istream *stream_; + msgpack::unpacker buffer_; + std::tuple<StreamPacket,Packet> data_; + bool has_data_; + int64_t timestart_; + bool playing_; + + std::vector<std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)>> handlers_; +}; + +} +} + +#endif // _FTL_CODECS_READER_HPP_ diff --git a/components/codecs/include/ftl/codecs/writer.hpp b/components/codecs/include/ftl/codecs/writer.hpp new file mode 100644 index 0000000000000000000000000000000000000000..abdbdb3db6fafc8d1a65652b44941bb6ce1c44ab --- /dev/null +++ b/components/codecs/include/ftl/codecs/writer.hpp @@ -0,0 +1,31 @@ +#ifndef _FTL_CODECS_WRITER_HPP_ +#define _FTL_CODECS_WRITER_HPP_ + +#include <iostream> +#include <msgpack.hpp> +//#include <Eigen/Eigen> + +#include <ftl/codecs/packet.hpp> + +namespace ftl { +namespace codecs { + +class Writer { + public: + Writer(std::ostream &); + ~Writer(); + + bool begin(); + bool write(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &); + bool end(); + + private: + std::ostream *stream_; + msgpack::sbuffer buffer_; + int64_t timestart_; +}; + +} +} + +#endif // _FTL_CODECS_WRITER_HPP_ diff --git a/components/codecs/src/reader.cpp b/components/codecs/src/reader.cpp new file mode 100644 index 0000000000000000000000000000000000000000..5ab809b8fb4beb44a0a41151d551a8cdfe3263d6 --- /dev/null +++ b/components/codecs/src/reader.cpp @@ -0,0 +1,103 @@ +#include <loguru.hpp> +#include <ftl/codecs/reader.hpp> +#include <ftl/timer.hpp> + +#include <tuple> + +using ftl::codecs::Reader; +using ftl::codecs::StreamPacket; +using ftl::codecs::Packet; +using std::get; + +Reader::Reader(std::istream &s) : stream_(&s), has_data_(false), playing_(false) { + +} + +Reader::~Reader() { + +} + +bool Reader::begin() { + ftl::codecs::Header h; + (*stream_).read((char*)&h, sizeof(h)); + if (h.magic[0] != 'F' || h.magic[1] != 'T' || h.magic[2] != 'L' || h.magic[3] != 'F') return false; + + // Capture current time to adjust timestamps + timestart_ = ftl::timer::get_time(); + playing_ = true; + + return true; +} + +bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &f) { + if (has_data_ && get<0>(data_).timestamp <= ts) { + f(get<0>(data_), get<1>(data_)); + has_data_ = false; + } else if (has_data_) { + return false; + } + + bool partial = false; + + while (playing_ && stream_->good() || buffer_.nonparsed_size() > 0) { + if (buffer_.nonparsed_size() == 0 || (partial && buffer_.nonparsed_size() < 10000000)) { + buffer_.reserve_buffer(10000000); + stream_->read(buffer_.buffer(), buffer_.buffer_capacity()); + //if (stream_->bad()) return false; + + int bytes = stream_->gcount(); + if (bytes == 0) return false; + buffer_.buffer_consumed(bytes); + partial = false; + } + + msgpack::object_handle msg; + if (!buffer_.next(msg)) { + LOG(INFO) << "NO Message: " << buffer_.nonparsed_size(); + partial = true; + continue; + } + + std::tuple<StreamPacket,Packet> data; + msgpack::object obj = msg.get(); + try { + obj.convert(data); + } catch (std::exception &e) { + LOG(INFO) << "Corrupt message: " << buffer_.nonparsed_size(); + //partial = true; + //continue; + return false; + } + + // Adjust timestamp + get<0>(data).timestamp += timestart_; + + if (get<0>(data).timestamp <= ts) { + f(get<0>(data),get<1>(data)); + } else { + data_ = data; + has_data_ = true; + return true; + } + } + + return false; +} + +bool Reader::read(int64_t ts) { + return read(ts, [this](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + if (handlers_.size() > spkt.streamID && (bool)handlers_[spkt.streamID]) { + handlers_[spkt.streamID](spkt, pkt); + } + }); +} + +void Reader::onPacket(int streamID, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &f) { + if (streamID >= handlers_.size()) handlers_.resize(streamID+1); + handlers_[streamID] = f; +} + +bool Reader::end() { + playing_ = false; + return true; +} diff --git a/components/codecs/src/writer.cpp b/components/codecs/src/writer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..2c19a01fd260e1549b01daef10c5600ea8c4b5a3 --- /dev/null +++ b/components/codecs/src/writer.cpp @@ -0,0 +1,40 @@ +#include <ftl/codecs/writer.hpp> +#include <ftl/timer.hpp> + +#include <tuple> + +using ftl::codecs::Writer; + +Writer::Writer(std::ostream &s) : stream_(&s) {} + +Writer::~Writer() { + +} + +bool Writer::begin() { + ftl::codecs::Header h; + h.version = 0; + (*stream_).write((const char*)&h, sizeof(h)); + + // Capture current time to adjust timestamps + timestart_ = ftl::timer::get_time(); + + return true; +} + +bool Writer::end() { + return true; +} + +bool Writer::write(const ftl::codecs::StreamPacket &s, const ftl::codecs::Packet &p) { + ftl::codecs::StreamPacket s2 = s; + // Adjust timestamp relative to start of file. + s2.timestamp -= timestart_; + + auto data = std::make_tuple(s2,p); + msgpack::sbuffer buffer; + msgpack::pack(buffer, data); + (*stream_).write(buffer.data(), buffer.size()); + //buffer_.clear(); + return true; +} diff --git a/components/codecs/test/CMakeLists.txt b/components/codecs/test/CMakeLists.txt index 534336fed6ce5135199105f0784fa260204f64de..89b92059decf25217cb48bbe26a75e1a7a7277a3 100644 --- a/components/codecs/test/CMakeLists.txt +++ b/components/codecs/test/CMakeLists.txt @@ -30,3 +30,17 @@ target_link_libraries(nvpipe_codec_unit add_test(NvPipeCodecUnitTest nvpipe_codec_unit) + +### Reader Writer Unit ################################################################ +add_executable(rw_unit + ./tests.cpp + ../src/writer.cpp + ../src/reader.cpp + ./readwrite_test.cpp +) +target_include_directories(rw_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include") +target_link_libraries(rw_unit + Threads::Threads ${OS_LIBS} ${OpenCV_LIBS} ${CUDA_LIBRARIES} ftlcommon Eigen3::Eigen) + + +add_test(RWUnitTest rw_unit) diff --git a/components/codecs/test/readwrite_test.cpp b/components/codecs/test/readwrite_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..03a918ebdf9fe9985e3f35a4e9cd6f1448ab46ef --- /dev/null +++ b/components/codecs/test/readwrite_test.cpp @@ -0,0 +1,263 @@ +#include "catch.hpp" + +#include <ftl/codecs/writer.hpp> +#include <ftl/codecs/reader.hpp> +#include <ftl/timer.hpp> + +#include <sstream> + +using ftl::codecs::Writer; +using ftl::codecs::Reader; +using ftl::codecs::StreamPacket; +using ftl::codecs::Packet; +using ftl::codecs::codec_t; +using ftl::codecs::definition_t; + +TEST_CASE( "Write and read - Single frame" ) { + std::stringstream s; + Writer w(s); + + StreamPacket spkt; + Packet pkt; + + spkt.channel = 0; + spkt.timestamp = ftl::timer::get_time(); + spkt.streamID = 0; + + pkt.codec = codec_t::JSON; + pkt.definition = definition_t::Any; + pkt.block_number = 0; + pkt.block_total = 1; + pkt.flags = 0; + pkt.data = {44,44,44}; + + w.begin(); + w.write(spkt, pkt); + w.end(); + + REQUIRE( s.str().size() > 0 ); + + int n = 0; + + Reader r(s); + r.begin(); + bool res = r.read(ftl::timer::get_time()+10, [&n](const StreamPacket &rspkt, const Packet &rpkt) { + ++n; + REQUIRE(rpkt.codec == codec_t::JSON); + REQUIRE(rpkt.data.size() == 3); + REQUIRE(rpkt.data[0] == 44); + REQUIRE(rspkt.channel == 0); + }); + r.end(); + + REQUIRE( n == 1 ); + REQUIRE( !res ); +} + +TEST_CASE( "Write and read - Multiple frames" ) { + std::stringstream s; + Writer w(s); + + StreamPacket spkt; + Packet pkt; + + spkt.channel = 0; + spkt.timestamp = ftl::timer::get_time(); + spkt.streamID = 0; + + pkt.codec = codec_t::JSON; + pkt.definition = definition_t::Any; + pkt.block_number = 0; + pkt.block_total = 1; + pkt.flags = 0; + pkt.data = {44,44,44}; + + w.begin(); + w.write(spkt, pkt); + spkt.timestamp += 50; + pkt.data = {55,55,55}; + w.write(spkt, pkt); + spkt.timestamp += 50; + pkt.data = {66,66,66}; + w.write(spkt, pkt); + w.end(); + + REQUIRE( s.str().size() > 0 ); + + int n = 0; + + Reader r(s); + r.begin(); + bool res = r.read(ftl::timer::get_time()+100, [&n](const StreamPacket &rspkt, const Packet &rpkt) { + ++n; + REQUIRE(rpkt.codec == codec_t::JSON); + REQUIRE(rpkt.data.size() == 3); + REQUIRE(rpkt.data[0] == ((n == 1) ? 44 : (n == 2) ? 55 : 66)); + REQUIRE(rspkt.channel == 0); + }); + r.end(); + + REQUIRE( n == 3 ); + REQUIRE( !res ); +} + +TEST_CASE( "Write and read - Multiple streams" ) { + std::stringstream s; + Writer w(s); + + StreamPacket spkt; + Packet pkt; + + spkt.channel = 0; + spkt.timestamp = ftl::timer::get_time(); + spkt.streamID = 0; + + pkt.codec = codec_t::JSON; + pkt.definition = definition_t::Any; + pkt.block_number = 0; + pkt.block_total = 1; + pkt.flags = 0; + pkt.data = {44,44,44}; + + w.begin(); + w.write(spkt, pkt); + spkt.streamID = 1; + pkt.data = {55,55,55}; + w.write(spkt, pkt); + w.end(); + + REQUIRE( s.str().size() > 0 ); + + int n1 = 0; + int n2 = 0; + + Reader r(s); + + r.onPacket(0, [&n1](const StreamPacket &rspkt, const Packet &rpkt) { + ++n1; + REQUIRE(rspkt.streamID == 0); + REQUIRE(rpkt.data.size() == 3); + REQUIRE(rpkt.data[0] == 44); + }); + + r.onPacket(1, [&n2](const StreamPacket &rspkt, const Packet &rpkt) { + ++n2; + REQUIRE(rspkt.streamID == 1); + REQUIRE(rpkt.data.size() == 3); + REQUIRE(rpkt.data[0] == 55); + }); + + r.begin(); + bool res = r.read(ftl::timer::get_time()+100); + r.end(); + + REQUIRE( n1 == 1 ); + REQUIRE( n2 == 1 ); + REQUIRE( !res ); +} + +TEST_CASE( "Write and read - Multiple frames with limit" ) { + std::stringstream s; + Writer w(s); + + StreamPacket spkt; + Packet pkt; + + spkt.channel = 0; + spkt.timestamp = ftl::timer::get_time(); + spkt.streamID = 0; + + pkt.codec = codec_t::JSON; + pkt.definition = definition_t::Any; + pkt.block_number = 0; + pkt.block_total = 1; + pkt.flags = 0; + pkt.data = {44,44,44}; + + w.begin(); + w.write(spkt, pkt); + spkt.timestamp += 50; + pkt.data = {55,55,55}; + w.write(spkt, pkt); + spkt.timestamp += 50; + pkt.data = {66,66,66}; + w.write(spkt, pkt); + w.end(); + + REQUIRE( s.str().size() > 0 ); + + int n = 0; + + Reader r(s); + r.begin(); + bool res = r.read(ftl::timer::get_time()+50, [&n](const StreamPacket &rspkt, const Packet &rpkt) { + ++n; + REQUIRE(rpkt.codec == codec_t::JSON); + REQUIRE(rpkt.data.size() == 3); + REQUIRE(rpkt.data[0] == ((n == 1) ? 44 : (n == 2) ? 55 : 66)); + REQUIRE(rspkt.channel == 0); + }); + r.end(); + + REQUIRE( n == 2 ); + REQUIRE( res ); +} + +TEST_CASE( "Write and read - Multiple reads" ) { + std::stringstream s; + Writer w(s); + + StreamPacket spkt; + Packet pkt; + + spkt.channel = 0; + spkt.timestamp = ftl::timer::get_time(); + spkt.streamID = 0; + + pkt.codec = codec_t::JSON; + pkt.definition = definition_t::Any; + pkt.block_number = 0; + pkt.block_total = 1; + pkt.flags = 0; + pkt.data = {44,44,44}; + + w.begin(); + w.write(spkt, pkt); + spkt.timestamp += 50; + pkt.data = {55,55,55}; + w.write(spkt, pkt); + spkt.timestamp += 50; + pkt.data = {66,66,66}; + w.write(spkt, pkt); + w.end(); + + REQUIRE( s.str().size() > 0 ); + + int n = 0; + + Reader r(s); + r.begin(); + bool res = r.read(ftl::timer::get_time()+50, [&n](const StreamPacket &rspkt, const Packet &rpkt) { + ++n; + REQUIRE(rpkt.codec == codec_t::JSON); + REQUIRE(rpkt.data.size() == 3); + REQUIRE(rpkt.data[0] == ((n == 1) ? 44 : (n == 2) ? 55 : 66)); + REQUIRE(rspkt.channel == 0); + }); + + REQUIRE( n == 2 ); + REQUIRE( res ); + + n = 0; + res = r.read(ftl::timer::get_time()+100, [&n](const StreamPacket &rspkt, const Packet &rpkt) { + ++n; + REQUIRE(rpkt.codec == codec_t::JSON); + REQUIRE(rpkt.data.size() == 3); + REQUIRE(rpkt.data[0] == 66 ); + REQUIRE(rspkt.channel == 0); + }); + r.end(); + + REQUIRE( n == 1 ); + REQUIRE( !res ); +} diff --git a/components/rgbd-sources/CMakeLists.txt b/components/rgbd-sources/CMakeLists.txt index 2b056d009a73dd7ee82adaedbf03bb98194d636f..e064f91a259874b68879627e2fbb8ff44b24f2ca 100644 --- a/components/rgbd-sources/CMakeLists.txt +++ b/components/rgbd-sources/CMakeLists.txt @@ -19,6 +19,7 @@ set(RGBDSRC src/abr.cpp src/offilter.cpp src/virtual.cpp + src/file_source.cpp ) if (HAVE_REALSENSE) diff --git a/components/rgbd-sources/include/ftl/rgbd/group.hpp b/components/rgbd-sources/include/ftl/rgbd/group.hpp index 3c7b26e171a3f119f2ca515696ffdd03e1ead54a..fdd7ba7412a2f0d247fb545150cd428ae4519d9f 100644 --- a/components/rgbd-sources/include/ftl/rgbd/group.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/group.hpp @@ -73,12 +73,12 @@ class Group { * There is no guarantee about order or timing and the callback itself will * need to ensure synchronisation of timestamps. */ - void addRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &); + void addRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &); /** * Removes a raw data callback from all sources in the group. */ - void removeRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &); + void removeRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &); inline std::vector<Source*> sources() const { return sources_; } @@ -99,6 +99,8 @@ class Group { void stop() {} + int streamID(const ftl::rgbd::Source *s) const; + private: std::vector<FrameSet> framesets_; std::vector<Source*> sources_; diff --git a/components/rgbd-sources/include/ftl/rgbd/source.hpp b/components/rgbd-sources/include/ftl/rgbd/source.hpp index 4c27baf866fc8a80d30f37ec3b794df4e0368916..18413cb8941c56f09f6189895a098a517b276c7a 100644 --- a/components/rgbd-sources/include/ftl/rgbd/source.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/source.hpp @@ -9,9 +9,11 @@ #include <ftl/uri.hpp> #include <ftl/rgbd/detail/source.hpp> #include <ftl/codecs/packet.hpp> +#include <ftl/codecs/reader.hpp> #include <opencv2/opencv.hpp> #include <Eigen/Eigen> #include <string> +#include <map> #include <ftl/cuda_common.hpp> #include <ftl/rgbd/frame.hpp> @@ -244,6 +246,10 @@ class Source : public ftl::Configurable { detail::Source *_createFileImpl(const ftl::URI &uri); detail::Source *_createNetImpl(const ftl::URI &uri); detail::Source *_createDeviceImpl(const ftl::URI &uri); + + static ftl::codecs::Reader *__createReader(const std::string &path); + + static std::map<std::string, ftl::codecs::Reader*> readers__; }; } diff --git a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp index 642add6dc0e39ab496fec560cd3a5b0d77dbfb53..c1a1e4b38a2cdcafea900c2b5728b0c75a9d9b4f 100644 --- a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp @@ -54,6 +54,7 @@ struct StreamSource { std::list<detail::StreamClient> clients; SHARED_MUTEX mutex; unsigned long long frame; + int id; ftl::codecs::Encoder *hq_encoder_c1 = nullptr; ftl::codecs::Encoder *hq_encoder_c2 = nullptr; diff --git a/components/rgbd-sources/src/file_source.cpp b/components/rgbd-sources/src/file_source.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a7fe61169c0cdb626341756eaa18d8e2f0126a2d --- /dev/null +++ b/components/rgbd-sources/src/file_source.cpp @@ -0,0 +1,31 @@ +#include "file_source.hpp" + +using ftl::rgbd::detail::FileSource; + +FileSource::FileSource(ftl::rgbd::Source *s, ftl::codecs::Reader *r, int sid) : ftl::rgbd::detail::Source(s) { + reader_ = r; + r->onPacket(sid, [this](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { + LOG(INFO) << "PACKET RECEIVED " << spkt.streamID; + }); +} + +FileSource::~FileSource() { + +} + +bool FileSource::capture(int64_t ts) { + reader_->read(ts); + return true; +} + +bool FileSource::retrieve() { + return true; +} + +bool FileSource::compute(int n, int b) { + return true; +} + +bool FileSource::isReady() { + return true; +} diff --git a/components/rgbd-sources/src/file_source.hpp b/components/rgbd-sources/src/file_source.hpp new file mode 100644 index 0000000000000000000000000000000000000000..f69ec8321e381eb749484beed2c34db5cbba2d74 --- /dev/null +++ b/components/rgbd-sources/src/file_source.hpp @@ -0,0 +1,33 @@ +#pragma once +#ifndef _FTL_RGBD_FILE_SOURCE_HPP_ +#define _FTL_RGBD_FILE_SOURCE_HPP_ + +#include <loguru.hpp> + +#include <ftl/rgbd/source.hpp> +#include <ftl/codecs/reader.hpp> + +namespace ftl { +namespace rgbd { +namespace detail { + +class FileSource : public detail::Source { + public: + FileSource(ftl::rgbd::Source *, ftl::codecs::Reader *, int sid); + ~FileSource(); + + bool capture(int64_t ts); + bool retrieve(); + bool compute(int n, int b); + bool isReady(); + + //void reset(); + private: + ftl::codecs::Reader *reader_; +}; + +} +} +} + +#endif // _FTL_RGBD_FILE_SOURCE_HPP_ diff --git a/components/rgbd-sources/src/group.cpp b/components/rgbd-sources/src/group.cpp index 8dec0292a40f7d8224afe2d8e26ca22e0526ea4c..4a85c3d7cee7a9f897d0f9fdb87ceba1fa86e671 100644 --- a/components/rgbd-sources/src/group.cpp +++ b/components/rgbd-sources/src/group.cpp @@ -145,6 +145,13 @@ void Group::_computeJob(ftl::rgbd::Source *src) { } } +int Group::streamID(const ftl::rgbd::Source *s) const { + for (int i=0; i<sources_.size(); ++i) { + if (sources_[i] == s) return i; + } + return -1; +} + void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) { if (latency_ == 0) { callback_ = cb; @@ -226,13 +233,13 @@ void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) { ftl::timer::start(true); } -void Group::addRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) { +void Group::addRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) { for (auto s : sources_) { s->addRawCallback(f); } } -void Group::removeRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) { +void Group::removeRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) { for (auto s : sources_) { s->removeRawCallback(f); } diff --git a/components/rgbd-sources/src/net.cpp b/components/rgbd-sources/src/net.cpp index 416defb9a667c9b47890e15d38e1182965994b4e..8712f48d84498c0075aea717a0a2582d50ec4a70 100644 --- a/components/rgbd-sources/src/net.cpp +++ b/components/rgbd-sources/src/net.cpp @@ -246,7 +246,7 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk host_->notifyRaw(spkt, pkt); const ftl::rgbd::Channel chan = host_->getChannel(); - int rchan = spkt.channel & 0x1; + int rchan = spkt.channel; // & 0x1; NetFrame &frame = queue_.getFrame(spkt.timestamp, cv::Size(params_.width, params_.height), CV_8UC3, (isFloatChannel(chan) ? CV_32FC1 : CV_8UC3)); @@ -280,7 +280,8 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk // Calculate how many packets to expect for this frame if (frame.chunk_total == 0) { // Getting a second channel first means expect double packets - frame.chunk_total = pkt.block_total * ((spkt.channel >> 1) + 1); + // FIXME: Assumes each packet has same number of blocks! + frame.chunk_total = pkt.block_total * spkt.channel_count; } ++frame.chunk_count; diff --git a/components/rgbd-sources/src/source.cpp b/components/rgbd-sources/src/source.cpp index 4ec34a5e5685cc76d4356ac3766a5a97fa067679..96335342eb1ba013fbc79acb921df340ab2c1312 100644 --- a/components/rgbd-sources/src/source.cpp +++ b/components/rgbd-sources/src/source.cpp @@ -12,6 +12,8 @@ #include "snapshot_source.hpp" #endif +#include "file_source.hpp" + #ifdef HAVE_REALSENSE #include "realsense_source.hpp" using ftl::rgbd::detail::RealsenseSource; @@ -26,6 +28,9 @@ using ftl::rgbd::detail::ImageSource; using ftl::rgbd::detail::MiddleburySource; using ftl::rgbd::capability_t; using ftl::rgbd::Channel; +using ftl::rgbd::detail::FileSource; + +std::map<std::string, ftl::codecs::Reader*> Source::readers__; Source::Source(ftl::config::json_t &cfg) : Configurable(cfg), pose_(Eigen::Matrix4d::Identity()), net_(nullptr) { impl_ = nullptr; @@ -114,7 +119,10 @@ ftl::rgbd::detail::Source *Source::_createFileImpl(const ftl::URI &uri) { } else if (ftl::is_file(path)) { string ext = path.substr(eix+1); - if (ext == "png" || ext == "jpg") { + if (ext == "ftl") { + ftl::codecs::Reader *reader = __createReader(path); + return new FileSource(this, reader, std::stoi(uri.getFragment())); + } else if (ext == "png" || ext == "jpg") { return new ImageSource(this, path); } else if (ext == "mp4") { return new StereoVideoSource(this, path); @@ -137,6 +145,22 @@ ftl::rgbd::detail::Source *Source::_createFileImpl(const ftl::URI &uri) { return nullptr; } +ftl::codecs::Reader *Source::__createReader(const std::string &path) { + if (readers__.find(path) != readers__.end()) { + return readers__[path]; + } + + std::ifstream *file = new std::ifstream; + file->open(path); + + // FIXME: This is a memory leak, must delete ifstream somewhere. + + auto *r = new ftl::codecs::Reader(*file); + readers__[path] = r; + r->begin(); + return r; +} + ftl::rgbd::detail::Source *Source::_createNetImpl(const ftl::URI &uri) { return new NetSource(this); } diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index 4290a289027062e19f931cca87f32749b875bedc..9d8f68898dc29dfc2d285a63a1c1daea7e26381e 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -176,7 +176,8 @@ void Streamer::add(Source *src) { void Streamer::add(ftl::rgbd::Group *grp) { auto srcs = grp->sources(); - for (auto src : srcs) { + for (int i=0; i<srcs.size(); ++i) { + auto &src = srcs[i]; { UNIQUE_LOCK(mutex_,ulk); if (sources_.find(src->getID()) != sources_.end()) return; @@ -189,6 +190,7 @@ void Streamer::add(ftl::rgbd::Group *grp) { s->clientCount = 0; s->hq_count = 0; s->lq_count = 0; + s->id = i; sources_[src->getID()] = s; //group_.addSource(src); @@ -422,6 +424,8 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) { // Receiver only waits for channel 1 by default // TODO: Each encode could be done in own thread if (hasChan2) { + // TODO: Stagger the reset between nodes... random phasing + if (fs.timestamp % (10*ftl::timer::getInterval()) == 0) enc2->reset(); enc2->encode(fs.frames[j].get<cv::Mat>(fs.sources[j]->getChannel()), src->hq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){ _transmitPacket(src, blk, 1, hasChan2, Quality::High); }); @@ -429,6 +433,7 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) { if (enc2) enc2->reset(); } + // TODO: Stagger the reset between nodes... random phasing if (fs.timestamp % (10*ftl::timer::getInterval()) == 0) enc1->reset(); enc1->encode(fs.frames[j].get<cv::Mat>(Channel::Colour), src->hq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){ _transmitPacket(src, blk, 0, hasChan2, Quality::High); @@ -528,7 +533,10 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) { void Streamer::_transmitPacket(StreamSource *src, const ftl::codecs::Packet &pkt, int chan, bool hasChan2, Quality q) { ftl::codecs::StreamPacket spkt = { frame_no_, - static_cast<uint8_t>((chan & 0x1) | ((hasChan2) ? 0x2 : 0x0)) + src->id, + (hasChan2) ? 2 : 1, + chan + //static_cast<uint8_t>((chan & 0x1) | ((hasChan2) ? 0x2 : 0x0)) }; _transmitPacket(src, spkt, pkt, q); diff --git a/components/rgbd-sources/test/source_unit.cpp b/components/rgbd-sources/test/source_unit.cpp index dca38be2ffb6e06b8be416c8de71a7a799c77867..ca66273a99f75f58d118d16237f558017911ee20 100644 --- a/components/rgbd-sources/test/source_unit.cpp +++ b/components/rgbd-sources/test/source_unit.cpp @@ -74,6 +74,18 @@ class SnapshotSource : public ftl::rgbd::detail::Source { bool isReady() { return true; }; }; +class FileSource : public ftl::rgbd::detail::Source { + public: + FileSource(ftl::rgbd::Source *host, ftl::codecs::Reader *r, int) : ftl::rgbd::detail::Source(host) { + last_type = "filesource"; + } + + bool capture(int64_t ts) { return true; } + bool retrieve() { return true; } + bool compute(int n, int b) { return true; }; + bool isReady() { return true; }; +}; + class RealsenseSource : public ftl::rgbd::detail::Source { public: explicit RealsenseSource(ftl::rgbd::Source *host) : ftl::rgbd::detail::Source(host) { @@ -112,6 +124,7 @@ class MiddleburySource : public ftl::rgbd::detail::Source { #define _FTL_RGBD_IMAGE_HPP_ #define _FTL_RGBD_REALSENSE_HPP_ #define _FTL_RGBD_MIDDLEBURY_SOURCE_HPP_ +#define _FTL_RGBD_FILE_SOURCE_HPP_ #include "../src/source.cpp"