diff --git a/net/cpp/include/ftl/net/peer.hpp b/net/cpp/include/ftl/net/peer.hpp index 5355ceb47a4777216da54f759a74905aa56194a2..0681cf69368a7b67ce551d7b3bec7f1df0c755ab 100644 --- a/net/cpp/include/ftl/net/peer.hpp +++ b/net/cpp/include/ftl/net/peer.hpp @@ -114,9 +114,11 @@ class Peer { * Non-blocking Remote Procedure Call using a callback function. */ template <typename T, typename... ARGS> - void asyncCall(const std::string &name, + int asyncCall(const std::string &name, std::function<void(const T&)> cb, ARGS... args); + + void cancelCall(int id); /** * Blocking Remote Procedure Call using a string name. @@ -248,7 +250,7 @@ R Peer::call(const std::string &name, ARGS... args) { std::condition_variable cv; R result; - asyncCall<R>(name, [&](const R &r) { + int id = asyncCall<R>(name, [&](const R &r) { std::unique_lock<std::mutex> lk(m); hasreturned = true; result = r; @@ -262,7 +264,7 @@ R Peer::call(const std::string &name, ARGS... args) { } if (!hasreturned) { - // TODO(nick) remove callback + cancelCall(id); throw 1; } @@ -270,7 +272,7 @@ R Peer::call(const std::string &name, ARGS... args) { } template <typename T, typename... ARGS> -void Peer::asyncCall( +int Peer::asyncCall( const std::string &name, std::function<void(const T&)> cb, ARGS... args) { @@ -286,6 +288,7 @@ void Peer::asyncCall( callbacks_[rpcid] = std::make_unique<caller<T>>(cb); _send(); + return rpcid; } }; diff --git a/net/cpp/include/ftl/net/universe.hpp b/net/cpp/include/ftl/net/universe.hpp index 20fdfa21f3cf6fc83fe707f1e9da7e361d213320..18adf6ce15043d276e9ab645b1b4929f8a0b2c99 100644 --- a/net/cpp/include/ftl/net/universe.hpp +++ b/net/cpp/include/ftl/net/universe.hpp @@ -9,6 +9,7 @@ #include <vector> #include <string> #include <thread> +#include <map> namespace ftl { namespace net { @@ -91,7 +92,11 @@ class Universe { template <typename... ARGS> void publish(const std::string &res, ARGS... args); - // TODO(nick) Add find_one, find_all, call_any ... + /** + * Register your ownership of a new resource. This must be called before + * publishing to this resource and before any peers attempt to subscribe. + */ + bool createResource(const std::string &uri); private: void _run(); @@ -111,7 +116,8 @@ class Universe { fd_set sfdread_; std::vector<ftl::net::Listener*> listeners_; std::vector<ftl::net::Peer*> peers_; - std::map<std::string, std::vector<ftl::net::Peer*>> subscribers_; + std::map<std::string, std::vector<ftl::UUID>> subscribers_; + std::unordered_set<std::string> owned_; std::map<ftl::UUID, ftl::net::Peer*> peer_ids_; ftl::UUID id_; ftl::net::Dispatcher disp_; @@ -144,10 +150,40 @@ void Universe::broadcast(const std::string &name, ARGS... args) { template <typename R, typename... ARGS> std::optional<R> Universe::findOne(const std::string &name, ARGS... args) { + bool hasreturned = false; + std::mutex m; + std::condition_variable cv; + + std::optional<R> result; + + auto handler = [&](const std::optional<R> &r) { + std::unique_lock<std::mutex> lk(m); + if (hasreturned || !r) return; + hasreturned = true; + result = r; + lk.unlock(); + cv.notify_one(); + }; + + std::map<Peer*, int> record; for (auto p : peers_) { - p->send(name, args...); + record[p] = p->asyncCall<std::optional<R>>(name, handler, std::forward<ARGS>(args)...); } - return {}; + + { // Block thread until async callback notifies us + std::unique_lock<std::mutex> lk(m); + cv.wait_for(lk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;}); + + // Cancel any further results + for (auto p : peers_) { + auto m = record.find(p); + if (m != record.end()) { + p->cancelCall(m->second); + } + } + } + + return result; } template <typename R, typename... ARGS> @@ -161,7 +197,10 @@ template <typename... ARGS> void Universe::publish(const std::string &res, ARGS... args) { auto subs = subscribers_[res]; for (auto p : subs) { - p->send(res, args...); + auto peer = getPeer(p); + if (peer) { + *peer->send(res, args...); + } } } diff --git a/net/cpp/src/peer.cpp b/net/cpp/src/peer.cpp index bb0eb10b0b1958efd513a6cd7a94af1edad60ab0..60c1e122f032f35dd4d89101f617a372d83c1320 100644 --- a/net/cpp/src/peer.cpp +++ b/net/cpp/src/peer.cpp @@ -465,7 +465,13 @@ void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) { (*callbacks_[id])(res); callbacks_.erase(id); } else { - LOG(ERROR) << "Missing RPC callback for result"; + LOG(WARNING) << "Missing RPC callback for result - discarding"; + } +} + +void Peer::cancelCall(int id) { + if (callbacks_.count(id) > 0) { + callbacks_.erase(id); } } diff --git a/net/cpp/src/universe.cpp b/net/cpp/src/universe.cpp index 6624a0a9aa409f913345990f5acc1defae17cbd7..1ccd80bb95795c3842d56d703435e65711242e7a 100644 --- a/net/cpp/src/universe.cpp +++ b/net/cpp/src/universe.cpp @@ -115,10 +115,13 @@ void Universe::_installBindings(Peer *p) { void Universe::_installBindings() { bind("__subscribe__", [this](const UUID &id, const string &uri) -> bool { - + LOG(INFO) << "Subscription to " << uri << " by " << id.to_string(); }); - + bind("__owner__", [this](const std::string &res) -> optional<UUID> { + if (owned_.count(res) > 0) return ftl::net::this_peer; + else return {}; + }); } Peer *Universe::getPeer(const UUID &id) const { @@ -132,6 +135,11 @@ optional<UUID> Universe::_findOwner(const string &res) { return findOne<UUID>("__owner__", res); } +bool Universe::createResource(const std::string &uri) { + owned_.insert(uri); + return true; +} + bool Universe::_subscribe(const std::string &res) { // Need to find who owns the resource optional<UUID> pid = _findOwner(res);