diff --git a/net/cpp/include/ftl/net/peer.hpp b/net/cpp/include/ftl/net/peer.hpp index 0b9b812288b06ebca15f2b9ba23e0d22c72ca664..5355ceb47a4777216da54f759a74905aa56194a2 100644 --- a/net/cpp/include/ftl/net/peer.hpp +++ b/net/cpp/include/ftl/net/peer.hpp @@ -34,6 +34,8 @@ extern int setDescriptors(); namespace ftl { namespace net { +extern ftl::UUID this_peer; + class Universe; struct virtual_caller { diff --git a/net/cpp/include/ftl/net/universe.hpp b/net/cpp/include/ftl/net/universe.hpp index 8f84287fefb979d7fd2482051b3339fb99c92884..20fdfa21f3cf6fc83fe707f1e9da7e361d213320 100644 --- a/net/cpp/include/ftl/net/universe.hpp +++ b/net/cpp/include/ftl/net/universe.hpp @@ -54,6 +54,8 @@ class Universe { int numberOfPeers() const { return peers_.size(); } + Peer *getPeer(const ftl::UUID &pid) const; + /** * Bind a function to an RPC or service call name. This will implicitely * be called by any peer making the request. @@ -76,6 +78,12 @@ class Universe { template <typename... ARGS> void broadcast(const std::string &name, ARGS... args); + template <typename R, typename... ARGS> + R call(const UUID &pid, const std::string &name, ARGS... args); + + template <typename R, typename... ARGS> + std::optional<R> findOne(const std::string &name, ARGS... args); + /** * Send a non-blocking RPC call with no return value to all subscribers * of a resource. There may be no subscribers. @@ -88,7 +96,10 @@ class Universe { private: void _run(); int _setDescriptors(); + void _installBindings(); void _installBindings(Peer *); + bool _subscribe(const std::string &res); + std::optional<ftl::UUID> _findOwner(const std::string &res); static void __start(Universe *u); @@ -100,6 +111,8 @@ class Universe { fd_set sfdread_; std::vector<ftl::net::Listener*> listeners_; std::vector<ftl::net::Peer*> peers_; + std::map<std::string, std::vector<ftl::net::Peer*>> subscribers_; + std::map<ftl::UUID, ftl::net::Peer*> peer_ids_; ftl::UUID id_; ftl::net::Dispatcher disp_; @@ -115,6 +128,13 @@ void Universe::bind(const std::string &name, F func) { typename ftl::internal::func_kind_info<F>::args_kind()); } +template <typename F> +bool Universe::subscribe(const std::string &res, F func) { + bind(res, func); + _subscribe(res); + return true; +} + template <typename... ARGS> void Universe::broadcast(const std::string &name, ARGS... args) { for (auto p : peers_) { @@ -122,6 +142,29 @@ void Universe::broadcast(const std::string &name, ARGS... args) { } } +template <typename R, typename... ARGS> +std::optional<R> Universe::findOne(const std::string &name, ARGS... args) { + for (auto p : peers_) { + p->send(name, args...); + } + return {}; +} + +template <typename R, typename... ARGS> +R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) { + Peer *p = getPeer(pid); + if (p == nullptr) throw -1; + return p->call<R>(name, args...); +} + +template <typename... ARGS> +void Universe::publish(const std::string &res, ARGS... args) { + auto subs = subscribers_[res]; + for (auto p : subs) { + p->send(res, args...); + } +} + }; // namespace net }; // namespace ftl diff --git a/net/cpp/include/ftl/uuid.hpp b/net/cpp/include/ftl/uuid.hpp index 76fae762977e12d69904c21f710a7a6e5dea3e34..77b448beed1b337288e5a9d6bc041015b4031dcf 100644 --- a/net/cpp/include/ftl/uuid.hpp +++ b/net/cpp/include/ftl/uuid.hpp @@ -37,6 +37,9 @@ namespace ftl { bool operator!=(const UUID &u) const { return memcmp(&uuid_,&u.uuid_,16) != 0; } + bool operator<(const UUID &u) const { + return strncmp((const char*)uuid_, (const char *)u.uuid_, 16) < 0; + } /** * Get a raw data string. diff --git a/net/cpp/src/peer.cpp b/net/cpp/src/peer.cpp index bc969f91bc85afef40c4f0f7cf4c673ab7fe0149..bb0eb10b0b1958efd513a6cd7a94af1edad60ab0 100644 --- a/net/cpp/src/peer.cpp +++ b/net/cpp/src/peer.cpp @@ -51,6 +51,9 @@ using ftl::net::Dispatcher; int Peer::rpcid__ = 0; +// Global peer UUID +ftl::UUID ftl::net::this_peer; + static ctpl::thread_pool pool(5); // TODO(nick) Move to tcp_internal.cpp @@ -145,12 +148,13 @@ Peer::Peer(int s, Dispatcher *d) : sock_(s) { } else { status_ = kConnected; version_ = version; + peerid_ = pid; } }); - ftl::UUID uuid; + //ftl::UUID uuid; - send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, uuid); + send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer); } } @@ -210,8 +214,8 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { } else { status_ = kConnected; version_ = version; - ftl::UUID uuid; - send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, uuid); + peerid_ = pid; + send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer); } }); } diff --git a/net/cpp/src/universe.cpp b/net/cpp/src/universe.cpp index 006131c4eb1d7120447d9b358051bbb0516424c2..6624a0a9aa409f913345990f5acc1defae17cbd7 100644 --- a/net/cpp/src/universe.cpp +++ b/net/cpp/src/universe.cpp @@ -12,8 +12,12 @@ using ftl::net::Peer; using ftl::net::Listener; using ftl::net::Universe; using nlohmann::json; +using ftl::UUID; +using std::optional; -Universe::Universe() : active_(true), thread_(Universe::__start, this) {} +Universe::Universe() : active_(true), thread_(Universe::__start, this) { + _installBindings(); +} Universe::Universe(nlohmann::json &config) : active_(true), config_(config), thread_(Universe::__start, this) { @@ -30,6 +34,8 @@ Universe::Universe(nlohmann::json &config) : connect(p); } } + + _installBindings(); } Universe::~Universe() { @@ -104,13 +110,41 @@ int Universe::_setDescriptors() { } void Universe::_installBindings(Peer *p) { - p->bind("__subscribe__", [this](const string &uri) { - // Add this peer to subscription list for uri resource + +} + +void Universe::_installBindings() { + bind("__subscribe__", [this](const UUID &id, const string &uri) -> bool { + }); } +Peer *Universe::getPeer(const UUID &id) const { + auto ix = peer_ids_.find(id); + if (ix == peer_ids_.end()) return nullptr; + else return ix->second; +} + +optional<UUID> Universe::_findOwner(const string &res) { + // TODO(nick) cache this information + return findOne<UUID>("__owner__", res); +} + +bool Universe::_subscribe(const std::string &res) { + // Need to find who owns the resource + optional<UUID> pid = _findOwner(res); + + if (pid) { + return call<bool>(*pid, "__subscribe__", id_, res); + } else { + // No resource found + LOG(WARNING) << "Subscribe to unknown resource: " << res; + return false; + } +} + void Universe::__start(Universe * u) { u->_run(); }