diff --git a/applications/gui/src/ctrl_window.cpp b/applications/gui/src/ctrl_window.cpp index fc82c45838a568efc8b4e8d456cdb3fbbef2c8b4..3681022bd10c832ea2d722b1eb0bd02877b1af4a 100644 --- a/applications/gui/src/ctrl_window.cpp +++ b/applications/gui/src/ctrl_window.cpp @@ -121,7 +121,7 @@ void ControlWindow::_addNode() { } void ControlWindow::_updateDetails() { - node_details_ = ctrl_->getSlaves(); + node_details_ = ctrl_->getControllers(); node_titles_.clear(); for (auto &d : node_details_) { diff --git a/applications/gui/src/main.cpp b/applications/gui/src/main.cpp index 5ebc552fa0ee0664c7a0b7898a723b90bf87f29c..99af829eb646f20f1e0f680837e8855739fa6be1 100644 --- a/applications/gui/src/main.cpp +++ b/applications/gui/src/main.cpp @@ -23,32 +23,6 @@ int main(int argc, char **argv) { } }); - std::map<ftl::UUID, std::vector<ftl::NetConfigurable*>> peerConfigurables; - - // FIXME: Move this elsewhere, it is not just for GUI - net->onConnect([&controller, &peerConfigurables](ftl::net::Peer *p) { - ftl::UUID peer = p->id(); - auto cs = controller->getConfigurables(peer); - for (auto c : cs) { - //LOG(INFO) << "NET CONFIG: " << c; - ftl::config::json_t *configuration = new ftl::config::json_t; - *configuration = controller->get(peer, c); - if (!configuration->empty()) { - ftl::NetConfigurable *nc = new ftl::NetConfigurable(peer, c, *controller, *configuration); - peerConfigurables[peer].push_back(nc); - } - } - }); - - net->onDisconnect([&peerConfigurables](ftl::net::Peer *p) { - ftl::UUID peer = p->id(); - for (ftl::NetConfigurable *nc : peerConfigurables[peer]) { - ftl::config::json_t *configuration = &(nc->getConfig()); - delete nc; - delete configuration; - } - }); - net->start(); net->waitConnections(); diff --git a/applications/gui/src/screen.cpp b/applications/gui/src/screen.cpp index d0d9944214343699bd04230b07da47799f1f415f..77c4e4318e61e2916fb47ba1f4f7a5ec08760e3b 100644 --- a/applications/gui/src/screen.cpp +++ b/applications/gui/src/screen.cpp @@ -244,7 +244,7 @@ ftl::gui::Screen::Screen(ftl::Configurable *proot, ftl::net::Universe *pnet, ftl //net_->onConnect([this,popup](ftl::net::Peer *p) { { LOG(INFO) << "NET CONNECT"; - auto node_details = ctrl_->getSlaves(); + auto node_details = ctrl_->getControllers(); for (auto &d : node_details) { LOG(INFO) << "ADDING TITLE: " << d.dump(); diff --git a/applications/reconstruct/src/main.cpp b/applications/reconstruct/src/main.cpp index dac32881b73df64b9c02d8803c4b6eba6e763871..9cb5c0819d144c66cc49f125adbe64dc360deed5 100644 --- a/applications/reconstruct/src/main.cpp +++ b/applications/reconstruct/src/main.cpp @@ -12,7 +12,7 @@ #include <ftl/rgbd.hpp> #include <ftl/rgbd/virtual.hpp> #include <ftl/rgbd/streamer.hpp> -#include <ftl/slave.hpp> +#include <ftl/master.hpp> #include <ftl/rgbd/group.hpp> #include <ftl/threads.hpp> #include <ftl/codecs/writer.hpp> @@ -82,58 +82,9 @@ static Eigen::Affine3d create_rotation_matrix(float ax, float ay, float az) { return rz * rx * ry; } -// TODO: * Remove this class (requires more general solution). Also does -// not process disconnections/reconnections/types etc. correctly. -// * Update when new options become available. - -class ConfigProxy { - private: - vector<ftl::UUID> peers_; - vector<std::string> uris_; - ftl::net::Universe *net_; - - public: - ConfigProxy(ftl::net::Universe *net) { - net_ = net; - - auto response = net_->findAll<std::string>("node_details"); - for (auto &r : response) { - auto r_json = json_t::parse(r); - peers_.push_back(ftl::UUID(r_json["id"].get<std::string>())); - uris_.push_back(r_json["title"].get<std::string>()); - } - } - - void add(ftl::Configurable *root, const std::string &uri, const std::string &name) { - auto config = json_t::parse(net_->call<string>(peers_[0], "get_cfg", uris_[0] + "/" + uri)); - auto *proxy = ftl::create<ftl::Configurable>(root, name); - - try { - for (auto &itm : config.get<json::object_t>()) { - auto key = itm.first; - auto value = itm.second; - if (*key.begin() == '$') { continue; } - - proxy->set(key, value); - proxy->on(key, [this, uri, key, value, proxy](const ftl::config::Event&) { - for (size_t i = 0; i < uris_.size(); i++) { - // TODO: check that config exists? - auto peer = peers_[i]; - std::string name = uris_[i] + "/" + uri + "/" + key; - net_->send(peer, "update_cfg", name, proxy->getConfig()[key].dump()); - } - }); - } - } - catch (nlohmann::detail::type_error) { - LOG(ERROR) << "Failed to add config proxy for: " << uri << "/" << name; - } - } -}; - static void run(ftl::Configurable *root) { Universe *net = ftl::create<Universe>(root, "net"); - ftl::ctrl::Slave slave(net, root); + ftl::ctrl::Master ctrl(root, net); // Controls auto *controls = ftl::create<ftl::Configurable>(root, "controls"); @@ -192,17 +143,6 @@ static void run(ftl::Configurable *root) { return; } - ConfigProxy *configproxy = nullptr; - if (net->numberOfPeers() > 0) { - configproxy = new ConfigProxy(net); // TODO delete - auto *disparity = ftl::create<ftl::Configurable>(root, "disparity"); - configproxy->add(disparity, "source/disparity/algorithm", "algorithm"); - configproxy->add(disparity, "source/disparity/bilateral_filter", "bilateral_filter"); - configproxy->add(disparity, "source/disparity/optflow_filter", "optflow_filter"); - configproxy->add(disparity, "source/disparity/mls", "mls"); - configproxy->add(disparity, "source/disparity/cross", "cross"); - } - // Must find pose for each source... if (sources.size() > 1) { std::map<std::string, Eigen::Matrix4d> transformations; @@ -383,7 +323,7 @@ static void run(ftl::Configurable *root) { LOG(INFO) << "Shutting down..."; ftl::timer::stop(); - slave.stop(); + ctrl.stop(); net->shutdown(); ftl::pool.stop(); diff --git a/applications/recorder/src/main.cpp b/applications/recorder/src/main.cpp index 58966e176846367f232b9d214e0e5bd68064dede..d2001114f8f98c4bf35630c9676651bf6e91fae3 100644 --- a/applications/recorder/src/main.cpp +++ b/applications/recorder/src/main.cpp @@ -11,7 +11,7 @@ #include <ftl/rgbd.hpp> #include <ftl/rgbd/virtual.hpp> #include <ftl/rgbd/streamer.hpp> -#include <ftl/slave.hpp> +#include <ftl/master.hpp> #include <ftl/rgbd/group.hpp> #include <ftl/threads.hpp> #include <ftl/codecs/writer.hpp> @@ -64,7 +64,7 @@ static Eigen::Affine3d create_rotation_matrix(float ax, float ay, float az) { static void run(ftl::Configurable *root) { Universe *net = ftl::create<Universe>(root, "net"); - ftl::ctrl::Slave slave(net, root); + ftl::ctrl::Master ctrl(root, net); // Controls auto *controls = ftl::create<ftl::Configurable>(root, "controls"); @@ -202,7 +202,7 @@ static void run(ftl::Configurable *root) { LOG(INFO) << "Shutting down..."; ftl::timer::stop(); - slave.stop(); + ctrl.stop(); net->shutdown(); ftl::pool.stop(); diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp index f99754cbf558bb3d0f53fde37763855fac7af128..4f245a07705331324e3a2730411bec992f404dce 100644 --- a/applications/vision/src/main.cpp +++ b/applications/vision/src/main.cpp @@ -22,7 +22,7 @@ //#include <ftl/display.hpp> #include <ftl/rgbd/streamer.hpp> #include <ftl/net/universe.hpp> -#include <ftl/slave.hpp> +#include <ftl/master.hpp> #include <nlohmann/json.hpp> #include "opencv2/imgproc.hpp" @@ -50,7 +50,7 @@ using json = nlohmann::json; static void run(ftl::Configurable *root) { Universe *net = ftl::create<Universe>(root, "net"); - ftl::ctrl::Slave slave(net, root); + ftl::ctrl::Master ctrl(root, net); auto paths = root->get<vector<string>>("paths"); string file = ""; @@ -84,7 +84,7 @@ static void run(ftl::Configurable *root) { ftl::timer::start(true); LOG(INFO) << "Stopping..."; - slave.stop(); + ctrl.stop(); stream->stop(); net->shutdown(); diff --git a/components/control/cpp/CMakeLists.txt b/components/control/cpp/CMakeLists.txt index f55ec9c19f0e6c30c3eafbaf203ee6a2e49b4ceb..3f6d4a29932ccd0cdadca61227de914d55d0f4b0 100644 --- a/components/control/cpp/CMakeLists.txt +++ b/components/control/cpp/CMakeLists.txt @@ -1,5 +1,4 @@ add_library(ftlctrl - src/slave.cpp src/master.cpp ) diff --git a/components/control/cpp/include/ftl/master.hpp b/components/control/cpp/include/ftl/master.hpp index 9b658aaadbedccdd86c452318ef1832230836ef7..c4782d16ea09928df9d8d3ca5273d84f5a86cf1c 100644 --- a/components/control/cpp/include/ftl/master.hpp +++ b/components/control/cpp/include/ftl/master.hpp @@ -10,8 +10,15 @@ #include <Eigen/Eigen> namespace ftl { + +class NetConfigurable; + namespace ctrl { +struct SystemState { + bool paused; +}; + struct LogEvent { int verbosity; std::string preamble; @@ -43,7 +50,7 @@ class Master { std::vector<std::string> getConfigurables(const ftl::UUID &peer); - std::vector<ftl::config::json_t> getSlaves(); + std::vector<ftl::config::json_t> getControllers(); std::vector<ftl::config::json_t> get(const std::string &uri); @@ -51,12 +58,26 @@ class Master { ftl::config::json_t get(const ftl::UUID &peer, const std::string &uri); + ftl::config::json_t getConfigurable(const ftl::UUID &peer, const std::string &uri); + void watch(const std::string &uri, std::function<void()> f); Eigen::Matrix4d getPose(const std::string &uri); void setPose(const std::string &uri, const Eigen::Matrix4d &pose); + /** + * Clean up to remove log and status forwarding over the network. + */ + void stop(); + + /** + * Do not call! Automatically called from logging subsystem. + */ + void sendLog(const loguru::Message& message); + + bool isPaused() const { return state_.paused; } + // Events //void onError(); @@ -72,6 +93,12 @@ class Master { std::vector<std::function<void(const LogEvent&)>> log_handlers_; ftl::Configurable *root_; ftl::net::Universe *net_; + std::map<ftl::UUID, std::vector<ftl::NetConfigurable*>> peerConfigurables_; + std::vector<ftl::UUID> log_peers_; + RECURSIVE_MUTEX mutex_; + bool in_log_; + bool active_; + SystemState state_; }; } diff --git a/components/control/cpp/include/ftl/slave.hpp b/components/control/cpp/include/ftl/slave.hpp deleted file mode 100644 index e3ebd69e46fa5177eebbcc2d8d6b2f2cce70f204..0000000000000000000000000000000000000000 --- a/components/control/cpp/include/ftl/slave.hpp +++ /dev/null @@ -1,50 +0,0 @@ -#ifndef _FTL_CTRL_SLAVE_HPP_ -#define _FTL_CTRL_SLAVE_HPP_ - -#include <ftl/net/universe.hpp> -#include <ftl/configurable.hpp> -#include <loguru.hpp> -#include <ftl/threads.hpp> - -namespace ftl { -namespace ctrl { - -struct SystemState { - bool paused; -}; - -/** - * Allows a node to be remote controlled and observed over the network. All - * such nodes should create a single instance of this class, but must call - * "stop()" before terminating the network. - */ -class Slave { - public: - Slave(ftl::net::Universe *, ftl::Configurable *); - ~Slave(); - - /** - * Clean up to remove log and status forwarding over the network. - */ - void stop(); - - /** - * Do not call! Automatically called from logging subsystem. - */ - void sendLog(const loguru::Message& message); - - bool isPaused() const { return state_.paused; } - - private: - std::vector<ftl::UUID> log_peers_; - ftl::net::Universe *net_; - RECURSIVE_MUTEX mutex_; - bool in_log_; - bool active_; - SystemState state_; -}; - -} -} - -#endif // _FTL_CTRL_SLAVE_HPP_ diff --git a/components/control/cpp/src/master.cpp b/components/control/cpp/src/master.cpp index a13a67a90b4bc7ccc1bd2c893580fc286dfb330a..059dfe9144a91d0a7d706de811ce353f0050a2ac 100644 --- a/components/control/cpp/src/master.cpp +++ b/components/control/cpp/src/master.cpp @@ -1,4 +1,5 @@ #include <ftl/master.hpp> +#include <ftl/net_configurable.hpp> using ftl::ctrl::Master; using ftl::net::Universe; @@ -11,6 +12,51 @@ using ftl::ctrl::LogEvent; Master::Master(Configurable *root, Universe *net) : root_(root), net_(net) { + // Init system state + state_.paused = false; + + net->bind("restart", []() { + LOG(WARNING) << "Remote restart..."; + //exit(1); + ftl::exit_code = 1; + ftl::running = false; + }); + + net->bind("shutdown", []() { + LOG(WARNING) << "Remote shutdown..."; + //exit(0); + ftl::running = false; + }); + + net->bind("pause", [this]() { + state_.paused = !state_.paused; + }); + + net->bind("update_cfg", [](const std::string &uri, const std::string &value) { + ftl::config::update(uri, nlohmann::json::parse(value)); + }); + + net->bind("get_cfg", [](const std::string &uri) -> std::string { + return ftl::config::resolve(uri, false).dump(); + }); + + net->bind("get_configurable", [](const std::string &uri) -> std::string { + return ftl::config::find(uri)->getConfig().dump(); + }); + + net->bind("list_configurables", []() { + return ftl::config::list(); + }); + + net->bind("log_subscribe", [this](const ftl::UUID &peer) { + UNIQUE_LOCK(mutex_, lk); + log_peers_.push_back(peer); + }); + + net->bind("connect", [this](const std::string &url) { + net_->connect(url); + }); + net->bind("log", [this](int v, const std::string &pre, const std::string &msg) { for (auto f : log_handlers_) { f({v,pre,msg}); @@ -20,16 +66,35 @@ Master::Master(Configurable *root, Universe *net) net->bind("node_details", [net,root]() -> std::vector<std::string> { ftl::config::json_t json { {"id", net->id().to_string()}, - {"title", root->value("title", *root->get<string>("$id"))}, - {"kind", "master"} + {"title", root->value("title", *root->get<string>("$id"))} }; return {json.dump()}; }); //net->broadcast("log_subscribe", net->id()); - net->onConnect([this](ftl::net::Peer*) { + net->onConnect([this](ftl::net::Peer *p) { //net_->broadcast("log_subscribe", net_->id()); + ftl::UUID peer = p->id(); + auto cs = getConfigurables(peer); + for (auto c : cs) { + //LOG(INFO) << "NET CONFIG: " << c; + ftl::config::json_t *configuration = new ftl::config::json_t; + *configuration = getConfigurable(peer, c); + if (!configuration->empty()) { + ftl::NetConfigurable *nc = new ftl::NetConfigurable(peer, c, *this, *configuration); + peerConfigurables_[peer].push_back(nc); + } + } + }); + + net->onDisconnect([this](ftl::net::Peer *p) { + ftl::UUID peer = p->id(); + for (ftl::NetConfigurable *nc : peerConfigurables_[peer]) { + ftl::config::json_t *configuration = &(nc->getConfig()); + delete nc; + delete configuration; + } }); } @@ -70,7 +135,7 @@ void Master::set(const ftl::UUID &peer, const string &uri, const json_t &value) net_->send(peer, "update_cfg", uri, value.dump()); } -vector<json_t> Master::getSlaves() { +vector<json_t> Master::getControllers() { auto response = net_->findAll<string>("node_details"); vector<json_t> result; for (auto &r : response) { @@ -105,6 +170,10 @@ json_t Master::get(const ftl::UUID &peer, const string &uri) { return json_t::parse(net_->call<string>(peer, "get_cfg", uri)); } +json_t Master::getConfigurable(const ftl::UUID &peer, const string &uri) { + return json_t::parse(net_->call<string>(peer, "get_configurable", uri)); +} + void Master::watch(const string &uri, function<void()> f) { } @@ -133,4 +202,34 @@ void Master::setPose(const std::string &uri, const Eigen::Matrix4d &pose) { //void onError(); void Master::onLog(function<void(const LogEvent &)> h) { log_handlers_.push_back(h); +} + +void Master::stop() { + if (!active_) return; + active_ = false; + loguru::remove_all_callbacks(); + net_->unbind("restart"); + net_->unbind("shutdown"); + net_->unbind("update_cfg"); + net_->unbind("get_cfg"); + net_->unbind("slave_details"); // TODO: Remove + net_->unbind("log_subscribe"); +} + +void Master::sendLog(const loguru::Message& message) { + UNIQUE_LOCK(mutex_, lk); + if (in_log_) return; + in_log_ = true; + + for (auto &p : log_peers_) { + auto peer = net_->getPeer(p); + if (!peer || !peer->isConnected()) continue; + + std::cout << "sending log to master..." << std::endl; + if (!net_->send(p, "log", message.verbosity, message.preamble, message.message)) { + // TODO(Nick) Remove peer from loggers list... + } + } + + in_log_ = false; } \ No newline at end of file diff --git a/components/control/cpp/src/slave.cpp b/components/control/cpp/src/slave.cpp deleted file mode 100644 index cd2d0f1ff03fa3aa88d94420a6811aac63c9c7b9..0000000000000000000000000000000000000000 --- a/components/control/cpp/src/slave.cpp +++ /dev/null @@ -1,106 +0,0 @@ -#include <ftl/slave.hpp> - -#include <ftl/threads.hpp> - -using ftl::Configurable; -using ftl::net::Universe; -using ftl::ctrl::Slave; -using std::string; - -// static void netLog(void* user_data, const loguru::Message& message) { -// Slave *slave = static_cast<Slave*>(user_data); -// slave->sendLog(message); -// } - -Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net), in_log_(false), active_(true) { - - // Init system state - state_.paused = false; - - net->bind("restart", []() { - LOG(WARNING) << "Remote restart..."; - //exit(1); - ftl::exit_code = 1; - ftl::running = false; - }); - - net->bind("shutdown", []() { - LOG(WARNING) << "Remote shutdown..."; - //exit(0); - ftl::running = false; - }); - - net->bind("pause", [this]() { - state_.paused = !state_.paused; - }); - - net->bind("update_cfg", [](const std::string &uri, const std::string &value) { - ftl::config::update(uri, nlohmann::json::parse(value)); - }); - - net->bind("get_cfg", [](const std::string &uri) -> std::string { - return ftl::config::resolve(uri, false).dump(); - }); - - net->bind("list_configurables", []() { - return ftl::config::list(); - }); - - net->bind("node_details", [net,root]() -> std::vector<std::string> { - ftl::config::json_t json { - {"id", net->id().to_string()}, - {"title", root->value("title", *root->get<string>("$id"))}, - {"kind", "slave"} - }; - return {json.dump()}; - }); - - net->bind("log_subscribe", [this](const ftl::UUID &peer) { - UNIQUE_LOCK(mutex_, lk); - log_peers_.push_back(peer); - }); - - net->bind("connect", [this](const std::string &url) { - net_->connect(url); - }); - - //net->onConnect([this](ftl::net::Peer *peer) { - // net_->broadcast("new_peer", peer->id()); - //}); - - //loguru::add_callback("net_log", netLog, this, loguru::Verbosity_INFO); -} - -Slave::~Slave() { - stop(); -} - -void Slave::stop() { - if (!active_) return; - active_ = false; - loguru::remove_all_callbacks(); - net_->unbind("restart"); - net_->unbind("shutdown"); - net_->unbind("update_cfg"); - net_->unbind("get_cfg"); - net_->unbind("slave_details"); - net_->unbind("log_subscribe"); -} - -void Slave::sendLog(const loguru::Message& message) { - UNIQUE_LOCK(mutex_, lk); - if (in_log_) return; - in_log_ = true; - - for (auto &p : log_peers_) { - auto peer = net_->getPeer(p); - if (!peer || !peer->isConnected()) continue; - - std::cout << "sending log to master..." << std::endl; - if (!net_->send(p, "log", message.verbosity, message.preamble, message.message)) { - // TODO(Nick) Remove peer from loggers list... - } - } - - in_log_ = false; -} diff --git a/components/net/cpp/src/net_configurable.cpp b/components/net/cpp/src/net_configurable.cpp index cf597c5c77eec05205b6bf0de3f360c7fac178b2..00852c666b1024b4c61b54b8f937ce88d07039e6 100644 --- a/components/net/cpp/src/net_configurable.cpp +++ b/components/net/cpp/src/net_configurable.cpp @@ -12,5 +12,5 @@ void ftl::NetConfigurable::inject(const std::string &name, nlohmann::json &value } void ftl::NetConfigurable::refresh() { - (*config_) = ctrl.get(peer, suri); + (*config_) = ctrl.getConfigurable(peer, suri); } diff --git a/components/net/cpp/test/net_configurable_unit.cpp b/components/net/cpp/test/net_configurable_unit.cpp index c8bf42247ef3f23023b25c159a968deda1fd419b..e3fef144490f64ce06375ad8226056afd2f5329f 100644 --- a/components/net/cpp/test/net_configurable_unit.cpp +++ b/components/net/cpp/test/net_configurable_unit.cpp @@ -1,6 +1,6 @@ #include "catch.hpp" #include <ftl/net_configurable.hpp> -#include <ftl/slave.hpp> +#include <ftl/master.hpp> using ftl::NetConfigurable; @@ -14,20 +14,20 @@ SCENARIO( "NetConfigurable::set()" ) { net->start(); ftl::ctrl::Master *controller = new ftl::ctrl::Master(root, net); - // Set up a slave, then call getSlaves() to get the UUID string + // Set up a slave, then call getControllers() to get the UUID string nlohmann::json jsonSlave = {{"$id", "slave"}, {"test", {{"peers", {"tcp://localhost:7077"}}}}}; ftl::Configurable *rootSlave; rootSlave = new ftl::Configurable(jsonSlave); ftl::net::Universe *netSlave = ftl::config::create<ftl::net::Universe>(rootSlave, std::string("test")); - ftl::ctrl::Slave slave(netSlave, rootSlave); + ftl::ctrl::Master ctrl(rootSlave, netSlave); netSlave->start(); netSlave->waitConnections(); net->waitConnections(); - auto slaves = controller->getSlaves(); - REQUIRE( slaves.size() == 1 ); + auto controllers = controller->getControllers(); + REQUIRE( controllers.size() == 1 ); - ftl::UUID peer = ftl::UUID(slaves[0]["id"].get<std::string>()); + ftl::UUID peer = ftl::UUID(controllers[0]["id"].get<std::string>()); const std::string suri = "slave_test"; nlohmann::json jsonTest = {{"$id", "slave_test"}, {"test", {{"peers", {"tcp://localhost:7077"}}}}}; NetConfigurable nc(peer, suri, *controller, jsonTest);