Skip to content
Snippets Groups Projects
Commit c31ae9bc authored by Iiro Rastas's avatar Iiro Rastas
Browse files

Combine Master and Slave and remove ConfigProxy

The functionality of Master and Slave is now in a single class, which is
currently named Master. NetConfigurables are added and removed in this
class instead of the GUI, so that all applications can now access
NetConfigurables. Due to this change, ConfigProxy has been removed from
reconstruction.
parent 7bd2292a
No related branches found
No related tags found
1 merge request!203Resolves #262 and resolves #126
Pipeline #17229 passed
...@@ -23,32 +23,6 @@ int main(int argc, char **argv) { ...@@ -23,32 +23,6 @@ int main(int argc, char **argv) {
} }
}); });
std::map<ftl::UUID, std::vector<ftl::NetConfigurable*>> peerConfigurables;
// FIXME: Move this elsewhere, it is not just for GUI
net->onConnect([&controller, &peerConfigurables](ftl::net::Peer *p) {
ftl::UUID peer = p->id();
auto cs = controller->getConfigurables(peer);
for (auto c : cs) {
//LOG(INFO) << "NET CONFIG: " << c;
ftl::config::json_t *configuration = new ftl::config::json_t;
*configuration = controller->get(peer, c);
if (!configuration->empty()) {
ftl::NetConfigurable *nc = new ftl::NetConfigurable(peer, c, *controller, *configuration);
peerConfigurables[peer].push_back(nc);
}
}
});
net->onDisconnect([&peerConfigurables](ftl::net::Peer *p) {
ftl::UUID peer = p->id();
for (ftl::NetConfigurable *nc : peerConfigurables[peer]) {
ftl::config::json_t *configuration = &(nc->getConfig());
delete nc;
delete configuration;
}
});
net->start(); net->start();
net->waitConnections(); net->waitConnections();
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
#include <ftl/rgbd.hpp> #include <ftl/rgbd.hpp>
#include <ftl/rgbd/virtual.hpp> #include <ftl/rgbd/virtual.hpp>
#include <ftl/rgbd/streamer.hpp> #include <ftl/rgbd/streamer.hpp>
#include <ftl/slave.hpp> #include <ftl/master.hpp>
#include <ftl/rgbd/group.hpp> #include <ftl/rgbd/group.hpp>
#include <ftl/threads.hpp> #include <ftl/threads.hpp>
#include <ftl/codecs/writer.hpp> #include <ftl/codecs/writer.hpp>
...@@ -82,58 +82,9 @@ static Eigen::Affine3d create_rotation_matrix(float ax, float ay, float az) { ...@@ -82,58 +82,9 @@ static Eigen::Affine3d create_rotation_matrix(float ax, float ay, float az) {
return rz * rx * ry; return rz * rx * ry;
} }
// TODO: * Remove this class (requires more general solution). Also does not
// process disconnections/reconnections/types etc. correctly.
// * Update when new options become available.
class ConfigProxy {
private:
vector<ftl::UUID> peers_;
vector<std::string> uris_;
ftl::net::Universe *net_;
public:
ConfigProxy(ftl::net::Universe *net) {
net_ = net;
auto response = net_->findAll<std::string>("node_details");
for (auto &r : response) {
auto r_json = json_t::parse(r);
peers_.push_back(ftl::UUID(r_json["id"].get<std::string>()));
uris_.push_back(r_json["title"].get<std::string>());
}
}
void add(ftl::Configurable *root, const std::string &uri, const std::string &name) {
auto config = json_t::parse(net_->call<string>(peers_[0], "get_cfg", uris_[0] + "/" + uri));
auto *proxy = ftl::create<ftl::Configurable>(root, name);
try {
for (auto &itm : config.get<json::object_t>()) {
auto key = itm.first;
auto value = itm.second;
if (*key.begin() == '$') { continue; }
proxy->set(key, value);
proxy->on(key, [this, uri, key, value, proxy](const ftl::config::Event&) {
for (size_t i = 0; i < uris_.size(); i++) {
// TODO: check that config exists?
auto peer = peers_[i];
std::string name = uris_[i] + "/" + uri + "/" + key;
net_->send(peer, "update_cfg", name, proxy->getConfig()[key].dump());
}
});
}
}
catch (nlohmann::detail::type_error) {
LOG(ERROR) << "Failed to add config proxy for: " << uri << "/" << name;
}
}
};
static void run(ftl::Configurable *root) { static void run(ftl::Configurable *root) {
Universe *net = ftl::create<Universe>(root, "net"); Universe *net = ftl::create<Universe>(root, "net");
ftl::ctrl::Slave slave(net, root); ftl::ctrl::Master ctrl(root, net);
// Controls // Controls
auto *controls = ftl::create<ftl::Configurable>(root, "controls"); auto *controls = ftl::create<ftl::Configurable>(root, "controls");
...@@ -192,17 +143,6 @@ static void run(ftl::Configurable *root) { ...@@ -192,17 +143,6 @@ static void run(ftl::Configurable *root) {
return; return;
} }
ConfigProxy *configproxy = nullptr;
if (net->numberOfPeers() > 0) {
configproxy = new ConfigProxy(net); // TODO delete
auto *disparity = ftl::create<ftl::Configurable>(root, "disparity");
configproxy->add(disparity, "source/disparity/algorithm", "algorithm");
configproxy->add(disparity, "source/disparity/bilateral_filter", "bilateral_filter");
configproxy->add(disparity, "source/disparity/optflow_filter", "optflow_filter");
configproxy->add(disparity, "source/disparity/mls", "mls");
configproxy->add(disparity, "source/disparity/cross", "cross");
}
// Must find pose for each source... // Must find pose for each source...
if (sources.size() > 1) { if (sources.size() > 1) {
std::map<std::string, Eigen::Matrix4d> transformations; std::map<std::string, Eigen::Matrix4d> transformations;
...@@ -382,7 +322,7 @@ static void run(ftl::Configurable *root) { ...@@ -382,7 +322,7 @@ static void run(ftl::Configurable *root) {
LOG(INFO) << "Shutting down..."; LOG(INFO) << "Shutting down...";
ftl::timer::stop(); ftl::timer::stop();
slave.stop(); ctrl.stop();
net->shutdown(); net->shutdown();
ftl::pool.stop(); ftl::pool.stop();
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
#include <ftl/rgbd.hpp> #include <ftl/rgbd.hpp>
#include <ftl/rgbd/virtual.hpp> #include <ftl/rgbd/virtual.hpp>
#include <ftl/rgbd/streamer.hpp> #include <ftl/rgbd/streamer.hpp>
#include <ftl/slave.hpp> #include <ftl/master.hpp>
#include <ftl/rgbd/group.hpp> #include <ftl/rgbd/group.hpp>
#include <ftl/threads.hpp> #include <ftl/threads.hpp>
#include <ftl/codecs/writer.hpp> #include <ftl/codecs/writer.hpp>
...@@ -64,7 +64,7 @@ static Eigen::Affine3d create_rotation_matrix(float ax, float ay, float az) { ...@@ -64,7 +64,7 @@ static Eigen::Affine3d create_rotation_matrix(float ax, float ay, float az) {
static void run(ftl::Configurable *root) { static void run(ftl::Configurable *root) {
Universe *net = ftl::create<Universe>(root, "net"); Universe *net = ftl::create<Universe>(root, "net");
ftl::ctrl::Slave slave(net, root); ftl::ctrl::Master ctrl(root, net);
// Controls // Controls
auto *controls = ftl::create<ftl::Configurable>(root, "controls"); auto *controls = ftl::create<ftl::Configurable>(root, "controls");
...@@ -202,7 +202,7 @@ static void run(ftl::Configurable *root) { ...@@ -202,7 +202,7 @@ static void run(ftl::Configurable *root) {
LOG(INFO) << "Shutting down..."; LOG(INFO) << "Shutting down...";
ftl::timer::stop(); ftl::timer::stop();
slave.stop(); ctrl.stop();
net->shutdown(); net->shutdown();
ftl::pool.stop(); ftl::pool.stop();
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
//#include <ftl/display.hpp> //#include <ftl/display.hpp>
#include <ftl/rgbd/streamer.hpp> #include <ftl/rgbd/streamer.hpp>
#include <ftl/net/universe.hpp> #include <ftl/net/universe.hpp>
#include <ftl/slave.hpp> #include <ftl/master.hpp>
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
#include "opencv2/imgproc.hpp" #include "opencv2/imgproc.hpp"
...@@ -50,7 +50,7 @@ using json = nlohmann::json; ...@@ -50,7 +50,7 @@ using json = nlohmann::json;
static void run(ftl::Configurable *root) { static void run(ftl::Configurable *root) {
Universe *net = ftl::create<Universe>(root, "net"); Universe *net = ftl::create<Universe>(root, "net");
ftl::ctrl::Slave slave(net, root); ftl::ctrl::Master ctrl(root, net);
auto paths = root->get<vector<string>>("paths"); auto paths = root->get<vector<string>>("paths");
string file = ""; string file = "";
...@@ -84,7 +84,7 @@ static void run(ftl::Configurable *root) { ...@@ -84,7 +84,7 @@ static void run(ftl::Configurable *root) {
ftl::timer::start(true); ftl::timer::start(true);
LOG(INFO) << "Stopping..."; LOG(INFO) << "Stopping...";
slave.stop(); ctrl.stop();
stream->stop(); stream->stop();
net->shutdown(); net->shutdown();
......
add_library(ftlctrl add_library(ftlctrl
src/slave.cpp
src/master.cpp src/master.cpp
) )
......
...@@ -10,8 +10,15 @@ ...@@ -10,8 +10,15 @@
#include <Eigen/Eigen> #include <Eigen/Eigen>
namespace ftl { namespace ftl {
class NetConfigurable;
namespace ctrl { namespace ctrl {
struct SystemState {
bool paused;
};
struct LogEvent { struct LogEvent {
int verbosity; int verbosity;
std::string preamble; std::string preamble;
...@@ -57,6 +64,18 @@ class Master { ...@@ -57,6 +64,18 @@ class Master {
void setPose(const std::string &uri, const Eigen::Matrix4d &pose); void setPose(const std::string &uri, const Eigen::Matrix4d &pose);
/**
* Clean up to remove log and status forwarding over the network.
*/
void stop();
/**
* Do not call! Automatically called from logging subsystem.
*/
void sendLog(const loguru::Message& message);
bool isPaused() const { return state_.paused; }
// Events // Events
//void onError(); //void onError();
...@@ -72,6 +91,12 @@ class Master { ...@@ -72,6 +91,12 @@ class Master {
std::vector<std::function<void(const LogEvent&)>> log_handlers_; std::vector<std::function<void(const LogEvent&)>> log_handlers_;
ftl::Configurable *root_; ftl::Configurable *root_;
ftl::net::Universe *net_; ftl::net::Universe *net_;
std::map<ftl::UUID, std::vector<ftl::NetConfigurable*>> peerConfigurables_;
std::vector<ftl::UUID> log_peers_;
RECURSIVE_MUTEX mutex_;
bool in_log_;
bool active_;
SystemState state_;
}; };
} }
......
#ifndef _FTL_CTRL_SLAVE_HPP_
#define _FTL_CTRL_SLAVE_HPP_
#include <ftl/net/universe.hpp>
#include <ftl/configurable.hpp>
#include <loguru.hpp>
#include <ftl/threads.hpp>
namespace ftl {
namespace ctrl {
struct SystemState {
bool paused;
};
/**
* Allows a node to be remote controlled and observed over the network. All
* such nodes should create a single instance of this class, but must call
* "stop()" before terminating the network.
*/
class Slave {
public:
Slave(ftl::net::Universe *, ftl::Configurable *);
~Slave();
/**
* Clean up to remove log and status forwarding over the network.
*/
void stop();
/**
* Do not call! Automatically called from logging subsystem.
*/
void sendLog(const loguru::Message& message);
bool isPaused() const { return state_.paused; }
private:
std::vector<ftl::UUID> log_peers_;
ftl::net::Universe *net_;
RECURSIVE_MUTEX mutex_;
bool in_log_;
bool active_;
SystemState state_;
};
}
}
#endif // _FTL_CTRL_SLAVE_HPP_
#include <ftl/master.hpp> #include <ftl/master.hpp>
#include <ftl/net_configurable.hpp>
using ftl::ctrl::Master; using ftl::ctrl::Master;
using ftl::net::Universe; using ftl::net::Universe;
...@@ -11,6 +12,47 @@ using ftl::ctrl::LogEvent; ...@@ -11,6 +12,47 @@ using ftl::ctrl::LogEvent;
Master::Master(Configurable *root, Universe *net) Master::Master(Configurable *root, Universe *net)
: root_(root), net_(net) { : root_(root), net_(net) {
// Init system state
state_.paused = false;
net->bind("restart", []() {
LOG(WARNING) << "Remote restart...";
//exit(1);
ftl::exit_code = 1;
ftl::running = false;
});
net->bind("shutdown", []() {
LOG(WARNING) << "Remote shutdown...";
//exit(0);
ftl::running = false;
});
net->bind("pause", [this]() {
state_.paused = !state_.paused;
});
net->bind("update_cfg", [](const std::string &uri, const std::string &value) {
ftl::config::update(uri, nlohmann::json::parse(value));
});
net->bind("get_cfg", [](const std::string &uri) -> std::string {
return ftl::config::resolve(uri, false).dump();
});
net->bind("list_configurables", []() {
return ftl::config::list();
});
net->bind("log_subscribe", [this](const ftl::UUID &peer) {
UNIQUE_LOCK(mutex_, lk);
log_peers_.push_back(peer);
});
net->bind("connect", [this](const std::string &url) {
net_->connect(url);
});
net->bind("log", [this](int v, const std::string &pre, const std::string &msg) { net->bind("log", [this](int v, const std::string &pre, const std::string &msg) {
for (auto f : log_handlers_) { for (auto f : log_handlers_) {
f({v,pre,msg}); f({v,pre,msg});
...@@ -20,16 +62,35 @@ Master::Master(Configurable *root, Universe *net) ...@@ -20,16 +62,35 @@ Master::Master(Configurable *root, Universe *net)
net->bind("node_details", [net,root]() -> std::vector<std::string> { net->bind("node_details", [net,root]() -> std::vector<std::string> {
ftl::config::json_t json { ftl::config::json_t json {
{"id", net->id().to_string()}, {"id", net->id().to_string()},
{"title", root->value("title", *root->get<string>("$id"))}, {"title", root->value("title", *root->get<string>("$id"))}
{"kind", "master"}
}; };
return {json.dump()}; return {json.dump()};
}); });
//net->broadcast("log_subscribe", net->id()); //net->broadcast("log_subscribe", net->id());
net->onConnect([this](ftl::net::Peer*) { net->onConnect([this](ftl::net::Peer *p) {
//net_->broadcast("log_subscribe", net_->id()); //net_->broadcast("log_subscribe", net_->id());
ftl::UUID peer = p->id();
auto cs = getConfigurables(peer);
for (auto c : cs) {
//LOG(INFO) << "NET CONFIG: " << c;
ftl::config::json_t *configuration = new ftl::config::json_t;
*configuration = get(peer, c);
if (!configuration->empty()) {
ftl::NetConfigurable *nc = new ftl::NetConfigurable(peer, c, *this, *configuration);
peerConfigurables_[peer].push_back(nc);
}
}
});
net->onDisconnect([this](ftl::net::Peer *p) {
ftl::UUID peer = p->id();
for (ftl::NetConfigurable *nc : peerConfigurables_[peer]) {
ftl::config::json_t *configuration = &(nc->getConfig());
delete nc;
delete configuration;
}
}); });
} }
...@@ -134,3 +195,33 @@ void Master::setPose(const std::string &uri, const Eigen::Matrix4d &pose) { ...@@ -134,3 +195,33 @@ void Master::setPose(const std::string &uri, const Eigen::Matrix4d &pose) {
void Master::onLog(function<void(const LogEvent &)> h) { void Master::onLog(function<void(const LogEvent &)> h) {
log_handlers_.push_back(h); log_handlers_.push_back(h);
} }
void Master::stop() {
if (!active_) return;
active_ = false;
loguru::remove_all_callbacks();
net_->unbind("restart");
net_->unbind("shutdown");
net_->unbind("update_cfg");
net_->unbind("get_cfg");
net_->unbind("slave_details"); // TODO: Remove
net_->unbind("log_subscribe");
}
void Master::sendLog(const loguru::Message& message) {
UNIQUE_LOCK(mutex_, lk);
if (in_log_) return;
in_log_ = true;
for (auto &p : log_peers_) {
auto peer = net_->getPeer(p);
if (!peer || !peer->isConnected()) continue;
std::cout << "sending log to master..." << std::endl;
if (!net_->send(p, "log", message.verbosity, message.preamble, message.message)) {
// TODO(Nick) Remove peer from loggers list...
}
}
in_log_ = false;
}
\ No newline at end of file
#include <ftl/slave.hpp>
#include <ftl/threads.hpp>
using ftl::Configurable;
using ftl::net::Universe;
using ftl::ctrl::Slave;
using std::string;
// static void netLog(void* user_data, const loguru::Message& message) {
// Slave *slave = static_cast<Slave*>(user_data);
// slave->sendLog(message);
// }
Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net), in_log_(false), active_(true) {
// Init system state
state_.paused = false;
net->bind("restart", []() {
LOG(WARNING) << "Remote restart...";
//exit(1);
ftl::exit_code = 1;
ftl::running = false;
});
net->bind("shutdown", []() {
LOG(WARNING) << "Remote shutdown...";
//exit(0);
ftl::running = false;
});
net->bind("pause", [this]() {
state_.paused = !state_.paused;
});
net->bind("update_cfg", [](const std::string &uri, const std::string &value) {
ftl::config::update(uri, nlohmann::json::parse(value));
});
net->bind("get_cfg", [](const std::string &uri) -> std::string {
return ftl::config::resolve(uri, false).dump();
});
net->bind("list_configurables", []() {
return ftl::config::list();
});
net->bind("node_details", [net,root]() -> std::vector<std::string> {
ftl::config::json_t json {
{"id", net->id().to_string()},
{"title", root->value("title", *root->get<string>("$id"))},
{"kind", "slave"}
};
return {json.dump()};
});
net->bind("log_subscribe", [this](const ftl::UUID &peer) {
UNIQUE_LOCK(mutex_, lk);
log_peers_.push_back(peer);
});
net->bind("connect", [this](const std::string &url) {
net_->connect(url);
});
//net->onConnect([this](ftl::net::Peer *peer) {
// net_->broadcast("new_peer", peer->id());
//});
//loguru::add_callback("net_log", netLog, this, loguru::Verbosity_INFO);
}
Slave::~Slave() {
stop();
}
void Slave::stop() {
if (!active_) return;
active_ = false;
loguru::remove_all_callbacks();
net_->unbind("restart");
net_->unbind("shutdown");
net_->unbind("update_cfg");
net_->unbind("get_cfg");
net_->unbind("slave_details");
net_->unbind("log_subscribe");
}
void Slave::sendLog(const loguru::Message& message) {
UNIQUE_LOCK(mutex_, lk);
if (in_log_) return;
in_log_ = true;
for (auto &p : log_peers_) {
auto peer = net_->getPeer(p);
if (!peer || !peer->isConnected()) continue;
std::cout << "sending log to master..." << std::endl;
if (!net_->send(p, "log", message.verbosity, message.preamble, message.message)) {
// TODO(Nick) Remove peer from loggers list...
}
}
in_log_ = false;
}
#include "catch.hpp" #include "catch.hpp"
#include <ftl/net_configurable.hpp> #include <ftl/net_configurable.hpp>
#include <ftl/slave.hpp> #include <ftl/master.hpp>
using ftl::NetConfigurable; using ftl::NetConfigurable;
...@@ -19,7 +19,7 @@ SCENARIO( "NetConfigurable::set()" ) { ...@@ -19,7 +19,7 @@ SCENARIO( "NetConfigurable::set()" ) {
ftl::Configurable *rootSlave; ftl::Configurable *rootSlave;
rootSlave = new ftl::Configurable(jsonSlave); rootSlave = new ftl::Configurable(jsonSlave);
ftl::net::Universe *netSlave = ftl::config::create<ftl::net::Universe>(rootSlave, std::string("test")); ftl::net::Universe *netSlave = ftl::config::create<ftl::net::Universe>(rootSlave, std::string("test"));
ftl::ctrl::Slave slave(netSlave, rootSlave); ftl::ctrl::Master ctrl(rootSlave, netSlave);
netSlave->start(); netSlave->start();
netSlave->waitConnections(); netSlave->waitConnections();
net->waitConnections(); net->waitConnections();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment