Skip to content
Snippets Groups Projects
Commit e15e48db authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'bug/196/snapshotcapture' into 'master'

Implements #196 stream capturing

Closes #196

See merge request nicolas.pope/ftl!127
parents bb35a4b5 f4bf85ab
No related branches found
No related tags found
1 merge request!127Implements #196 stream capturing
Pipeline #15316 passed
Showing
with 811 additions and 8 deletions
......@@ -220,6 +220,7 @@ add_subdirectory(components/rgbd-sources)
add_subdirectory(components/control/cpp)
add_subdirectory(applications/calibration)
add_subdirectory(applications/groupview)
add_subdirectory(applications/player)
if (BUILD_RENDERER)
add_subdirectory(components/renderers)
......
set(PLAYERSRC
src/main.cpp
)
add_executable(ftl-player ${PLAYERSRC})
target_include_directories(ftl-player PRIVATE src)
target_link_libraries(ftl-player ftlcommon ftlcodecs ftlrgbd Threads::Threads ${OpenCV_LIBS})
#include <loguru.hpp>
#include <ftl/configuration.hpp>
#include <ftl/codecs/reader.hpp>
#include <ftl/codecs/decoder.hpp>
#include <ftl/codecs/packet.hpp>
#include <ftl/rgbd/camera.hpp>
#include <fstream>
#include <Eigen/Eigen>
using ftl::codecs::codec_t;
static ftl::codecs::Decoder *decoder;
static void createDecoder(const ftl::codecs::Packet &pkt) {
if (decoder) {
if (!decoder->accepts(pkt)) {
ftl::codecs::free(decoder);
} else {
return;
}
}
decoder = ftl::codecs::allocateDecoder(pkt);
}
static void visualizeDepthMap( const cv::Mat &depth, cv::Mat &out,
const float max_depth)
{
depth.convertTo(out, CV_8U, 255.0f / max_depth);
out = 255 - out;
//cv::Mat mask = (depth >= 39.0f); // TODO (mask for invalid pixels)
applyColorMap(out, out, cv::COLORMAP_JET);
//out.setTo(cv::Scalar(255, 255, 255), mask);
}
int main(int argc, char **argv) {
std::string filename(argv[1]);
LOG(INFO) << "Playing: " << filename;
auto root = ftl::configure(argc, argv, "player_default");
std::ifstream f;
f.open(filename);
if (!f.is_open()) LOG(ERROR) << "Could not open file";
ftl::codecs::Reader r(f);
if (!r.begin()) LOG(ERROR) << "Bad ftl file";
LOG(INFO) << "Playing...";
int current_stream = 0;
int current_channel = 0;
bool res = r.read(90000000000000, [&current_stream,&current_channel,&r](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
if (spkt.channel != current_channel) return;
if (spkt.streamID == current_stream) {
if (pkt.codec == codec_t::POSE) {
Eigen::Matrix4d p = Eigen::Map<Eigen::Matrix4d>((double*)pkt.data.data());
LOG(INFO) << "Have pose: " << p;
return;
}
if (pkt.codec == codec_t::CALIBRATION) {
ftl::rgbd::Camera *camera = (ftl::rgbd::Camera*)pkt.data.data();
LOG(INFO) << "Have calibration: " << camera->fx;
return;
}
LOG(INFO) << "Reading packet: (" << (int)spkt.streamID << "," << (int)spkt.channel << ") " << (int)pkt.codec << ", " << (int)pkt.definition;
cv::Mat frame(cv::Size(ftl::codecs::getWidth(pkt.definition),ftl::codecs::getHeight(pkt.definition)), (spkt.channel == 1) ? CV_32F : CV_8UC3);
createDecoder(pkt);
try {
decoder->decode(pkt, frame);
} catch (std::exception &e) {
LOG(INFO) << "Decoder exception: " << e.what();
}
if (!frame.empty()) {
if (spkt.channel == 1) {
visualizeDepthMap(frame, frame, 8.0f);
}
double time = (double)(spkt.timestamp - r.getStartTime()) / 1000.0;
cv::putText(frame, std::string("Time: ") + std::to_string(time) + std::string("s"), cv::Point(10,20), cv::FONT_HERSHEY_PLAIN, 1, cv::Scalar(0,0,255));
cv::imshow("Player", frame);
}
int key = cv::waitKey(20);
if (key >= 48 && key <= 57) {
current_stream = key - 48;
} else if (key == 'd') {
current_channel = (current_channel == 0) ? 1 : 0;
} else if (key == 27) {
r.end();
}
}
});
if (!res) LOG(ERROR) << "No frames left";
r.end();
ftl::running = false;
return 0;
}
......@@ -15,6 +15,7 @@
#include <ftl/slave.hpp>
#include <ftl/rgbd/group.hpp>
#include <ftl/threads.hpp>
#include <ftl/codecs/writer.hpp>
#include "ilw/ilw.hpp"
#include <ftl/render/splat_render.hpp>
......@@ -63,6 +64,33 @@ static Eigen::Affine3d create_rotation_matrix(float ax, float ay, float az) {
return rz * rx * ry;
}
static void writeSourceProperties(ftl::codecs::Writer &writer, int id, ftl::rgbd::Source *src) {
ftl::codecs::StreamPacket spkt;
ftl::codecs::Packet pkt;
spkt.timestamp = 0;
spkt.streamID = id;
spkt.channel = 0;
spkt.channel_count = 1;
pkt.codec = ftl::codecs::codec_t::CALIBRATION;
pkt.definition = ftl::codecs::definition_t::Any;
pkt.block_number = 0;
pkt.block_total = 1;
pkt.flags = 0;
pkt.data = std::move(std::vector<uint8_t>((uint8_t*)&src->parameters(), (uint8_t*)&src->parameters() + sizeof(ftl::rgbd::Camera)));
writer.write(spkt, pkt);
pkt.codec = ftl::codecs::codec_t::POSE;
pkt.definition = ftl::codecs::definition_t::Any;
pkt.block_number = 0;
pkt.block_total = 1;
pkt.flags = 0;
pkt.data = std::move(std::vector<uint8_t>((uint8_t*)src->getPose().data(), (uint8_t*)src->getPose().data() + 4*4*sizeof(double)));
writer.write(spkt, pkt);
}
static void run(ftl::Configurable *root) {
Universe *net = ftl::create<Universe>(root, "net");
ftl::ctrl::Slave slave(net, root);
......@@ -159,6 +187,46 @@ static void run(ftl::Configurable *root) {
group.addSource(in);
}
// ---- Recording code -----------------------------------------------------
std::ofstream fileout;
ftl::codecs::Writer writer(fileout);
auto recorder = [&writer,&group](ftl::rgbd::Source *src, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
ftl::codecs::StreamPacket s = spkt;
// Patch stream ID to match order in group
s.streamID = group.streamID(src);
writer.write(s, pkt);
};
root->set("record", false);
// Allow stream recording
root->on("record", [&group,&fileout,&writer,&recorder](const ftl::config::Event &e) {
if (e.entity->value("record", false)) {
char timestamp[18];
std::time_t t=std::time(NULL);
std::strftime(timestamp, sizeof(timestamp), "%F-%H%M%S", std::localtime(&t));
fileout.open(std::string(timestamp) + ".ftl");
writer.begin();
// TODO: Write pose+calibration+config packets
auto sources = group.sources();
for (int i=0; i<sources.size(); ++i) {
writeSourceProperties(writer, i, sources[i]);
}
group.addRawCallback(std::function(recorder));
} else {
group.removeRawCallback(recorder);
writer.end();
fileout.close();
}
});
// -------------------------------------------------------------------------
stream->setLatency(5); // FIXME: This depends on source!?
stream->add(&group);
stream->run();
......
......@@ -5,6 +5,8 @@ set(CODECSRC
src/opencv_encoder.cpp
src/opencv_decoder.cpp
src/generate.cpp
src/writer.cpp
src/reader.cpp
)
if (HAVE_NVPIPE)
......
......@@ -14,7 +14,15 @@ enum struct codec_t : uint8_t {
JPG = 0,
PNG,
H264,
HEVC // H265
HEVC, // H265
// TODO: Add audio codecs
WAV,
JSON = 100, // A JSON string
CALIBRATION, // Camera parameters object
POSE, // 4x4 eigen matrix
RAW // Some unknown binary format (msgpack?)
};
/**
......@@ -29,6 +37,8 @@ enum struct definition_t : uint8_t {
SD480 = 5,
LD360 = 6,
Any = 7
// TODO: Add audio definitions
};
/**
......
......@@ -10,6 +10,14 @@
namespace ftl {
namespace codecs {
/**
* First bytes of our file format.
*/
struct Header {
const char magic[4] = {'F','T','L','F'};
uint8_t version = 1;
};
/**
* A single network packet for the compressed video stream. It includes the raw
* data along with any block metadata required to reconstruct. The underlying
......@@ -21,9 +29,10 @@ struct Packet {
ftl::codecs::definition_t definition;
uint8_t block_total; // Packets expected per frame
uint8_t block_number; // This packets number within a frame
uint8_t flags; // Codec dependent flags (eg. I-Frame or P-Frame)
std::vector<uint8_t> data;
MSGPACK_DEFINE(codec, definition, block_total, block_number, data);
MSGPACK_DEFINE(codec, definition, block_total, block_number, flags, data);
};
/**
......@@ -33,9 +42,11 @@ struct Packet {
*/
struct StreamPacket {
int64_t timestamp;
uint8_t channel; // first bit = channel, second bit indicates second channel being sent
uint8_t streamID; // Source number...
uint8_t channel_count; // Number of channels to expect (usually 1 or 2)
uint8_t channel; // Actual channel of this current set of packets
MSGPACK_DEFINE(timestamp, channel);
MSGPACK_DEFINE(timestamp, streamID, channel_count, channel);
};
}
......
#ifndef _FTL_CODECS_READER_HPP_
#define _FTL_CODECS_READER_HPP_
#include <iostream>
#include <msgpack.hpp>
#include <inttypes.h>
#include <functional>
#include <ftl/codecs/packet.hpp>
namespace ftl {
namespace codecs {
class Reader {
public:
Reader(std::istream &);
~Reader();
/**
* Read packets up to and including requested timestamp. A provided callback
* is called for each packet read, in order stored in file. Returns true if
* there are still more packets available beyond specified timestamp, false
* otherwise (end-of-file). Timestamps are in local (clock adjusted) time
* and the timestamps stored in the file are aligned to the time when open
* was called.
*/
bool read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &);
/**
* An alternative version of read where packet events are generated for
* specific streams, allowing different handlers for different streams.
* This allows demuxing and is used by player sources. Each source can call
* this read, only the first one will generate the data packets.
*/
bool read(int64_t ts);
void onPacket(int streamID, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &);
bool begin();
bool end();
inline int64_t getStartTime() const { return timestart_; };
private:
std::istream *stream_;
msgpack::unpacker buffer_;
std::tuple<StreamPacket,Packet> data_;
bool has_data_;
int64_t timestart_;
bool playing_;
std::vector<std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)>> handlers_;
};
}
}
#endif // _FTL_CODECS_READER_HPP_
#ifndef _FTL_CODECS_WRITER_HPP_
#define _FTL_CODECS_WRITER_HPP_
#include <iostream>
#include <msgpack.hpp>
//#include <Eigen/Eigen>
#include <ftl/codecs/packet.hpp>
namespace ftl {
namespace codecs {
class Writer {
public:
Writer(std::ostream &);
~Writer();
bool begin();
bool write(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &);
bool end();
private:
std::ostream *stream_;
msgpack::sbuffer buffer_;
int64_t timestart_;
};
}
}
#endif // _FTL_CODECS_WRITER_HPP_
#include <loguru.hpp>
#include <ftl/codecs/reader.hpp>
#include <ftl/timer.hpp>
#include <tuple>
using ftl::codecs::Reader;
using ftl::codecs::StreamPacket;
using ftl::codecs::Packet;
using std::get;
Reader::Reader(std::istream &s) : stream_(&s), has_data_(false), playing_(false) {
}
Reader::~Reader() {
}
bool Reader::begin() {
ftl::codecs::Header h;
(*stream_).read((char*)&h, sizeof(h));
if (h.magic[0] != 'F' || h.magic[1] != 'T' || h.magic[2] != 'L' || h.magic[3] != 'F') return false;
// Capture current time to adjust timestamps
timestart_ = ftl::timer::get_time();
playing_ = true;
return true;
}
bool Reader::read(int64_t ts, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &f) {
if (has_data_ && get<0>(data_).timestamp <= ts) {
f(get<0>(data_), get<1>(data_));
has_data_ = false;
} else if (has_data_) {
return false;
}
bool partial = false;
while (playing_ && stream_->good() || buffer_.nonparsed_size() > 0) {
if (buffer_.nonparsed_size() == 0 || (partial && buffer_.nonparsed_size() < 10000000)) {
buffer_.reserve_buffer(10000000);
stream_->read(buffer_.buffer(), buffer_.buffer_capacity());
//if (stream_->bad()) return false;
int bytes = stream_->gcount();
if (bytes == 0) return false;
buffer_.buffer_consumed(bytes);
partial = false;
}
msgpack::object_handle msg;
if (!buffer_.next(msg)) {
LOG(INFO) << "NO Message: " << buffer_.nonparsed_size();
partial = true;
continue;
}
std::tuple<StreamPacket,Packet> data;
msgpack::object obj = msg.get();
try {
obj.convert(data);
} catch (std::exception &e) {
LOG(INFO) << "Corrupt message: " << buffer_.nonparsed_size();
//partial = true;
//continue;
return false;
}
// Adjust timestamp
get<0>(data).timestamp += timestart_;
if (get<0>(data).timestamp <= ts) {
f(get<0>(data),get<1>(data));
} else {
data_ = data;
has_data_ = true;
return true;
}
}
return false;
}
bool Reader::read(int64_t ts) {
return read(ts, [this](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
if (handlers_.size() > spkt.streamID && (bool)handlers_[spkt.streamID]) {
handlers_[spkt.streamID](spkt, pkt);
}
});
}
void Reader::onPacket(int streamID, const std::function<void(const ftl::codecs::StreamPacket &, const ftl::codecs::Packet &)> &f) {
if (streamID >= handlers_.size()) handlers_.resize(streamID+1);
handlers_[streamID] = f;
}
bool Reader::end() {
playing_ = false;
return true;
}
#include <ftl/codecs/writer.hpp>
#include <ftl/timer.hpp>
#include <tuple>
using ftl::codecs::Writer;
Writer::Writer(std::ostream &s) : stream_(&s) {}
Writer::~Writer() {
}
bool Writer::begin() {
ftl::codecs::Header h;
h.version = 0;
(*stream_).write((const char*)&h, sizeof(h));
// Capture current time to adjust timestamps
timestart_ = ftl::timer::get_time();
return true;
}
bool Writer::end() {
return true;
}
bool Writer::write(const ftl::codecs::StreamPacket &s, const ftl::codecs::Packet &p) {
ftl::codecs::StreamPacket s2 = s;
// Adjust timestamp relative to start of file.
s2.timestamp -= timestart_;
auto data = std::make_tuple(s2,p);
msgpack::sbuffer buffer;
msgpack::pack(buffer, data);
(*stream_).write(buffer.data(), buffer.size());
//buffer_.clear();
return true;
}
......@@ -30,3 +30,17 @@ target_link_libraries(nvpipe_codec_unit
add_test(NvPipeCodecUnitTest nvpipe_codec_unit)
### Reader Writer Unit ################################################################
add_executable(rw_unit
./tests.cpp
../src/writer.cpp
../src/reader.cpp
./readwrite_test.cpp
)
target_include_directories(rw_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include")
target_link_libraries(rw_unit
Threads::Threads ${OS_LIBS} ${OpenCV_LIBS} ${CUDA_LIBRARIES} ftlcommon Eigen3::Eigen)
add_test(RWUnitTest rw_unit)
#include "catch.hpp"
#include <ftl/codecs/writer.hpp>
#include <ftl/codecs/reader.hpp>
#include <ftl/timer.hpp>
#include <sstream>
using ftl::codecs::Writer;
using ftl::codecs::Reader;
using ftl::codecs::StreamPacket;
using ftl::codecs::Packet;
using ftl::codecs::codec_t;
using ftl::codecs::definition_t;
TEST_CASE( "Write and read - Single frame" ) {
std::stringstream s;
Writer w(s);
StreamPacket spkt;
Packet pkt;
spkt.channel = 0;
spkt.timestamp = ftl::timer::get_time();
spkt.streamID = 0;
pkt.codec = codec_t::JSON;
pkt.definition = definition_t::Any;
pkt.block_number = 0;
pkt.block_total = 1;
pkt.flags = 0;
pkt.data = {44,44,44};
w.begin();
w.write(spkt, pkt);
w.end();
REQUIRE( s.str().size() > 0 );
int n = 0;
Reader r(s);
r.begin();
bool res = r.read(ftl::timer::get_time()+10, [&n](const StreamPacket &rspkt, const Packet &rpkt) {
++n;
REQUIRE(rpkt.codec == codec_t::JSON);
REQUIRE(rpkt.data.size() == 3);
REQUIRE(rpkt.data[0] == 44);
REQUIRE(rspkt.channel == 0);
});
r.end();
REQUIRE( n == 1 );
REQUIRE( !res );
}
TEST_CASE( "Write and read - Multiple frames" ) {
std::stringstream s;
Writer w(s);
StreamPacket spkt;
Packet pkt;
spkt.channel = 0;
spkt.timestamp = ftl::timer::get_time();
spkt.streamID = 0;
pkt.codec = codec_t::JSON;
pkt.definition = definition_t::Any;
pkt.block_number = 0;
pkt.block_total = 1;
pkt.flags = 0;
pkt.data = {44,44,44};
w.begin();
w.write(spkt, pkt);
spkt.timestamp += 50;
pkt.data = {55,55,55};
w.write(spkt, pkt);
spkt.timestamp += 50;
pkt.data = {66,66,66};
w.write(spkt, pkt);
w.end();
REQUIRE( s.str().size() > 0 );
int n = 0;
Reader r(s);
r.begin();
bool res = r.read(ftl::timer::get_time()+100, [&n](const StreamPacket &rspkt, const Packet &rpkt) {
++n;
REQUIRE(rpkt.codec == codec_t::JSON);
REQUIRE(rpkt.data.size() == 3);
REQUIRE(rpkt.data[0] == ((n == 1) ? 44 : (n == 2) ? 55 : 66));
REQUIRE(rspkt.channel == 0);
});
r.end();
REQUIRE( n == 3 );
REQUIRE( !res );
}
TEST_CASE( "Write and read - Multiple streams" ) {
std::stringstream s;
Writer w(s);
StreamPacket spkt;
Packet pkt;
spkt.channel = 0;
spkt.timestamp = ftl::timer::get_time();
spkt.streamID = 0;
pkt.codec = codec_t::JSON;
pkt.definition = definition_t::Any;
pkt.block_number = 0;
pkt.block_total = 1;
pkt.flags = 0;
pkt.data = {44,44,44};
w.begin();
w.write(spkt, pkt);
spkt.streamID = 1;
pkt.data = {55,55,55};
w.write(spkt, pkt);
w.end();
REQUIRE( s.str().size() > 0 );
int n1 = 0;
int n2 = 0;
Reader r(s);
r.onPacket(0, [&n1](const StreamPacket &rspkt, const Packet &rpkt) {
++n1;
REQUIRE(rspkt.streamID == 0);
REQUIRE(rpkt.data.size() == 3);
REQUIRE(rpkt.data[0] == 44);
});
r.onPacket(1, [&n2](const StreamPacket &rspkt, const Packet &rpkt) {
++n2;
REQUIRE(rspkt.streamID == 1);
REQUIRE(rpkt.data.size() == 3);
REQUIRE(rpkt.data[0] == 55);
});
r.begin();
bool res = r.read(ftl::timer::get_time()+100);
r.end();
REQUIRE( n1 == 1 );
REQUIRE( n2 == 1 );
REQUIRE( !res );
}
TEST_CASE( "Write and read - Multiple frames with limit" ) {
std::stringstream s;
Writer w(s);
StreamPacket spkt;
Packet pkt;
spkt.channel = 0;
spkt.timestamp = ftl::timer::get_time();
spkt.streamID = 0;
pkt.codec = codec_t::JSON;
pkt.definition = definition_t::Any;
pkt.block_number = 0;
pkt.block_total = 1;
pkt.flags = 0;
pkt.data = {44,44,44};
w.begin();
w.write(spkt, pkt);
spkt.timestamp += 50;
pkt.data = {55,55,55};
w.write(spkt, pkt);
spkt.timestamp += 50;
pkt.data = {66,66,66};
w.write(spkt, pkt);
w.end();
REQUIRE( s.str().size() > 0 );
int n = 0;
Reader r(s);
r.begin();
bool res = r.read(ftl::timer::get_time()+50, [&n](const StreamPacket &rspkt, const Packet &rpkt) {
++n;
REQUIRE(rpkt.codec == codec_t::JSON);
REQUIRE(rpkt.data.size() == 3);
REQUIRE(rpkt.data[0] == ((n == 1) ? 44 : (n == 2) ? 55 : 66));
REQUIRE(rspkt.channel == 0);
});
r.end();
REQUIRE( n == 2 );
REQUIRE( res );
}
TEST_CASE( "Write and read - Multiple reads" ) {
std::stringstream s;
Writer w(s);
StreamPacket spkt;
Packet pkt;
spkt.channel = 0;
spkt.timestamp = ftl::timer::get_time();
spkt.streamID = 0;
pkt.codec = codec_t::JSON;
pkt.definition = definition_t::Any;
pkt.block_number = 0;
pkt.block_total = 1;
pkt.flags = 0;
pkt.data = {44,44,44};
w.begin();
w.write(spkt, pkt);
spkt.timestamp += 50;
pkt.data = {55,55,55};
w.write(spkt, pkt);
spkt.timestamp += 50;
pkt.data = {66,66,66};
w.write(spkt, pkt);
w.end();
REQUIRE( s.str().size() > 0 );
int n = 0;
Reader r(s);
r.begin();
bool res = r.read(ftl::timer::get_time()+50, [&n](const StreamPacket &rspkt, const Packet &rpkt) {
++n;
REQUIRE(rpkt.codec == codec_t::JSON);
REQUIRE(rpkt.data.size() == 3);
REQUIRE(rpkt.data[0] == ((n == 1) ? 44 : (n == 2) ? 55 : 66));
REQUIRE(rspkt.channel == 0);
});
REQUIRE( n == 2 );
REQUIRE( res );
n = 0;
res = r.read(ftl::timer::get_time()+100, [&n](const StreamPacket &rspkt, const Packet &rpkt) {
++n;
REQUIRE(rpkt.codec == codec_t::JSON);
REQUIRE(rpkt.data.size() == 3);
REQUIRE(rpkt.data[0] == 66 );
REQUIRE(rspkt.channel == 0);
});
r.end();
REQUIRE( n == 1 );
REQUIRE( !res );
}
......@@ -19,6 +19,7 @@ set(RGBDSRC
src/abr.cpp
src/offilter.cpp
src/virtual.cpp
src/file_source.cpp
)
if (HAVE_REALSENSE)
......
......@@ -73,12 +73,12 @@ class Group {
* There is no guarantee about order or timing and the callback itself will
* need to ensure synchronisation of timestamps.
*/
void addRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &);
void addRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &);
/**
* Removes a raw data callback from all sources in the group.
*/
void removeRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &);
void removeRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &);
inline std::vector<Source*> sources() const { return sources_; }
......@@ -99,6 +99,8 @@ class Group {
void stop() {}
int streamID(const ftl::rgbd::Source *s) const;
private:
std::vector<FrameSet> framesets_;
std::vector<Source*> sources_;
......
......@@ -9,9 +9,11 @@
#include <ftl/uri.hpp>
#include <ftl/rgbd/detail/source.hpp>
#include <ftl/codecs/packet.hpp>
#include <ftl/codecs/reader.hpp>
#include <opencv2/opencv.hpp>
#include <Eigen/Eigen>
#include <string>
#include <map>
#include <ftl/cuda_common.hpp>
#include <ftl/rgbd/frame.hpp>
......@@ -244,6 +246,10 @@ class Source : public ftl::Configurable {
detail::Source *_createFileImpl(const ftl::URI &uri);
detail::Source *_createNetImpl(const ftl::URI &uri);
detail::Source *_createDeviceImpl(const ftl::URI &uri);
static ftl::codecs::Reader *__createReader(const std::string &path);
static std::map<std::string, ftl::codecs::Reader*> readers__;
};
}
......
......@@ -54,6 +54,7 @@ struct StreamSource {
std::list<detail::StreamClient> clients;
SHARED_MUTEX mutex;
unsigned long long frame;
int id;
ftl::codecs::Encoder *hq_encoder_c1 = nullptr;
ftl::codecs::Encoder *hq_encoder_c2 = nullptr;
......
#include "file_source.hpp"
using ftl::rgbd::detail::FileSource;
FileSource::FileSource(ftl::rgbd::Source *s, ftl::codecs::Reader *r, int sid) : ftl::rgbd::detail::Source(s) {
reader_ = r;
r->onPacket(sid, [this](const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
LOG(INFO) << "PACKET RECEIVED " << spkt.streamID;
});
}
FileSource::~FileSource() {
}
bool FileSource::capture(int64_t ts) {
reader_->read(ts);
return true;
}
bool FileSource::retrieve() {
return true;
}
bool FileSource::compute(int n, int b) {
return true;
}
bool FileSource::isReady() {
return true;
}
#pragma once
#ifndef _FTL_RGBD_FILE_SOURCE_HPP_
#define _FTL_RGBD_FILE_SOURCE_HPP_
#include <loguru.hpp>
#include <ftl/rgbd/source.hpp>
#include <ftl/codecs/reader.hpp>
namespace ftl {
namespace rgbd {
namespace detail {
class FileSource : public detail::Source {
public:
FileSource(ftl::rgbd::Source *, ftl::codecs::Reader *, int sid);
~FileSource();
bool capture(int64_t ts);
bool retrieve();
bool compute(int n, int b);
bool isReady();
//void reset();
private:
ftl::codecs::Reader *reader_;
};
}
}
}
#endif // _FTL_RGBD_FILE_SOURCE_HPP_
......@@ -145,6 +145,13 @@ void Group::_computeJob(ftl::rgbd::Source *src) {
}
}
int Group::streamID(const ftl::rgbd::Source *s) const {
for (int i=0; i<sources_.size(); ++i) {
if (sources_[i] == s) return i;
}
return -1;
}
void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) {
if (latency_ == 0) {
callback_ = cb;
......@@ -226,13 +233,13 @@ void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) {
ftl::timer::start(true);
}
void Group::addRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) {
void Group::addRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) {
for (auto s : sources_) {
s->addRawCallback(f);
}
}
void Group::removeRawCallback(std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) {
void Group::removeRawCallback(const std::function<void(ftl::rgbd::Source*, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt)> &f) {
for (auto s : sources_) {
s->removeRawCallback(f);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment