diff --git a/components/streams/include/ftl/streams/builder.hpp b/components/streams/include/ftl/streams/builder.hpp index cc4046484b2dfd612df7a220781703b83b1b98fc..3ed4eaf3f9b1dd90d7aa2fcc42056086f30f5585 100644 --- a/components/streams/include/ftl/streams/builder.hpp +++ b/components/streams/include/ftl/streams/builder.hpp @@ -21,7 +21,7 @@ namespace streams { */ class Builder { public: - Builder(ftl::data::Pool &pool, int id); + Builder(ftl::data::Pool *pool, int id); Builder(); ~Builder(); diff --git a/components/streams/src/builder.cpp b/components/streams/src/builder.cpp index eb2b5ca27901e3f84f31869b1026b7e4e04dc6ea..b5b8b134387dcd35f585376169aec8ff6fec8e4e 100644 --- a/components/streams/src/builder.cpp +++ b/components/streams/src/builder.cpp @@ -80,7 +80,7 @@ ftl::data::FrameSet *Builder::_get(int64_t timestamp) { } ftl::data::Frame &Builder::get(int64_t timestamp, size_t ix) { - if (timestamp <= 0 || ix >= 32) throw FTL_Error("Invalid frame timestamp or index"); + if (timestamp <= 0 || ix >= 32) throw FTL_Error("Invalid frame timestamp or index (" << timestamp << ", " << ix << ")"); UNIQUE_LOCK(mutex_, lk); @@ -90,9 +90,12 @@ ftl::data::Frame &Builder::get(int64_t timestamp, size_t ix) { auto *fs = _get(timestamp); - if (ix >= fs->frames.size()) { - throw FTL_Error("Frame index out-of-bounds"); - } + //if (ix >= fs->frames.size()) { + //throw FTL_Error("Frame index out-of-bounds - " << ix << "(" << fs->frames.size() << ")"); + while (fs->frames.size() < size_) { + fs->frames.push_back(std::move(pool_->allocate((fs->frameset() << 8) + fs->frames.size(), fs->timestamp()))); + } + //} //if (fs->frames.size() < size_) fs->frames.resize(size_); @@ -125,6 +128,8 @@ void Builder::completed(int64_t ts, size_t ix) { ++fs->count; } + LOG(INFO) << "COMPLETE FRAME : " << fs->timestamp() << " " << fs->count << "(" << size_ << ")"; + // No buffering, so do a schedule here for immediate effect if (bufferSize_ == 0 && !fs->test(ftl::data::FSFlag::STALE) && static_cast<unsigned int>(fs->count) >= size_) { UNIQUE_LOCK(mutex_, lk); @@ -165,6 +170,8 @@ void Builder::_schedule() { // Calling onFrameset but without all frames so mark as partial if (static_cast<size_t>(fs->count) < fs->frames.size()) fs->set(ftl::data::FSFlag::PARTIAL); + for (auto &f : fs->frames) f.store(); + try { cb_.trigger(*fs); } catch(const ftl::exception &e) { diff --git a/components/streams/src/receiver.cpp b/components/streams/src/receiver.cpp index 73367067a19bab7b858fcd11ff2660edc7087f0a..296481ce7fc98ea8b6a5c4e5acf1feaf999691a0 100644 --- a/components/streams/src/receiver.cpp +++ b/components/streams/src/receiver.cpp @@ -144,6 +144,13 @@ void Receiver::_processState(const StreamPacket &spkt, const Packet &pkt) { for (int i=0; i<pkt.frame_count; ++i) { InternalVideoStates &frame = _getVideoFrame(spkt,i); + LOG(INFO) << "GOT STATE " << (int)spkt.channel; + + if (spkt.channel == Channel::Calibration) { + auto &f = builder(spkt.streamID).get(spkt.timestamp, spkt.frame_number); + f.createChange<ftl::rgbd::Camera>(Channel::Calibration, ftl::data::ChangeType::FOREIGN) = parseCalibration(pkt); + } + // Deal with the special channels... /*switch (spkt.channel) { case Channel::Configuration : ftl::config::parseJSON(frame.state.getConfig(), parseConfig(pkt)); break; @@ -241,18 +248,18 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { auto [tx,ty] = ftl::codecs::chooseTileConfig(pkt.frame_count); // Get the frameset - builder(spkt.streamID).get(spkt.timestamp, spkt.frame_number+pkt.frame_count); // TODO: This is a hack + builder(spkt.streamID).get(spkt.timestamp, spkt.frame_number+pkt.frame_count-1); // TODO: This is a hack auto *fs = builder(spkt.streamID).get(spkt.timestamp); - const auto &calibration = fs->frames[0].get<ftl::rgbd::Camera>(Channel::Calibration); - int width = calibration.width; - int height = calibration.height; - - if (width == 0 || height == 0) { + if (!fs->frames[0].has(Channel::Calibration)) { LOG(WARNING) << "No calibration, skipping frame"; return; } + const auto &calibration = fs->frames[0].get<ftl::rgbd::Camera>(Channel::Calibration); + int width = calibration.width; + int height = calibration.height; + //LOG(INFO) << " CODEC = " << (int)pkt.codec << " " << (int)pkt.flags << " " << (int)spkt.channel; //LOG(INFO) << "Decode surface: " << (width*tx) << "x" << (height*ty); @@ -323,11 +330,12 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { // Add channel to frame and allocate memory if required const cv::Size size = cv::Size(width, height); - frame.create<cv::cuda::GpuMat>(spkt.channel).create(size, ftl::codecs::type(spkt.channel)); //(isFloatChannel(rchan) ? CV_32FC1 : CV_8UC4)); + auto &buf = frame.createChange<ftl::rgbd::VideoFrame>(spkt.channel, ftl::data::ChangeType::FOREIGN).createGPU(); + buf.create(size, ftl::codecs::type(spkt.channel)); //(isFloatChannel(rchan) ? CV_32FC1 : CV_8UC4)); cv::Rect roi((i % tx)*width, (i / tx)*height, width, height); cv::cuda::GpuMat sroi = surface(roi); - sroi.copyTo(frame.set<cv::cuda::GpuMat>(spkt.channel), cvstream); + sroi.copyTo(buf, cvstream); } // Must ensure all processing is finished before completing a frame. @@ -341,7 +349,7 @@ void Receiver::_processVideo(const StreamPacket &spkt, const Packet &pkt) { const auto *cs = stream_; auto sel = stream_->selected(spkt.frameSetID()) & cs->available(spkt.frameSetID()); - frame.create<cv::cuda::GpuMat>(spkt.channel); + //frame.create<cv::cuda::GpuMat>(spkt.channel); if (i == 0) { Packet tmppkt = pkt; diff --git a/components/streams/test/CMakeLists.txt b/components/streams/test/CMakeLists.txt index 678651d9fb8b17f13eccb44bf3a66cd15a7889b2..fb22160dee3f9074bb7afc33cc48a6e60a21a3a8 100644 --- a/components/streams/test/CMakeLists.txt +++ b/components/streams/test/CMakeLists.txt @@ -51,19 +51,20 @@ add_test(FileStreamUnitTest filestream_unit) #add_test(SenderUnitTest sender_unit) ### Receiver Unit ############################################################## -#add_executable(receiver_unit -#$<TARGET_OBJECTS:CatchTest> -# ./receiver_unit.cpp -# ../src/receiver.cpp -# ../src/stream.cpp -# ../src/injectors.cpp -# ../src/parsers.cpp -#) -#target_include_directories(receiver_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include") -#target_link_libraries(receiver_unit -# ftlcommon ftlcodecs ftlrgbd ftlaudio) +add_executable(receiver_unit +$<TARGET_OBJECTS:CatchTest> + ./receiver_unit.cpp + ../src/receiver.cpp + ../src/stream.cpp + ../src/injectors.cpp + ../src/parsers.cpp + ../src/builder.cpp +) +target_include_directories(receiver_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include") +target_link_libraries(receiver_unit + ftlcommon ftlcodecs ftlrgbd ftlaudio) -#add_test(ReceiverUnitTest receiver_unit) +add_test(ReceiverUnitTest receiver_unit) ### Builder Unit ############################################################### add_executable(builder_unit diff --git a/components/streams/test/receiver_unit.cpp b/components/streams/test/receiver_unit.cpp index 0ed51f850dd1028dd57e11c0b89876eb53cbb08c..45160cae2aad57fea23fac288007b70e6cdd4c4f 100644 --- a/components/streams/test/receiver_unit.cpp +++ b/components/streams/test/receiver_unit.cpp @@ -3,9 +3,12 @@ #include <ftl/streams/receiver.hpp> #include <ftl/codecs/nvidia_encoder.hpp> #include <ftl/streams/injectors.hpp> +#include <ftl/rgbd/frame.hpp> #include <nlohmann/json.hpp> +#include <loguru.hpp> + using ftl::codecs::definition_t; using ftl::codecs::codec_t; using ftl::stream::Receiver; @@ -50,13 +53,16 @@ class TestStream : public ftl::stream::Stream { TEST_CASE( "Receiver generating onFrameSet" ) { + ftl::data::make_channel<ftl::rgbd::Camera>(Channel::Calibration, "calibration", ftl::data::StorageMode::PERSISTENT); json_t global = json_t{{"$id","ftl://test"}}; ftl::config::configure(global); + ftl::data::Pool pool(5,7); + json_t cfg = json_t{ {"$id","ftl://test/1"} }; - auto *receiver = ftl::create<Receiver>(cfg); + auto *receiver = ftl::create<Receiver>(cfg, &pool); json_t cfg2 = json_t{ {"$id","ftl://test/2"} @@ -80,12 +86,13 @@ TEST_CASE( "Receiver generating onFrameSet" ) { spkt.channel = Channel::Colour; spkt.streamID = 0; - ftl::rgbd::Frame dummy; - ftl::rgbd::FrameState state; - state.getLeft().width = 1280; - state.getLeft().height = 720; - dummy.setOrigin(&state); - ftl::stream::injectCalibration(&stream, dummy, 0, 0, 0); + ftl::data::Frame dummy = pool.allocate(0,10); + dummy.store(); + ftl::rgbd::Frame &state = dummy.cast<ftl::rgbd::Frame>(); + state.setLeft().width = 1280; + state.setLeft().height = 720; + + ftl::stream::injectCalibration(&stream, state, 10, 0, 0); ftl::timer::start(false); @@ -98,10 +105,10 @@ TEST_CASE( "Receiver generating onFrameSet" ) { stream.post(spkt, pkt); int count = 0; - receiver->onFrameSet([&count](ftl::rgbd::FrameSet &fs) { + auto h = receiver->onFrameSet([&count](ftl::rgbd::FrameSet &fs) { ++count; - REQUIRE( fs.timestamp == 10 ); + REQUIRE( fs.timestamp() == 10 ); REQUIRE( fs.frames.size() == 1 ); REQUIRE( fs.frames[0].hasChannel(Channel::Colour) ); REQUIRE( fs.frames[0].get<cv::cuda::GpuMat>(Channel::Colour).rows == 720 ); @@ -118,7 +125,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) { SECTION("multi-frameset") { cv::cuda::GpuMat m(cv::Size(1280,720), CV_8UC4, cv::Scalar(0)); - ftl::stream::injectCalibration(&stream, dummy, 1, 1, 0); + ftl::stream::injectCalibration(&stream, state, 10, 1, 0); bool r = encoder.encode(m, pkt); REQUIRE( r ); @@ -126,8 +133,8 @@ TEST_CASE( "Receiver generating onFrameSet" ) { stream.post(spkt, pkt); std::atomic<int> mask = 0; - receiver->onFrameSet([&mask](ftl::rgbd::FrameSet &fs) { - mask |= 1 << fs.id; + auto h = receiver->onFrameSet([&mask](ftl::rgbd::FrameSet &fs) { + mask |= 1 << fs.frameset(); return true; }); @@ -142,7 +149,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) { SECTION("a tiled colour frame") { cv::cuda::GpuMat m(cv::Size(2560,720), CV_8UC4, cv::Scalar(0)); - ftl::stream::injectCalibration(&stream, dummy, 0, 0, 1); + ftl::stream::injectCalibration(&stream, state, 10, 0, 1); pkt.frame_count = 2; bool r = encoder.encode(m, pkt); @@ -151,10 +158,10 @@ TEST_CASE( "Receiver generating onFrameSet" ) { stream.post(spkt, pkt); int count = 0; - receiver->onFrameSet([&count](ftl::rgbd::FrameSet &fs) { + auto h = receiver->onFrameSet([&count](ftl::rgbd::FrameSet &fs) { ++count; - REQUIRE( fs.timestamp == 10 ); + REQUIRE( fs.timestamp() == 10 ); REQUIRE( fs.frames.size() == 2 ); REQUIRE( fs.frames[0].hasChannel(Channel::Colour) ); REQUIRE( fs.frames[0].get<cv::cuda::GpuMat>(Channel::Colour).rows == 720 ); @@ -174,7 +181,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) { SECTION("a tiled lossy depth frame") { cv::cuda::GpuMat m(cv::Size(2560,720), CV_32F, cv::Scalar(0)); - ftl::stream::injectCalibration(&stream, dummy, 0, 0, 1); + ftl::stream::injectCalibration(&stream, state, 10, 0, 1); spkt.channel = Channel::Depth; pkt.frame_count = 2; @@ -185,10 +192,10 @@ TEST_CASE( "Receiver generating onFrameSet" ) { stream.post(spkt, pkt); int count = 0; - receiver->onFrameSet([&count](ftl::rgbd::FrameSet &fs) { + auto h = receiver->onFrameSet([&count](ftl::rgbd::FrameSet &fs) { ++count; - REQUIRE( fs.timestamp == 10 ); + REQUIRE( fs.timestamp() == 10 ); REQUIRE( fs.frames.size() == 2 ); REQUIRE( fs.frames[0].hasChannel(Channel::Depth) ); REQUIRE( fs.frames[0].get<cv::cuda::GpuMat>(Channel::Depth).rows == 720 ); @@ -208,7 +215,7 @@ TEST_CASE( "Receiver generating onFrameSet" ) { SECTION("a tiled lossless depth frame") { cv::cuda::GpuMat m(cv::Size(2560,720), CV_32F, cv::Scalar(0)); - ftl::stream::injectCalibration(&stream, dummy, 0, 0, 1); + ftl::stream::injectCalibration(&stream, state, 10, 0, 1); spkt.channel = Channel::Depth; pkt.frame_count = 2; @@ -220,10 +227,10 @@ TEST_CASE( "Receiver generating onFrameSet" ) { stream.post(spkt, pkt); int count = 0; - receiver->onFrameSet([&count](ftl::rgbd::FrameSet &fs) { + auto h = receiver->onFrameSet([&count](ftl::rgbd::FrameSet &fs) { ++count; - REQUIRE( fs.timestamp == 10 ); + REQUIRE( fs.timestamp() == 10 ); REQUIRE( fs.frames.size() == 2 ); REQUIRE( fs.frames[0].hasChannel(Channel::Depth) ); REQUIRE( fs.frames[0].get<cv::cuda::GpuMat>(Channel::Depth).rows == 720 ); @@ -245,16 +252,20 @@ TEST_CASE( "Receiver generating onFrameSet" ) { //while (ftl::pool.n_idle() != ftl::pool.size()) std::this_thread::sleep_for(std::chrono::milliseconds(10)); delete receiver; //ftl::config::cleanup(); + ftl::data::clearRegistry(); } TEST_CASE( "Receiver sync bugs" ) { + ftl::data::make_channel<ftl::rgbd::Camera>(Channel::Calibration, "calibration", ftl::data::StorageMode::PERSISTENT); json_t global = json_t{{"$id","ftl://test"}}; ftl::config::configure(global); + ftl::data::Pool pool(5,7); + json_t cfg = json_t{ {"$id","ftl://test/1"} }; - auto *receiver = ftl::create<Receiver>(cfg); + auto *receiver = ftl::create<Receiver>(cfg, &pool); json_t cfg2 = json_t{ {"$id","ftl://test/2"} @@ -278,12 +289,13 @@ TEST_CASE( "Receiver sync bugs" ) { spkt.channel = Channel::Colour; spkt.streamID = 0; - ftl::rgbd::Frame dummy; - ftl::rgbd::FrameState state; - state.getLeft().width = 1280; - state.getLeft().height = 720; - dummy.setOrigin(&state); - ftl::stream::injectCalibration(&stream, dummy, 0, 0, 0); + ftl::data::Frame dummy = pool.allocate(0,10); + dummy.store(); + ftl::rgbd::Frame &state = dummy.cast<ftl::rgbd::Frame>(); + state.setLeft().width = 1280; + state.setLeft().height = 720; + + ftl::stream::injectCalibration(&stream, state, 10, 0, 0); ftl::timer::start(false); @@ -298,10 +310,10 @@ TEST_CASE( "Receiver sync bugs" ) { int count = 0; int64_t ts = 0; bool haswrongchan = false; - receiver->onFrameSet([&count,&ts,&haswrongchan](ftl::rgbd::FrameSet &fs) { + auto h = receiver->onFrameSet([&count,&ts,&haswrongchan](ftl::rgbd::FrameSet &fs) { ++count; - ts = fs.timestamp; + ts = fs.timestamp(); haswrongchan = fs.frames[0].hasChannel(Channel::ColourHighRes); return true; @@ -329,16 +341,20 @@ TEST_CASE( "Receiver sync bugs" ) { ftl::timer::stop(true); //while (ftl::pool.n_idle() != ftl::pool.size()) std::this_thread::sleep_for(std::chrono::milliseconds(10)); delete receiver; + ftl::data::clearRegistry(); } TEST_CASE( "Receiver non zero buffer" ) { + ftl::data::make_channel<ftl::rgbd::Camera>(Channel::Calibration, "calibration", ftl::data::StorageMode::PERSISTENT); json_t global = json_t{{"$id","ftl://test"}}; ftl::config::configure(global); + ftl::data::Pool pool(5,7); + json_t cfg = json_t{ {"$id","ftl://test/1"} }; - auto *receiver = ftl::create<Receiver>(cfg); + auto *receiver = ftl::create<Receiver>(cfg, &pool); json_t cfg2 = json_t{ {"$id","ftl://test/2"} @@ -362,12 +378,12 @@ TEST_CASE( "Receiver non zero buffer" ) { spkt.channel = Channel::Colour; spkt.streamID = 0; - ftl::rgbd::Frame dummy; - ftl::rgbd::FrameState state; - state.getLeft().width = 1280; - state.getLeft().height = 720; - dummy.setOrigin(&state); - ftl::stream::injectCalibration(&stream, dummy, 0, 0, 0); + ftl::data::Frame dummy = pool.allocate(0,10); + dummy.store(); + ftl::rgbd::Frame &state = dummy.cast<ftl::rgbd::Frame>(); + state.setLeft().width = 1280; + state.setLeft().height = 720; + ftl::stream::injectCalibration(&stream, state, 10, 0, 0); ftl::timer::start(false); @@ -378,10 +394,10 @@ TEST_CASE( "Receiver non zero buffer" ) { REQUIRE( r ); int count = 0; - receiver->onFrameSet([&count](ftl::rgbd::FrameSet &fs) { + auto h = receiver->onFrameSet([&count](ftl::rgbd::FrameSet &fs) { ++count; - REQUIRE( fs.timestamp == 10 ); + REQUIRE( fs.timestamp() == 10 ); REQUIRE( fs.frames.size() == 1 ); REQUIRE( fs.frames[0].hasChannel(Channel::Colour) ); REQUIRE( fs.frames[0].get<cv::cuda::GpuMat>(Channel::Colour).rows == 720 ); @@ -403,4 +419,5 @@ TEST_CASE( "Receiver non zero buffer" ) { ftl::timer::stop(true); //while (ftl::pool.n_idle() != ftl::pool.size()) std::this_thread::sleep_for(std::chrono::milliseconds(10)); delete receiver; + ftl::data::clearRegistry(); } diff --git a/components/structures/src/new_frame.cpp b/components/structures/src/new_frame.cpp index 5d5a934764ef0beee7287e990b35f1c207ebe55e..7ed41592af23f8eb3b53f9c54687dc6070edda05 100644 --- a/components/structures/src/new_frame.cpp +++ b/components/structures/src/new_frame.cpp @@ -16,6 +16,9 @@ static std::unordered_map<ftl::codecs::Channel, ChannelConfig> reg_channels; void ftl::data::registerChannel(ftl::codecs::Channel c, const ChannelConfig &config) { auto i = reg_channels.find(c); if (i != reg_channels.end()) { + if (i->second.mode == config.mode && i->second.type_id == config.type_id && i->second.name == config.name) { + return; + } throw FTL_Error("Channel " << static_cast<unsigned int>(c) << " already registered"); }