diff --git a/components/common/cpp/include/ftl/configuration.hpp b/components/common/cpp/include/ftl/configuration.hpp index 531852ca5fbdc7f7be0eceda9eb32e341842dd7c..046b58a07bf536a691a970b3a9d93bf4b5ab119b 100644 --- a/components/common/cpp/include/ftl/configuration.hpp +++ b/components/common/cpp/include/ftl/configuration.hpp @@ -119,7 +119,6 @@ T *ftl::config::create(ftl::Configurable *parent, const std::string &name, ARGS if (entity.is_object()) { if (!entity["$id"].is_string()) { - // TODO(Nick) Check for # in URI std::string id_str = *parent->get<std::string>("$id"); if (id_str.find('#') != std::string::npos) { entity["$id"] = id_str + std::string("/") + name; diff --git a/components/control/cpp/include/ftl/slave.hpp b/components/control/cpp/include/ftl/slave.hpp index e61290d630c1f10bd38335b228d9657b17ab6de7..08e973339af0154efb089778c0f38d284b540cdb 100644 --- a/components/control/cpp/include/ftl/slave.hpp +++ b/components/control/cpp/include/ftl/slave.hpp @@ -11,6 +11,9 @@ class Slave { public: Slave(ftl::net::Universe *, ftl::Configurable *); ~Slave(); + + private: + ftl::net::Universe *net_; }; } diff --git a/components/control/cpp/src/master.cpp b/components/control/cpp/src/master.cpp index 004d2f7cf79741ea95b18a612d28c216540984c5..33ea746b7d0364d56c8d1b410386a09c4f3b846a 100644 --- a/components/control/cpp/src/master.cpp +++ b/components/control/cpp/src/master.cpp @@ -19,7 +19,7 @@ Master::Master(Configurable *root, Universe *net) } Master::~Master() { - + net_->unbind("log"); } void Master::restart() { diff --git a/components/control/cpp/src/slave.cpp b/components/control/cpp/src/slave.cpp index 8485c825ef8f22acaa345803c8fdef6fbbbcaf35..076584230b9948eff6c6caaded14438bd718363a 100644 --- a/components/control/cpp/src/slave.cpp +++ b/components/control/cpp/src/slave.cpp @@ -10,7 +10,7 @@ static void netLog(void* user_data, const loguru::Message& message) { //net->publish("log", message.preamble, message.message); } -Slave::Slave(Universe *net, ftl::Configurable *root) { +Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net) { net->bind("restart", []() { LOG(WARNING) << "Remote restart..."; //exit(1); @@ -36,5 +36,8 @@ Slave::Slave(Universe *net, ftl::Configurable *root) { } Slave::~Slave() { - + net_->unbind("restart"); + net_->unbind("shutdown"); + net_->unbind("update_cfg"); + net_->unbind("get_cfg"); } diff --git a/components/net/cpp/include/ftl/net/dispatcher.hpp b/components/net/cpp/include/ftl/net/dispatcher.hpp index 7291adb43c0daa6a516c968254fa45f5f8dd9e6f..0b4d3b34dcb4e1662a43c834abe740319c1f56d2 100644 --- a/components/net/cpp/include/ftl/net/dispatcher.hpp +++ b/components/net/cpp/include/ftl/net/dispatcher.hpp @@ -119,6 +119,13 @@ class Dispatcher { return std::make_unique<msgpack::object_handle>(result, std::move(z)); })); } + + void unbind(const std::string &name) { + auto i = funcs_.find(name); + if (i != funcs_.end()) { + funcs_.erase(i); + } + } std::vector<std::string> getBindings() const; diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index 21a5da2eaa01fe48ec84f1e06aa6a9f67d1e77bd..0c02dbe4540d3f876ba709d7a2ee6b479e207c35 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -80,7 +80,9 @@ class Universe : public ftl::Configurable { */ template <typename F> void bind(const std::string &name, F func); - + + void unbind(const std::string &name); + /** * Subscribe a function to a resource. The subscribed function is * triggered whenever that resource is published to. It is akin to @@ -205,6 +207,7 @@ class Universe : public ftl::Configurable { template <typename F> void Universe::bind(const std::string &name, F func) { + // CHECK Need mutex? disp_.bind(name, func, typename ftl::internal::func_kind_info<F>::result_kind(), typename ftl::internal::func_kind_info<F>::args_kind()); diff --git a/components/net/cpp/src/dispatcher.cpp b/components/net/cpp/src/dispatcher.cpp index 7bfd7352c27010b80ac13005ffb63c4a1a2add80..719ae2b65fe9b0cdd682c95cf03f33ca688c1768 100644 --- a/components/net/cpp/src/dispatcher.cpp +++ b/components/net/cpp/src/dispatcher.cpp @@ -155,7 +155,7 @@ void ftl::net::Dispatcher::enforce_arg_count(std::string const &func, std::size_ void ftl::net::Dispatcher::enforce_unique_name(std::string const &func) { auto pos = funcs_.find(func); if (pos != end(funcs_)) { - LOG(ERROR) << "RPC non unique binding for " << func; + LOG(FATAL) << "RPC non unique binding for '" << func << "'"; throw -1; } } diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index 04404e8da0030f8e267adbe0bd485c1eb1da9aab..750258c3600f9b51ca39cd1ed08ad6d6a9bb6be8 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -96,6 +96,11 @@ Peer *Universe::connect(const string &addr) { return p; } +void Universe::unbind(const std::string &name) { + unique_lock<mutex> lk(net_mutex_); + disp_.unbind(name); +} + int Universe::waitConnections() { int count = 0; for (auto p : peers_) { diff --git a/components/rgbd-sources/src/net_source.cpp b/components/rgbd-sources/src/net_source.cpp index 7a9d3cbe21835daed8a704204f238639c15c0203..cc2e8461af7c61effa56fafc20d19e2cc840d39b 100644 --- a/components/rgbd-sources/src/net_source.cpp +++ b/components/rgbd-sources/src/net_source.cpp @@ -68,7 +68,12 @@ NetSource::NetSource(nlohmann::json &config, ftl::net::Universe *net) } NetSource::~NetSource() { - // TODO Unsubscribe + auto uri = get<string>("uri"); + + // TODO(Nick) If URI changes then must unbind + rebind. + if (uri) { + net_->unbind(*uri); + } } void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned char> &d) { diff --git a/components/rgbd-sources/src/rgbd_streamer.cpp b/components/rgbd-sources/src/rgbd_streamer.cpp index 6aefb9725441562f7dc079d8191fb102315fb5db..a5b9804b4f50202830de0d00fd5dafdbba4a8e1b 100644 --- a/components/rgbd-sources/src/rgbd_streamer.cpp +++ b/components/rgbd-sources/src/rgbd_streamer.cpp @@ -69,7 +69,12 @@ Streamer::Streamer(nlohmann::json &config, Universe *net) } Streamer::~Streamer() { - // TODO Unbind everything from net.... + net_->unbind("find_stream"); + net_->unbind("list_streams"); + net_->unbind("source_calibration"); + net_->unbind("get_stream"); + net_->unbind("sync_streams"); + net_->unbind("ping_streamer"); pool_.stop(); }