Skip to content
Snippets Groups Projects
Commit 0b1c5a49 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Fixes #60 with unbinds

parent b78a36fb
No related branches found
No related tags found
No related merge requests found
...@@ -119,7 +119,6 @@ T *ftl::config::create(ftl::Configurable *parent, const std::string &name, ARGS ...@@ -119,7 +119,6 @@ T *ftl::config::create(ftl::Configurable *parent, const std::string &name, ARGS
if (entity.is_object()) { if (entity.is_object()) {
if (!entity["$id"].is_string()) { if (!entity["$id"].is_string()) {
// TODO(Nick) Check for # in URI
std::string id_str = *parent->get<std::string>("$id"); std::string id_str = *parent->get<std::string>("$id");
if (id_str.find('#') != std::string::npos) { if (id_str.find('#') != std::string::npos) {
entity["$id"] = id_str + std::string("/") + name; entity["$id"] = id_str + std::string("/") + name;
......
...@@ -11,6 +11,9 @@ class Slave { ...@@ -11,6 +11,9 @@ class Slave {
public: public:
Slave(ftl::net::Universe *, ftl::Configurable *); Slave(ftl::net::Universe *, ftl::Configurable *);
~Slave(); ~Slave();
private:
ftl::net::Universe *net_;
}; };
} }
......
...@@ -19,7 +19,7 @@ Master::Master(Configurable *root, Universe *net) ...@@ -19,7 +19,7 @@ Master::Master(Configurable *root, Universe *net)
} }
Master::~Master() { Master::~Master() {
net_->unbind("log");
} }
void Master::restart() { void Master::restart() {
......
...@@ -10,7 +10,7 @@ static void netLog(void* user_data, const loguru::Message& message) { ...@@ -10,7 +10,7 @@ static void netLog(void* user_data, const loguru::Message& message) {
//net->publish("log", message.preamble, message.message); //net->publish("log", message.preamble, message.message);
} }
Slave::Slave(Universe *net, ftl::Configurable *root) { Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net) {
net->bind("restart", []() { net->bind("restart", []() {
LOG(WARNING) << "Remote restart..."; LOG(WARNING) << "Remote restart...";
//exit(1); //exit(1);
...@@ -36,5 +36,8 @@ Slave::Slave(Universe *net, ftl::Configurable *root) { ...@@ -36,5 +36,8 @@ Slave::Slave(Universe *net, ftl::Configurable *root) {
} }
Slave::~Slave() { Slave::~Slave() {
net_->unbind("restart");
net_->unbind("shutdown");
net_->unbind("update_cfg");
net_->unbind("get_cfg");
} }
...@@ -119,6 +119,13 @@ class Dispatcher { ...@@ -119,6 +119,13 @@ class Dispatcher {
return std::make_unique<msgpack::object_handle>(result, std::move(z)); return std::make_unique<msgpack::object_handle>(result, std::move(z));
})); }));
} }
void unbind(const std::string &name) {
auto i = funcs_.find(name);
if (i != funcs_.end()) {
funcs_.erase(i);
}
}
std::vector<std::string> getBindings() const; std::vector<std::string> getBindings() const;
......
...@@ -80,7 +80,9 @@ class Universe : public ftl::Configurable { ...@@ -80,7 +80,9 @@ class Universe : public ftl::Configurable {
*/ */
template <typename F> template <typename F>
void bind(const std::string &name, F func); void bind(const std::string &name, F func);
void unbind(const std::string &name);
/** /**
* Subscribe a function to a resource. The subscribed function is * Subscribe a function to a resource. The subscribed function is
* triggered whenever that resource is published to. It is akin to * triggered whenever that resource is published to. It is akin to
...@@ -205,6 +207,7 @@ class Universe : public ftl::Configurable { ...@@ -205,6 +207,7 @@ class Universe : public ftl::Configurable {
template <typename F> template <typename F>
void Universe::bind(const std::string &name, F func) { void Universe::bind(const std::string &name, F func) {
// CHECK Need mutex?
disp_.bind(name, func, disp_.bind(name, func,
typename ftl::internal::func_kind_info<F>::result_kind(), typename ftl::internal::func_kind_info<F>::result_kind(),
typename ftl::internal::func_kind_info<F>::args_kind()); typename ftl::internal::func_kind_info<F>::args_kind());
......
...@@ -155,7 +155,7 @@ void ftl::net::Dispatcher::enforce_arg_count(std::string const &func, std::size_ ...@@ -155,7 +155,7 @@ void ftl::net::Dispatcher::enforce_arg_count(std::string const &func, std::size_
void ftl::net::Dispatcher::enforce_unique_name(std::string const &func) { void ftl::net::Dispatcher::enforce_unique_name(std::string const &func) {
auto pos = funcs_.find(func); auto pos = funcs_.find(func);
if (pos != end(funcs_)) { if (pos != end(funcs_)) {
LOG(ERROR) << "RPC non unique binding for " << func; LOG(FATAL) << "RPC non unique binding for '" << func << "'";
throw -1; throw -1;
} }
} }
......
...@@ -96,6 +96,11 @@ Peer *Universe::connect(const string &addr) { ...@@ -96,6 +96,11 @@ Peer *Universe::connect(const string &addr) {
return p; return p;
} }
void Universe::unbind(const std::string &name) {
unique_lock<mutex> lk(net_mutex_);
disp_.unbind(name);
}
int Universe::waitConnections() { int Universe::waitConnections() {
int count = 0; int count = 0;
for (auto p : peers_) { for (auto p : peers_) {
......
...@@ -68,7 +68,12 @@ NetSource::NetSource(nlohmann::json &config, ftl::net::Universe *net) ...@@ -68,7 +68,12 @@ NetSource::NetSource(nlohmann::json &config, ftl::net::Universe *net)
} }
NetSource::~NetSource() { NetSource::~NetSource() {
// TODO Unsubscribe auto uri = get<string>("uri");
// TODO(Nick) If URI changes then must unbind + rebind.
if (uri) {
net_->unbind(*uri);
}
} }
void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned char> &d) { void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned char> &d) {
......
...@@ -69,7 +69,12 @@ Streamer::Streamer(nlohmann::json &config, Universe *net) ...@@ -69,7 +69,12 @@ Streamer::Streamer(nlohmann::json &config, Universe *net)
} }
Streamer::~Streamer() { Streamer::~Streamer() {
// TODO Unbind everything from net.... net_->unbind("find_stream");
net_->unbind("list_streams");
net_->unbind("source_calibration");
net_->unbind("get_stream");
net_->unbind("sync_streams");
net_->unbind("ping_streamer");
pool_.stop(); pool_.stop();
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment