diff --git a/applications/gui/src/ctrl_window.cpp b/applications/gui/src/ctrl_window.cpp index fc82c45838a568efc8b4e8d456cdb3fbbef2c8b4..3681022bd10c832ea2d722b1eb0bd02877b1af4a 100644 --- a/applications/gui/src/ctrl_window.cpp +++ b/applications/gui/src/ctrl_window.cpp @@ -121,7 +121,7 @@ void ControlWindow::_addNode() { } void ControlWindow::_updateDetails() { - node_details_ = ctrl_->getSlaves(); + node_details_ = ctrl_->getControllers(); node_titles_.clear(); for (auto &d : node_details_) { diff --git a/applications/gui/src/main.cpp b/applications/gui/src/main.cpp index 5ebc552fa0ee0664c7a0b7898a723b90bf87f29c..99af829eb646f20f1e0f680837e8855739fa6be1 100644 --- a/applications/gui/src/main.cpp +++ b/applications/gui/src/main.cpp @@ -23,32 +23,6 @@ int main(int argc, char **argv) { } }); - std::map<ftl::UUID, std::vector<ftl::NetConfigurable*>> peerConfigurables; - - // FIXME: Move this elsewhere, it is not just for GUI - net->onConnect([&controller, &peerConfigurables](ftl::net::Peer *p) { - ftl::UUID peer = p->id(); - auto cs = controller->getConfigurables(peer); - for (auto c : cs) { - //LOG(INFO) << "NET CONFIG: " << c; - ftl::config::json_t *configuration = new ftl::config::json_t; - *configuration = controller->get(peer, c); - if (!configuration->empty()) { - ftl::NetConfigurable *nc = new ftl::NetConfigurable(peer, c, *controller, *configuration); - peerConfigurables[peer].push_back(nc); - } - } - }); - - net->onDisconnect([&peerConfigurables](ftl::net::Peer *p) { - ftl::UUID peer = p->id(); - for (ftl::NetConfigurable *nc : peerConfigurables[peer]) { - ftl::config::json_t *configuration = &(nc->getConfig()); - delete nc; - delete configuration; - } - }); - net->start(); net->waitConnections(); diff --git a/applications/gui/src/screen.cpp b/applications/gui/src/screen.cpp index d0d9944214343699bd04230b07da47799f1f415f..77c4e4318e61e2916fb47ba1f4f7a5ec08760e3b 100644 --- a/applications/gui/src/screen.cpp +++ b/applications/gui/src/screen.cpp @@ -244,7 +244,7 @@ ftl::gui::Screen::Screen(ftl::Configurable *proot, ftl::net::Universe *pnet, ftl //net_->onConnect([this,popup](ftl::net::Peer *p) { { LOG(INFO) << "NET CONNECT"; - auto node_details = ctrl_->getSlaves(); + auto node_details = ctrl_->getControllers(); for (auto &d : node_details) { LOG(INFO) << "ADDING TITLE: " << d.dump(); diff --git a/applications/player/src/main.cpp b/applications/player/src/main.cpp index 33e146880668fb29224aaffc8942cb4255ebf0c1..2741cac2ef832f3cc60073f7320d443ea9620c9d 100644 --- a/applications/player/src/main.cpp +++ b/applications/player/src/main.cpp @@ -40,6 +40,21 @@ static void visualizeDepthMap( const cv::Mat &depth, cv::Mat &out, //out.setTo(cv::Scalar(255, 255, 255), mask); } +static std::string nameForCodec(ftl::codecs::codec_t c) { + switch(c) { + case codec_t::JPG : return "JPEG"; + case codec_t::PNG : return "PNG"; + case codec_t::H264 : return "H264"; + case codec_t::HEVC : return "HEVC"; + case codec_t::JSON : return "JSON"; + case codec_t::POSE : return "POSE"; + case codec_t::RAW : return "RAW"; + case codec_t::CALIBRATION : return "CALIBRATION"; + case codec_t::MSGPACK : return "MSGPACK"; + default: return std::string("UNKNOWN (") + std::to_string((int)c) + std::string(")"); + } +} + int main(int argc, char **argv) { std::string filename(argv[1]); LOG(INFO) << "Playing: " << filename; @@ -73,6 +88,11 @@ int main(int argc, char **argv) { if (!(channel_mask[spkt.streamID][(int)spkt.channel])) { channel_mask[spkt.streamID].set((int)spkt.channel); LOG(INFO) << " - Channel " << (int)spkt.channel << " found (" << (int)spkt.streamID << ")"; + LOG(INFO) << " - Codec = " << nameForCodec(pkt.codec); + LOG(INFO) << " - Width = " << ftl::codecs::getWidth(pkt.definition); + LOG(INFO) << " - Height = " << ftl::codecs::getHeight(pkt.definition); + LOG(INFO) << " - Start Time = " << float(spkt.timestamp - r.getStartTime()) / 1000.0f << "(s)"; + LOG(INFO) << " - Blocks = " << (int)pkt.block_total; } if (spkt.streamID == current_stream) { diff --git a/applications/reconstruct/src/main.cpp b/applications/reconstruct/src/main.cpp index dac32881b73df64b9c02d8803c4b6eba6e763871..9cb5c0819d144c66cc49f125adbe64dc360deed5 100644 --- a/applications/reconstruct/src/main.cpp +++ b/applications/reconstruct/src/main.cpp @@ -12,7 +12,7 @@ #include <ftl/rgbd.hpp> #include <ftl/rgbd/virtual.hpp> #include <ftl/rgbd/streamer.hpp> -#include <ftl/slave.hpp> +#include <ftl/master.hpp> #include <ftl/rgbd/group.hpp> #include <ftl/threads.hpp> #include <ftl/codecs/writer.hpp> @@ -82,58 +82,9 @@ static Eigen::Affine3d create_rotation_matrix(float ax, float ay, float az) { return rz * rx * ry; } -// TODO: * Remove this class (requires more general solution). Also does -// not process disconnections/reconnections/types etc. correctly. -// * Update when new options become available. - -class ConfigProxy { - private: - vector<ftl::UUID> peers_; - vector<std::string> uris_; - ftl::net::Universe *net_; - - public: - ConfigProxy(ftl::net::Universe *net) { - net_ = net; - - auto response = net_->findAll<std::string>("node_details"); - for (auto &r : response) { - auto r_json = json_t::parse(r); - peers_.push_back(ftl::UUID(r_json["id"].get<std::string>())); - uris_.push_back(r_json["title"].get<std::string>()); - } - } - - void add(ftl::Configurable *root, const std::string &uri, const std::string &name) { - auto config = json_t::parse(net_->call<string>(peers_[0], "get_cfg", uris_[0] + "/" + uri)); - auto *proxy = ftl::create<ftl::Configurable>(root, name); - - try { - for (auto &itm : config.get<json::object_t>()) { - auto key = itm.first; - auto value = itm.second; - if (*key.begin() == '$') { continue; } - - proxy->set(key, value); - proxy->on(key, [this, uri, key, value, proxy](const ftl::config::Event&) { - for (size_t i = 0; i < uris_.size(); i++) { - // TODO: check that config exists? - auto peer = peers_[i]; - std::string name = uris_[i] + "/" + uri + "/" + key; - net_->send(peer, "update_cfg", name, proxy->getConfig()[key].dump()); - } - }); - } - } - catch (nlohmann::detail::type_error) { - LOG(ERROR) << "Failed to add config proxy for: " << uri << "/" << name; - } - } -}; - static void run(ftl::Configurable *root) { Universe *net = ftl::create<Universe>(root, "net"); - ftl::ctrl::Slave slave(net, root); + ftl::ctrl::Master ctrl(root, net); // Controls auto *controls = ftl::create<ftl::Configurable>(root, "controls"); @@ -192,17 +143,6 @@ static void run(ftl::Configurable *root) { return; } - ConfigProxy *configproxy = nullptr; - if (net->numberOfPeers() > 0) { - configproxy = new ConfigProxy(net); // TODO delete - auto *disparity = ftl::create<ftl::Configurable>(root, "disparity"); - configproxy->add(disparity, "source/disparity/algorithm", "algorithm"); - configproxy->add(disparity, "source/disparity/bilateral_filter", "bilateral_filter"); - configproxy->add(disparity, "source/disparity/optflow_filter", "optflow_filter"); - configproxy->add(disparity, "source/disparity/mls", "mls"); - configproxy->add(disparity, "source/disparity/cross", "cross"); - } - // Must find pose for each source... if (sources.size() > 1) { std::map<std::string, Eigen::Matrix4d> transformations; @@ -383,7 +323,7 @@ static void run(ftl::Configurable *root) { LOG(INFO) << "Shutting down..."; ftl::timer::stop(); - slave.stop(); + ctrl.stop(); net->shutdown(); ftl::pool.stop(); diff --git a/applications/recorder/src/main.cpp b/applications/recorder/src/main.cpp index 58966e176846367f232b9d214e0e5bd68064dede..d2001114f8f98c4bf35630c9676651bf6e91fae3 100644 --- a/applications/recorder/src/main.cpp +++ b/applications/recorder/src/main.cpp @@ -11,7 +11,7 @@ #include <ftl/rgbd.hpp> #include <ftl/rgbd/virtual.hpp> #include <ftl/rgbd/streamer.hpp> -#include <ftl/slave.hpp> +#include <ftl/master.hpp> #include <ftl/rgbd/group.hpp> #include <ftl/threads.hpp> #include <ftl/codecs/writer.hpp> @@ -64,7 +64,7 @@ static Eigen::Affine3d create_rotation_matrix(float ax, float ay, float az) { static void run(ftl::Configurable *root) { Universe *net = ftl::create<Universe>(root, "net"); - ftl::ctrl::Slave slave(net, root); + ftl::ctrl::Master ctrl(root, net); // Controls auto *controls = ftl::create<ftl::Configurable>(root, "controls"); @@ -202,7 +202,7 @@ static void run(ftl::Configurable *root) { LOG(INFO) << "Shutting down..."; ftl::timer::stop(); - slave.stop(); + ctrl.stop(); net->shutdown(); ftl::pool.stop(); diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp index f99754cbf558bb3d0f53fde37763855fac7af128..4f245a07705331324e3a2730411bec992f404dce 100644 --- a/applications/vision/src/main.cpp +++ b/applications/vision/src/main.cpp @@ -22,7 +22,7 @@ //#include <ftl/display.hpp> #include <ftl/rgbd/streamer.hpp> #include <ftl/net/universe.hpp> -#include <ftl/slave.hpp> +#include <ftl/master.hpp> #include <nlohmann/json.hpp> #include "opencv2/imgproc.hpp" @@ -50,7 +50,7 @@ using json = nlohmann::json; static void run(ftl::Configurable *root) { Universe *net = ftl::create<Universe>(root, "net"); - ftl::ctrl::Slave slave(net, root); + ftl::ctrl::Master ctrl(root, net); auto paths = root->get<vector<string>>("paths"); string file = ""; @@ -84,7 +84,7 @@ static void run(ftl::Configurable *root) { ftl::timer::start(true); LOG(INFO) << "Stopping..."; - slave.stop(); + ctrl.stop(); stream->stop(); net->shutdown(); diff --git a/components/codecs/include/ftl/codecs/hevc.hpp b/components/codecs/include/ftl/codecs/hevc.hpp index f658635d6f239b4aa7a21331f60f6936c517ba93..b3a32246544f3cf24a4ad09345c2f47a96eb0735 100644 --- a/components/codecs/include/ftl/codecs/hevc.hpp +++ b/components/codecs/include/ftl/codecs/hevc.hpp @@ -97,6 +97,10 @@ inline NALType getNALType(const std::vector<uint8_t> &data) { return static_cast<NALType>((data[4] >> 1) & 0x3F); } +inline bool validNAL(const std::vector<uint8_t> &data) { + return data[0] == 0 && data[1] == 0 && data[2] == 0 && data[3] == 1; +} + /** * Check the HEVC bitstream for an I-Frame. With NvPipe, all I-Frames start * with a VPS NAL unit so just check for this. diff --git a/components/codecs/src/nvpipe_decoder.cpp b/components/codecs/src/nvpipe_decoder.cpp index 77a3105f88b84f2b9c00f5dba152bbc9814c70db..d6652549c73fa5c5d6388030c480e73f331a4a7c 100644 --- a/components/codecs/src/nvpipe_decoder.cpp +++ b/components/codecs/src/nvpipe_decoder.cpp @@ -37,6 +37,7 @@ bool NvPipeDecoder::decode(const ftl::codecs::Packet &pkt, cv::cuda::GpuMat &out is_float_channel_ = is_float_frame; last_definition_ = pkt.definition; + //LOG(INFO) << "DECODE OUT: " << out.rows << ", " << out.type(); //LOG(INFO) << "DECODE RESOLUTION: (" << (int)pkt.definition << ") " << ftl::codecs::getWidth(pkt.definition) << "x" << ftl::codecs::getHeight(pkt.definition); // Build a decoder instance of the correct kind @@ -49,8 +50,6 @@ bool NvPipeDecoder::decode(const ftl::codecs::Packet &pkt, cv::cuda::GpuMat &out if (!nv_decoder_) { //LOG(INFO) << "Bitrate=" << (int)bitrate << " width=" << ABRController::getColourWidth(bitrate); LOG(FATAL) << "Could not create decoder: " << NvPipe_GetError(NULL); - } else { - DLOG(INFO) << "Decoder created"; } seen_iframe_ = false; @@ -60,38 +59,46 @@ bool NvPipeDecoder::decode(const ftl::codecs::Packet &pkt, cv::cuda::GpuMat &out tmp_.create(cv::Size(ftl::codecs::getWidth(pkt.definition),ftl::codecs::getHeight(pkt.definition)), (is_float_frame) ? CV_16U : CV_8UC4); // Check for an I-Frame - if (pkt.codec == ftl::codecs::codec_t::HEVC) { - if (ftl::codecs::hevc::isIFrame(pkt.data)) seen_iframe_ = true; - } else if (pkt.codec == ftl::codecs::codec_t::H264) { - if (ftl::codecs::h264::isIFrame(pkt.data)) seen_iframe_ = true; + if (!seen_iframe_) { + if (pkt.codec == ftl::codecs::codec_t::HEVC) { + if (ftl::codecs::hevc::isIFrame(pkt.data)) seen_iframe_ = true; + } else if (pkt.codec == ftl::codecs::codec_t::H264) { + if (ftl::codecs::h264::isIFrame(pkt.data)) seen_iframe_ = true; + } } // No I-Frame yet so don't attempt to decode P-Frames. if (!seen_iframe_) return false; + // Final checks for validity + if (pkt.data.size() == 0 || tmp_.size() != out.size()) { // || !ftl::codecs::hevc::validNAL(pkt.data)) { + LOG(ERROR) << "Failed to decode packet"; + return false; + } + int rc = NvPipe_Decode(nv_decoder_, pkt.data.data(), pkt.data.size(), tmp_.data, tmp_.cols, tmp_.rows, tmp_.step); if (rc == 0) LOG(ERROR) << "NvPipe decode error: " << NvPipe_GetError(nv_decoder_); if (is_float_frame) { // Is the received frame the same size as requested output? - if (out.rows == ftl::codecs::getHeight(pkt.definition)) { + //if (out.rows == ftl::codecs::getHeight(pkt.definition)) { tmp_.convertTo(out, CV_32FC1, 1.0f/1000.0f, stream_); - } else { + /*} else { LOG(WARNING) << "Resizing decoded frame from " << tmp_.size() << " to " << out.size(); // FIXME: This won't work on GPU tmp_.convertTo(tmp_, CV_32FC1, 1.0f/1000.0f, stream_); cv::cuda::resize(tmp_, out, out.size(), 0, 0, cv::INTER_NEAREST, stream_); - } + }*/ } else { // Is the received frame the same size as requested output? - if (out.rows == ftl::codecs::getHeight(pkt.definition)) { + //if (out.rows == ftl::codecs::getHeight(pkt.definition)) { // Flag 0x1 means frame is in RGB so needs conversion to BGR if (pkt.flags & 0x1) { cv::cuda::cvtColor(tmp_, out, cv::COLOR_RGBA2BGR, 0, stream_); } else { cv::cuda::cvtColor(tmp_, out, cv::COLOR_BGRA2BGR, 0, stream_); } - } else { + /*} else { LOG(WARNING) << "Resizing decoded frame from " << tmp_.size() << " to " << out.size(); // FIXME: This won't work on GPU, plus it allocates extra memory... // Flag 0x1 means frame is in RGB so needs conversion to BGR @@ -101,7 +108,7 @@ bool NvPipeDecoder::decode(const ftl::codecs::Packet &pkt, cv::cuda::GpuMat &out cv::cuda::cvtColor(tmp_, tmp_, cv::COLOR_BGRA2BGR, 0, stream_); } cv::cuda::resize(tmp_, out, out.size(), 0.0, 0.0, cv::INTER_LINEAR, stream_); - } + }*/ } stream_.waitForCompletion(); diff --git a/components/codecs/src/nvpipe_encoder.cpp b/components/codecs/src/nvpipe_encoder.cpp index 132a3209ad0849dd76f1a5f7438eba8f5655b854..86fccdefc0d91f85694b105986eb49a423cc5863 100644 --- a/components/codecs/src/nvpipe_encoder.cpp +++ b/components/codecs/src/nvpipe_encoder.cpp @@ -123,7 +123,7 @@ bool NvPipeEncoder::encode(const cv::cuda::GpuMat &in, definition_t odefinition, pkt.data.resize(cs); was_reset_ = false; - if (cs == 0) { + if (cs == 0 || cs >= ftl::codecs::kVideoBufferSize) { LOG(ERROR) << "Could not encode video frame: " << NvPipe_GetError(nvenc_); return false; } else { diff --git a/components/codecs/test/nvpipe_codec_unit.cpp b/components/codecs/test/nvpipe_codec_unit.cpp index dc63131f7cb10435d7c73db21fffd8ff9a09af53..dccc65f9671a70ddc1879d12d0b8ef38aa9a1f01 100644 --- a/components/codecs/test/nvpipe_codec_unit.cpp +++ b/components/codecs/test/nvpipe_codec_unit.cpp @@ -81,7 +81,8 @@ TEST_CASE( "NvPipeDecoder::decode() - A colour test image" ) { }); } - SECTION("Full HD in, 720 out, FHD encoding") { + // No longer supported + /*SECTION("Full HD in, 720 out, FHD encoding") { in = cv::cuda::GpuMat(cv::Size(1920,1080), CV_8UC3, cv::Scalar(255,0,0)); out = cv::cuda::GpuMat(cv::Size(1280,720), CV_8UC3, cv::Scalar(0,0,0)); @@ -90,9 +91,10 @@ TEST_CASE( "NvPipeDecoder::decode() - A colour test image" ) { }); REQUIRE( (out.rows == 720) ); - } + }*/ - SECTION("HHD in, FHD out, FHD encoding") { + // No longer supported + /*SECTION("HHD in, FHD out, FHD encoding") { in = cv::cuda::GpuMat(cv::Size(1280,720), CV_8UC3, cv::Scalar(255,0,0)); out = cv::cuda::GpuMat(cv::Size(1920,1080), CV_8UC3, cv::Scalar(0,0,0)); @@ -101,9 +103,10 @@ TEST_CASE( "NvPipeDecoder::decode() - A colour test image" ) { }); REQUIRE( (out.rows == 1080) ); - } + }*/ - SECTION("FHD in, HHD out, SD encoding") { + // No longer supported + /*SECTION("FHD in, HHD out, SD encoding") { in = cv::cuda::GpuMat(cv::Size(1920,1080), CV_8UC3, cv::Scalar(255,0,0)); out = cv::cuda::GpuMat(cv::Size(1280,720), CV_8UC3, cv::Scalar(0,0,0)); @@ -112,7 +115,7 @@ TEST_CASE( "NvPipeDecoder::decode() - A colour test image" ) { }); REQUIRE( (out.rows == 720) ); - } + }*/ REQUIRE( r ); REQUIRE( (cv::cuda::sum(out) != cv::Scalar(0,0,0)) ); diff --git a/components/control/cpp/CMakeLists.txt b/components/control/cpp/CMakeLists.txt index f55ec9c19f0e6c30c3eafbaf203ee6a2e49b4ceb..3f6d4a29932ccd0cdadca61227de914d55d0f4b0 100644 --- a/components/control/cpp/CMakeLists.txt +++ b/components/control/cpp/CMakeLists.txt @@ -1,5 +1,4 @@ add_library(ftlctrl - src/slave.cpp src/master.cpp ) diff --git a/components/control/cpp/include/ftl/master.hpp b/components/control/cpp/include/ftl/master.hpp index 9b658aaadbedccdd86c452318ef1832230836ef7..c4782d16ea09928df9d8d3ca5273d84f5a86cf1c 100644 --- a/components/control/cpp/include/ftl/master.hpp +++ b/components/control/cpp/include/ftl/master.hpp @@ -10,8 +10,15 @@ #include <Eigen/Eigen> namespace ftl { + +class NetConfigurable; + namespace ctrl { +struct SystemState { + bool paused; +}; + struct LogEvent { int verbosity; std::string preamble; @@ -43,7 +50,7 @@ class Master { std::vector<std::string> getConfigurables(const ftl::UUID &peer); - std::vector<ftl::config::json_t> getSlaves(); + std::vector<ftl::config::json_t> getControllers(); std::vector<ftl::config::json_t> get(const std::string &uri); @@ -51,12 +58,26 @@ class Master { ftl::config::json_t get(const ftl::UUID &peer, const std::string &uri); + ftl::config::json_t getConfigurable(const ftl::UUID &peer, const std::string &uri); + void watch(const std::string &uri, std::function<void()> f); Eigen::Matrix4d getPose(const std::string &uri); void setPose(const std::string &uri, const Eigen::Matrix4d &pose); + /** + * Clean up to remove log and status forwarding over the network. + */ + void stop(); + + /** + * Do not call! Automatically called from logging subsystem. + */ + void sendLog(const loguru::Message& message); + + bool isPaused() const { return state_.paused; } + // Events //void onError(); @@ -72,6 +93,12 @@ class Master { std::vector<std::function<void(const LogEvent&)>> log_handlers_; ftl::Configurable *root_; ftl::net::Universe *net_; + std::map<ftl::UUID, std::vector<ftl::NetConfigurable*>> peerConfigurables_; + std::vector<ftl::UUID> log_peers_; + RECURSIVE_MUTEX mutex_; + bool in_log_; + bool active_; + SystemState state_; }; } diff --git a/components/control/cpp/include/ftl/slave.hpp b/components/control/cpp/include/ftl/slave.hpp deleted file mode 100644 index e3ebd69e46fa5177eebbcc2d8d6b2f2cce70f204..0000000000000000000000000000000000000000 --- a/components/control/cpp/include/ftl/slave.hpp +++ /dev/null @@ -1,50 +0,0 @@ -#ifndef _FTL_CTRL_SLAVE_HPP_ -#define _FTL_CTRL_SLAVE_HPP_ - -#include <ftl/net/universe.hpp> -#include <ftl/configurable.hpp> -#include <loguru.hpp> -#include <ftl/threads.hpp> - -namespace ftl { -namespace ctrl { - -struct SystemState { - bool paused; -}; - -/** - * Allows a node to be remote controlled and observed over the network. All - * such nodes should create a single instance of this class, but must call - * "stop()" before terminating the network. - */ -class Slave { - public: - Slave(ftl::net::Universe *, ftl::Configurable *); - ~Slave(); - - /** - * Clean up to remove log and status forwarding over the network. - */ - void stop(); - - /** - * Do not call! Automatically called from logging subsystem. - */ - void sendLog(const loguru::Message& message); - - bool isPaused() const { return state_.paused; } - - private: - std::vector<ftl::UUID> log_peers_; - ftl::net::Universe *net_; - RECURSIVE_MUTEX mutex_; - bool in_log_; - bool active_; - SystemState state_; -}; - -} -} - -#endif // _FTL_CTRL_SLAVE_HPP_ diff --git a/components/control/cpp/src/master.cpp b/components/control/cpp/src/master.cpp index a13a67a90b4bc7ccc1bd2c893580fc286dfb330a..059dfe9144a91d0a7d706de811ce353f0050a2ac 100644 --- a/components/control/cpp/src/master.cpp +++ b/components/control/cpp/src/master.cpp @@ -1,4 +1,5 @@ #include <ftl/master.hpp> +#include <ftl/net_configurable.hpp> using ftl::ctrl::Master; using ftl::net::Universe; @@ -11,6 +12,51 @@ using ftl::ctrl::LogEvent; Master::Master(Configurable *root, Universe *net) : root_(root), net_(net) { + // Init system state + state_.paused = false; + + net->bind("restart", []() { + LOG(WARNING) << "Remote restart..."; + //exit(1); + ftl::exit_code = 1; + ftl::running = false; + }); + + net->bind("shutdown", []() { + LOG(WARNING) << "Remote shutdown..."; + //exit(0); + ftl::running = false; + }); + + net->bind("pause", [this]() { + state_.paused = !state_.paused; + }); + + net->bind("update_cfg", [](const std::string &uri, const std::string &value) { + ftl::config::update(uri, nlohmann::json::parse(value)); + }); + + net->bind("get_cfg", [](const std::string &uri) -> std::string { + return ftl::config::resolve(uri, false).dump(); + }); + + net->bind("get_configurable", [](const std::string &uri) -> std::string { + return ftl::config::find(uri)->getConfig().dump(); + }); + + net->bind("list_configurables", []() { + return ftl::config::list(); + }); + + net->bind("log_subscribe", [this](const ftl::UUID &peer) { + UNIQUE_LOCK(mutex_, lk); + log_peers_.push_back(peer); + }); + + net->bind("connect", [this](const std::string &url) { + net_->connect(url); + }); + net->bind("log", [this](int v, const std::string &pre, const std::string &msg) { for (auto f : log_handlers_) { f({v,pre,msg}); @@ -20,16 +66,35 @@ Master::Master(Configurable *root, Universe *net) net->bind("node_details", [net,root]() -> std::vector<std::string> { ftl::config::json_t json { {"id", net->id().to_string()}, - {"title", root->value("title", *root->get<string>("$id"))}, - {"kind", "master"} + {"title", root->value("title", *root->get<string>("$id"))} }; return {json.dump()}; }); //net->broadcast("log_subscribe", net->id()); - net->onConnect([this](ftl::net::Peer*) { + net->onConnect([this](ftl::net::Peer *p) { //net_->broadcast("log_subscribe", net_->id()); + ftl::UUID peer = p->id(); + auto cs = getConfigurables(peer); + for (auto c : cs) { + //LOG(INFO) << "NET CONFIG: " << c; + ftl::config::json_t *configuration = new ftl::config::json_t; + *configuration = getConfigurable(peer, c); + if (!configuration->empty()) { + ftl::NetConfigurable *nc = new ftl::NetConfigurable(peer, c, *this, *configuration); + peerConfigurables_[peer].push_back(nc); + } + } + }); + + net->onDisconnect([this](ftl::net::Peer *p) { + ftl::UUID peer = p->id(); + for (ftl::NetConfigurable *nc : peerConfigurables_[peer]) { + ftl::config::json_t *configuration = &(nc->getConfig()); + delete nc; + delete configuration; + } }); } @@ -70,7 +135,7 @@ void Master::set(const ftl::UUID &peer, const string &uri, const json_t &value) net_->send(peer, "update_cfg", uri, value.dump()); } -vector<json_t> Master::getSlaves() { +vector<json_t> Master::getControllers() { auto response = net_->findAll<string>("node_details"); vector<json_t> result; for (auto &r : response) { @@ -105,6 +170,10 @@ json_t Master::get(const ftl::UUID &peer, const string &uri) { return json_t::parse(net_->call<string>(peer, "get_cfg", uri)); } +json_t Master::getConfigurable(const ftl::UUID &peer, const string &uri) { + return json_t::parse(net_->call<string>(peer, "get_configurable", uri)); +} + void Master::watch(const string &uri, function<void()> f) { } @@ -133,4 +202,34 @@ void Master::setPose(const std::string &uri, const Eigen::Matrix4d &pose) { //void onError(); void Master::onLog(function<void(const LogEvent &)> h) { log_handlers_.push_back(h); +} + +void Master::stop() { + if (!active_) return; + active_ = false; + loguru::remove_all_callbacks(); + net_->unbind("restart"); + net_->unbind("shutdown"); + net_->unbind("update_cfg"); + net_->unbind("get_cfg"); + net_->unbind("slave_details"); // TODO: Remove + net_->unbind("log_subscribe"); +} + +void Master::sendLog(const loguru::Message& message) { + UNIQUE_LOCK(mutex_, lk); + if (in_log_) return; + in_log_ = true; + + for (auto &p : log_peers_) { + auto peer = net_->getPeer(p); + if (!peer || !peer->isConnected()) continue; + + std::cout << "sending log to master..." << std::endl; + if (!net_->send(p, "log", message.verbosity, message.preamble, message.message)) { + // TODO(Nick) Remove peer from loggers list... + } + } + + in_log_ = false; } \ No newline at end of file diff --git a/components/control/cpp/src/slave.cpp b/components/control/cpp/src/slave.cpp deleted file mode 100644 index cd2d0f1ff03fa3aa88d94420a6811aac63c9c7b9..0000000000000000000000000000000000000000 --- a/components/control/cpp/src/slave.cpp +++ /dev/null @@ -1,106 +0,0 @@ -#include <ftl/slave.hpp> - -#include <ftl/threads.hpp> - -using ftl::Configurable; -using ftl::net::Universe; -using ftl::ctrl::Slave; -using std::string; - -// static void netLog(void* user_data, const loguru::Message& message) { -// Slave *slave = static_cast<Slave*>(user_data); -// slave->sendLog(message); -// } - -Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net), in_log_(false), active_(true) { - - // Init system state - state_.paused = false; - - net->bind("restart", []() { - LOG(WARNING) << "Remote restart..."; - //exit(1); - ftl::exit_code = 1; - ftl::running = false; - }); - - net->bind("shutdown", []() { - LOG(WARNING) << "Remote shutdown..."; - //exit(0); - ftl::running = false; - }); - - net->bind("pause", [this]() { - state_.paused = !state_.paused; - }); - - net->bind("update_cfg", [](const std::string &uri, const std::string &value) { - ftl::config::update(uri, nlohmann::json::parse(value)); - }); - - net->bind("get_cfg", [](const std::string &uri) -> std::string { - return ftl::config::resolve(uri, false).dump(); - }); - - net->bind("list_configurables", []() { - return ftl::config::list(); - }); - - net->bind("node_details", [net,root]() -> std::vector<std::string> { - ftl::config::json_t json { - {"id", net->id().to_string()}, - {"title", root->value("title", *root->get<string>("$id"))}, - {"kind", "slave"} - }; - return {json.dump()}; - }); - - net->bind("log_subscribe", [this](const ftl::UUID &peer) { - UNIQUE_LOCK(mutex_, lk); - log_peers_.push_back(peer); - }); - - net->bind("connect", [this](const std::string &url) { - net_->connect(url); - }); - - //net->onConnect([this](ftl::net::Peer *peer) { - // net_->broadcast("new_peer", peer->id()); - //}); - - //loguru::add_callback("net_log", netLog, this, loguru::Verbosity_INFO); -} - -Slave::~Slave() { - stop(); -} - -void Slave::stop() { - if (!active_) return; - active_ = false; - loguru::remove_all_callbacks(); - net_->unbind("restart"); - net_->unbind("shutdown"); - net_->unbind("update_cfg"); - net_->unbind("get_cfg"); - net_->unbind("slave_details"); - net_->unbind("log_subscribe"); -} - -void Slave::sendLog(const loguru::Message& message) { - UNIQUE_LOCK(mutex_, lk); - if (in_log_) return; - in_log_ = true; - - for (auto &p : log_peers_) { - auto peer = net_->getPeer(p); - if (!peer || !peer->isConnected()) continue; - - std::cout << "sending log to master..." << std::endl; - if (!net_->send(p, "log", message.verbosity, message.preamble, message.message)) { - // TODO(Nick) Remove peer from loggers list... - } - } - - in_log_ = false; -} diff --git a/components/net/cpp/src/net_configurable.cpp b/components/net/cpp/src/net_configurable.cpp index cf597c5c77eec05205b6bf0de3f360c7fac178b2..00852c666b1024b4c61b54b8f937ce88d07039e6 100644 --- a/components/net/cpp/src/net_configurable.cpp +++ b/components/net/cpp/src/net_configurable.cpp @@ -12,5 +12,5 @@ void ftl::NetConfigurable::inject(const std::string &name, nlohmann::json &value } void ftl::NetConfigurable::refresh() { - (*config_) = ctrl.get(peer, suri); + (*config_) = ctrl.getConfigurable(peer, suri); } diff --git a/components/net/cpp/test/net_configurable_unit.cpp b/components/net/cpp/test/net_configurable_unit.cpp index c8bf42247ef3f23023b25c159a968deda1fd419b..e3fef144490f64ce06375ad8226056afd2f5329f 100644 --- a/components/net/cpp/test/net_configurable_unit.cpp +++ b/components/net/cpp/test/net_configurable_unit.cpp @@ -1,6 +1,6 @@ #include "catch.hpp" #include <ftl/net_configurable.hpp> -#include <ftl/slave.hpp> +#include <ftl/master.hpp> using ftl::NetConfigurable; @@ -14,20 +14,20 @@ SCENARIO( "NetConfigurable::set()" ) { net->start(); ftl::ctrl::Master *controller = new ftl::ctrl::Master(root, net); - // Set up a slave, then call getSlaves() to get the UUID string + // Set up a slave, then call getControllers() to get the UUID string nlohmann::json jsonSlave = {{"$id", "slave"}, {"test", {{"peers", {"tcp://localhost:7077"}}}}}; ftl::Configurable *rootSlave; rootSlave = new ftl::Configurable(jsonSlave); ftl::net::Universe *netSlave = ftl::config::create<ftl::net::Universe>(rootSlave, std::string("test")); - ftl::ctrl::Slave slave(netSlave, rootSlave); + ftl::ctrl::Master ctrl(rootSlave, netSlave); netSlave->start(); netSlave->waitConnections(); net->waitConnections(); - auto slaves = controller->getSlaves(); - REQUIRE( slaves.size() == 1 ); + auto controllers = controller->getControllers(); + REQUIRE( controllers.size() == 1 ); - ftl::UUID peer = ftl::UUID(slaves[0]["id"].get<std::string>()); + ftl::UUID peer = ftl::UUID(controllers[0]["id"].get<std::string>()); const std::string suri = "slave_test"; nlohmann::json jsonTest = {{"$id", "slave_test"}, {"test", {{"peers", {"tcp://localhost:7077"}}}}}; NetConfigurable nc(peer, suri, *controller, jsonTest); diff --git a/components/rgbd-sources/include/ftl/rgbd/frame.hpp b/components/rgbd-sources/include/ftl/rgbd/frame.hpp index e7a949600e6ba097aeda54460e83a1529851371e..8411c71a626e23216fcedac5df35e0ce49863f3b 100644 --- a/components/rgbd-sources/include/ftl/rgbd/frame.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/frame.hpp @@ -223,7 +223,7 @@ ftl::cuda::TextureObject<T> &Frame::createTexture(ftl::codecs::Channel c, const //LOG(INFO) << "Creating texture object"; m.tex = ftl::cuda::TextureObject<T>(m.gpu, interpolated); } else if (m.tex.cvType() != ftl::traits::OpenCVType<T>::value || m.tex.width() != m.gpu.cols || m.tex.height() != m.gpu.rows) { - LOG(INFO) << "Recreating texture object for '" << ftl::codecs::name(c) << "'"; + //LOG(INFO) << "Recreating texture object for '" << ftl::codecs::name(c) << "'"; m.tex.free(); m.tex = ftl::cuda::TextureObject<T>(m.gpu, interpolated); } @@ -256,7 +256,7 @@ ftl::cuda::TextureObject<T> &Frame::createTexture(ftl::codecs::Channel c, bool i //LOG(INFO) << "Creating texture object"; m.tex = ftl::cuda::TextureObject<T>(m.gpu, interpolated); } else if (m.tex.cvType() != ftl::traits::OpenCVType<T>::value || m.tex.width() != m.gpu.cols || m.tex.height() != m.gpu.rows || m.tex.devicePtr() != m.gpu.data) { - LOG(INFO) << "Recreating texture object for '" << ftl::codecs::name(c) << "'."; + //LOG(INFO) << "Recreating texture object for '" << ftl::codecs::name(c) << "'."; m.tex.free(); m.tex = ftl::cuda::TextureObject<T>(m.gpu, interpolated); } diff --git a/components/rgbd-sources/src/abr.cpp b/components/rgbd-sources/src/abr.cpp index c338d4725ed2ab493fd61143d24b9b7241453622..d387cde26990f5e5acc1d38530375f73733d3789 100644 --- a/components/rgbd-sources/src/abr.cpp +++ b/components/rgbd-sources/src/abr.cpp @@ -41,7 +41,7 @@ bitrate_t ABRController::selectBitrate(const NetFrame &frame) { float actual_mbps = (float(frame.tx_size) * 8.0f * (1000.0f / float(frame.tx_latency))) / 1048576.0f; float min_mbps = (float(frame.tx_size) * 8.0f * (1000.0f / float(ftl::timer::getInterval()))) / 1048576.0f; - //LOG(INFO) << "Bitrate = " << actual_mbps << "Mbps, min required = " << min_mbps << "Mbps"; + //if (actual_mbps < min_mbps) LOG(WARNING) << "Bitrate = " << actual_mbps << "Mbps, min required = " << min_mbps << "Mbps"; float ratio = actual_mbps / min_mbps; //LOG(INFO) << "Rate Ratio = " << frame.tx_latency; diff --git a/components/rgbd-sources/src/group.cpp b/components/rgbd-sources/src/group.cpp index 625d62e2c9767ee6164d2835e832de20994ec983..aad850d2cf9d501d6d655b1f77978de6f1bab39e 100644 --- a/components/rgbd-sources/src/group.cpp +++ b/components/rgbd-sources/src/group.cpp @@ -214,8 +214,8 @@ void Group::sync(std::function<bool(ftl::rgbd::FrameSet &)> cb) { try { cb(*fs); //LOG(INFO) << "Frameset processed (" << name_ << "): " << fs->timestamp; - } catch(...) { - LOG(ERROR) << "Exception in group sync callback"; + } catch(std::exception &e) { + LOG(ERROR) << "Exception in group sync callback: " << e.what(); } // The buffers are invalid after callback so mark stale diff --git a/components/rgbd-sources/src/source.cpp b/components/rgbd-sources/src/source.cpp index 4a7873515a2a77faf44965e94208ec005085caf8..f4f217af128b559c45c0d109de3a670c29393159 100644 --- a/components/rgbd-sources/src/source.cpp +++ b/components/rgbd-sources/src/source.cpp @@ -314,12 +314,15 @@ void Source::inject(const Eigen::Matrix4d &pose) { spkt.channel_count = 0; spkt.channel = Channel::Pose; spkt.streamID = 0; - pkt.codec = ftl::codecs::codec_t::POSE; + pkt.codec = ftl::codecs::codec_t::MSGPACK; pkt.definition = ftl::codecs::definition_t::Any; pkt.block_number = 0; pkt.block_total = 1; pkt.flags = 0; - pkt.data = std::move(std::vector<uint8_t>((uint8_t*)pose.data(), (uint8_t*)pose.data() + 4*4*sizeof(double))); + + std::vector<double> data(pose.data(), pose.data() + 4*4*sizeof(double)); + VectorBuffer buf(pkt.data); + msgpack::pack(buf, data); notifyRaw(spkt, pkt); } diff --git a/components/rgbd-sources/src/sources/ftlfile/file_source.cpp b/components/rgbd-sources/src/sources/ftlfile/file_source.cpp index 0962c1886dc199e50530343c0d01edf4e74e37f0..554a558b3b5cecffc94f59ae8f3f98a65051d72e 100644 --- a/components/rgbd-sources/src/sources/ftlfile/file_source.cpp +++ b/components/rgbd-sources/src/sources/ftlfile/file_source.cpp @@ -93,7 +93,12 @@ void FileSource::_processPose(ftl::codecs::Packet &pkt) { Eigen::Matrix4d p = Eigen::Map<Eigen::Matrix4d>((double*)pkt.data.data()); host_->setPose(p); } else if (pkt.codec == codec_t::MSGPACK) { + auto unpacked = msgpack::unpack((const char*)pkt.data.data(), pkt.data.size()); + std::vector<double> posevec; + unpacked.get().convert(posevec); + Eigen::Matrix4d p(posevec.data()); + host_->setPose(p); } } @@ -184,21 +189,34 @@ bool FileSource::compute(int n, int b) { if (c.spkt.channel == Channel::Colour) { rgb_.create(cv::Size(ftl::codecs::getWidth(c.pkt.definition),ftl::codecs::getHeight(c.pkt.definition)), CV_8UC3); - } else { + _createDecoder(0, c.pkt); + + try { + decoders_[0]->decode(c.pkt, rgb_); + } catch (std::exception &e) { + LOG(INFO) << "Decoder exception: " << e.what(); + } + } else if (host_->getChannel() == c.spkt.channel) { depth_.create(cv::Size(ftl::codecs::getWidth(c.pkt.definition),ftl::codecs::getHeight(c.pkt.definition)), CV_32F); + _createDecoder(1, c.pkt); + try { + decoders_[1]->decode(c.pkt, depth_); + } catch (std::exception &e) { + LOG(INFO) << "Decoder exception: " << e.what(); + } } - _createDecoder((c.spkt.channel == Channel::Colour) ? 0 : 1, c.pkt); + //_createDecoder((c.spkt.channel == Channel::Colour) ? 0 : 1, c.pkt); - try { + /*try { decoders_[(c.spkt.channel == Channel::Colour) ? 0 : 1]->decode(c.pkt, (c.spkt.channel == Channel::Colour) ? rgb_ : depth_); } catch (std::exception &e) { LOG(INFO) << "Decoder exception: " << e.what(); - } + }*/ } // FIXME: Consider case of Channel::None - if (lastc != 2) { + if (lastc < 2) { LOG(ERROR) << "Channels not in sync (" << sourceid_ << "): " << lastts; return false; } diff --git a/components/rgbd-sources/src/sources/net/net.cpp b/components/rgbd-sources/src/sources/net/net.cpp index 694aa50f884210664aa33bc3a4cb39dbb6f9d3b2..c29ad5491b0b0a15a3d8549936bfe14b18762330 100644 --- a/components/rgbd-sources/src/sources/net/net.cpp +++ b/components/rgbd-sources/src/sources/net/net.cpp @@ -8,6 +8,7 @@ #include "colour.hpp" #include <ftl/rgbd/streamer.hpp> +#include <ftl/codecs/bitrates.hpp> using ftl::rgbd::detail::NetFrame; using ftl::rgbd::detail::NetFrameQueue; @@ -21,6 +22,7 @@ using std::this_thread::sleep_for; using std::chrono::milliseconds; using std::tuple; using ftl::codecs::Channel; +using ftl::codecs::codec_t; // ===== NetFrameQueue ========================================================= @@ -250,7 +252,24 @@ void NetSource::_processCalibration(const ftl::codecs::Packet &pkt) { } void NetSource::_processPose(const ftl::codecs::Packet &pkt) { - LOG(INFO) << "Got POSE channel"; + if (pkt.codec == ftl::codecs::codec_t::POSE) { + Eigen::Matrix4d p = Eigen::Map<Eigen::Matrix4d>((double*)pkt.data.data()); + //host_->setPose(p); + } else if (pkt.codec == ftl::codecs::codec_t::MSGPACK) { + auto unpacked = msgpack::unpack((const char*)pkt.data.data(), pkt.data.size()); + std::vector<double> posevec; + unpacked.get().convert(posevec); + + Eigen::Matrix4d p(posevec.data()); + //host_->setPose(p); + // TODO: What to do with pose? + } +} + +void NetSource::_checkDataRate(size_t tx_size, int64_t tx_latency) { + float actual_mbps = (float(tx_size) * 8.0f * (1000.0f / float(tx_latency))) / 1048576.0f; + float min_mbps = (float(tx_size) * 8.0f * (1000.0f / float(ftl::timer::getInterval()))) / 1048576.0f; + if (actual_mbps < min_mbps) LOG(WARNING) << "Bitrate = " << actual_mbps << "Mbps, min required = " << min_mbps << "Mbps"; } void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { @@ -276,10 +295,36 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk LOG(WARNING) << "Missing calibration, skipping frame"; return; } + + //LOG(INFO) << "PACKET: " << spkt.timestamp << ", " << (int)spkt.channel << ", " << (int)pkt.codec; const cv::Size size = cv::Size(ftl::codecs::getWidth(pkt.definition), ftl::codecs::getHeight(pkt.definition)); NetFrame &frame = queue_.getFrame(spkt.timestamp, size, CV_8UC3, (isFloatChannel(chan) ? CV_32FC1 : CV_8UC3)); + if (timestamp_ > 0 && frame.timestamp <= timestamp_) { + LOG(ERROR) << "Duplicate frame - " << frame.timestamp << " received=" << int(rchan) << " uri=" << uri_; + return; + } + + // Calculate how many packets to expect for this channel + if (frame.chunk_total[channum] == 0) { + frame.chunk_total[channum] = pkt.block_total; + } + + // Capture tx time of first received chunk + if (frame.chunk_count[0] == 0 && frame.chunk_count[1] == 0) { + UNIQUE_LOCK(frame.mtx, flk); + if (frame.chunk_count[0] == 0 && frame.chunk_count[1] == 0) { + frame.tx_latency = int64_t(ttimeoff); + } + } + + ++frame.chunk_count[channum]; + if (frame.chunk_count[channum] > frame.chunk_total[channum]) { + LOG(WARNING) << "Too many channel packets received, discarding"; + return; + } + // Update frame statistics frame.tx_size += pkt.data.size(); @@ -308,29 +353,10 @@ void NetSource::_recvPacket(short ttimeoff, const ftl::codecs::StreamPacket &spk //ftl::rgbd::colourCorrection(tmp_rgb, gamma_, temperature_); // TODO:(Nick) Decode directly into double buffer if no scaling + + _checkDataRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff)); - if (timestamp_ > 0 && frame.timestamp <= timestamp_) { - LOG(ERROR) << "BAD DUPLICATE FRAME - " << frame.timestamp << " received=" << int(rchan) << " uri=" << uri_; - return; - } - - // Calculate how many packets to expect for this channel - if (frame.chunk_total[channum] == 0) { - frame.chunk_total[channum] = pkt.block_total; - } - - ++frame.chunk_count[channum]; if (frame.chunk_count[channum] == frame.chunk_total[channum]) ++frame.channel_count; - if (frame.chunk_count[channum] > frame.chunk_total[channum]) LOG(FATAL) << "TOO MANY CHUNKS"; - - // Capture tx time of first received chunk - // FIXME: This seems broken - if (channum == 1 && frame.chunk_count[channum] == 1) { - UNIQUE_LOCK(frame.mtx, flk); - if (frame.chunk_count[channum] == 1) { - frame.tx_latency = int64_t(ttimeoff); - } - } // Last chunk of both channels now received, so we are done. if (frame.channel_count == spkt.channel_count) { diff --git a/components/rgbd-sources/src/sources/net/net.hpp b/components/rgbd-sources/src/sources/net/net.hpp index 5cef2726d2cdc5c34c161a74b25d45234f55ce48..515bb8a5ff7ee5d788530d9ad00495f7f880b83d 100644 --- a/components/rgbd-sources/src/sources/net/net.hpp +++ b/components/rgbd-sources/src/sources/net/net.hpp @@ -89,6 +89,7 @@ class NetSource : public detail::Source { void _processCalibration(const ftl::codecs::Packet &pkt); void _processConfig(const ftl::codecs::Packet &pkt); void _processPose(const ftl::codecs::Packet &pkt); + void _checkDataRate(size_t tx_size, int64_t tx_latency); }; } diff --git a/components/rgbd-sources/src/sources/snapshot/snapshot_source.cpp b/components/rgbd-sources/src/sources/snapshot/snapshot_source.cpp index 136a2e7dfcc7b279228cdd5c6efc0bfb8d303baa..61acdb9d814560c72fcd15e7f05cdfaf8fda66cd 100644 --- a/components/rgbd-sources/src/sources/snapshot/snapshot_source.cpp +++ b/components/rgbd-sources/src/sources/snapshot/snapshot_source.cpp @@ -53,6 +53,8 @@ SnapshotSource::SnapshotSource(ftl::rgbd::Source *host, Snapshot &snapshot, cons host->setPose(pose); mspf_ = 1000 / host_->value("fps", 20); + + cudaStreamCreate(&stream_); } bool SnapshotSource::compute(int n, int b) { @@ -61,11 +63,14 @@ bool SnapshotSource::compute(int n, int b) { //snap_rgb_.copyTo(rgb_); //snap_depth_.copyTo(depth_); - rgb_.upload(snap_rgb_); - depth_.upload(snap_depth_); - - auto cb = host_->callback(); - if (cb) cb(timestamp_, rgb_, depth_); + cv::cuda::Stream cvstream = cv::cuda::StreamAccessor::wrapStream(stream_); + rgb_.upload(snap_rgb_, cvstream); + depth_.upload(snap_depth_, cvstream); + cudaStreamSynchronize(stream_); + + //auto cb = host_->callback(); + //if (cb) cb(timestamp_, rgb_, depth_); + host_->notify(timestamp_, rgb_, depth_); frame_idx_ = (frame_idx_ + 1) % snapshot_.getFramesCount(); diff --git a/components/rgbd-sources/src/sources/snapshot/snapshot_source.hpp b/components/rgbd-sources/src/sources/snapshot/snapshot_source.hpp index de1b0df48be79df732f51144226f5c7e6d2f0478..80a0bf392b39fb9d5215dd80034768d806ac7957 100644 --- a/components/rgbd-sources/src/sources/snapshot/snapshot_source.hpp +++ b/components/rgbd-sources/src/sources/snapshot/snapshot_source.hpp @@ -32,6 +32,7 @@ class SnapshotSource : public detail::Source { cv::Mat snap_rgb_; cv::Mat snap_depth_; int mspf_; + cudaStream_t stream_; }; } diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index a4b02f34554898ced490c22f5f6eb5f365080c41..a672c1bd136e03ed52f99dbd38237b52efb67472 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -343,7 +343,7 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID // Finally, inject calibration and config data s->src->inject(Channel::Calibration, s->src->parameters(Channel::Left), Channel::Left, s->src->getCapabilities()); s->src->inject(Channel::Calibration, s->src->parameters(Channel::Right), Channel::Right, s->src->getCapabilities()); - //s->src->inject(s->src->getPose()); + s->src->inject(s->src->getPose()); //if (!(*s->src->get<nlohmann::json>("meta")).is_null()) { s->src->inject(Channel::Configuration, "/original", s->src->getConfig().dump()); //} @@ -464,20 +464,32 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) { auto *enc1 = src->hq_encoder_c1; auto *enc2 = src->hq_encoder_c2; - // Important to send channel 2 first if needed... - // Receiver only waits for channel 1 by default - // TODO: Each encode could be done in own thread - if (hasChan2) { - // TODO: Stagger the reset between nodes... random phasing - if (fs.timestamp % (10*ftl::timer::getInterval()) == 0) enc2->reset(); + MUTEX mtx; + std::condition_variable cv; + bool chan2done = false; - auto chan = fs.sources[j]->getChannel(); - - enc2->encode(fs.frames[j].get<cv::cuda::GpuMat>(chan), src->hq_bitrate, [this,src,hasChan2,chan](const ftl::codecs::Packet &blk){ - _transmitPacket(src, blk, chan, hasChan2, Quality::High); + if (hasChan2) { + ftl::pool.push([this,&fs,enc2,src,hasChan2,&cv,j,&chan2done](int id) { + // TODO: Stagger the reset between nodes... random phasing + if (fs.timestamp % (10*ftl::timer::getInterval()) == 0) enc2->reset(); + + auto chan = fs.sources[j]->getChannel(); + + try { + enc2->encode(fs.frames[j].get<cv::cuda::GpuMat>(chan), src->hq_bitrate, [this,src,hasChan2,chan,&cv,&chan2done](const ftl::codecs::Packet &blk){ + _transmitPacket(src, blk, chan, hasChan2, Quality::High); + chan2done = true; + cv.notify_one(); + }); + } catch (std::exception &e) { + LOG(ERROR) << "Exception in encoder: " << e.what(); + chan2done = true; + cv.notify_one(); + } }); } else { if (enc2) enc2->reset(); + chan2done = true; } // TODO: Stagger the reset between nodes... random phasing @@ -485,6 +497,10 @@ void Streamer::_process(ftl::rgbd::FrameSet &fs) { enc1->encode(fs.frames[j].get<cv::cuda::GpuMat>(Channel::Colour), src->hq_bitrate, [this,src,hasChan2](const ftl::codecs::Packet &blk){ _transmitPacket(src, blk, Channel::Colour, hasChan2, Quality::High); }); + + // Ensure both channels have been completed. + std::unique_lock<std::mutex> lk(mtx); + cv.wait(lk, [&chan2done]{ return chan2done; }); } }