Skip to content
Snippets Groups Projects
Commit 6797a90e authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Start adding resource subscription code

parent 34fc9ace
No related branches found
No related tags found
No related merge requests found
Pipeline #9727 passed
...@@ -34,6 +34,8 @@ extern int setDescriptors(); ...@@ -34,6 +34,8 @@ extern int setDescriptors();
namespace ftl { namespace ftl {
namespace net { namespace net {
extern ftl::UUID this_peer;
class Universe; class Universe;
struct virtual_caller { struct virtual_caller {
......
...@@ -54,6 +54,8 @@ class Universe { ...@@ -54,6 +54,8 @@ class Universe {
int numberOfPeers() const { return peers_.size(); } int numberOfPeers() const { return peers_.size(); }
Peer *getPeer(const ftl::UUID &pid) const;
/** /**
* Bind a function to an RPC or service call name. This will implicitely * Bind a function to an RPC or service call name. This will implicitely
* be called by any peer making the request. * be called by any peer making the request.
...@@ -76,6 +78,12 @@ class Universe { ...@@ -76,6 +78,12 @@ class Universe {
template <typename... ARGS> template <typename... ARGS>
void broadcast(const std::string &name, ARGS... args); void broadcast(const std::string &name, ARGS... args);
template <typename R, typename... ARGS>
R call(const UUID &pid, const std::string &name, ARGS... args);
template <typename R, typename... ARGS>
std::optional<R> findOne(const std::string &name, ARGS... args);
/** /**
* Send a non-blocking RPC call with no return value to all subscribers * Send a non-blocking RPC call with no return value to all subscribers
* of a resource. There may be no subscribers. * of a resource. There may be no subscribers.
...@@ -88,7 +96,10 @@ class Universe { ...@@ -88,7 +96,10 @@ class Universe {
private: private:
void _run(); void _run();
int _setDescriptors(); int _setDescriptors();
void _installBindings();
void _installBindings(Peer *); void _installBindings(Peer *);
bool _subscribe(const std::string &res);
std::optional<ftl::UUID> _findOwner(const std::string &res);
static void __start(Universe *u); static void __start(Universe *u);
...@@ -100,6 +111,8 @@ class Universe { ...@@ -100,6 +111,8 @@ class Universe {
fd_set sfdread_; fd_set sfdread_;
std::vector<ftl::net::Listener*> listeners_; std::vector<ftl::net::Listener*> listeners_;
std::vector<ftl::net::Peer*> peers_; std::vector<ftl::net::Peer*> peers_;
std::map<std::string, std::vector<ftl::net::Peer*>> subscribers_;
std::map<ftl::UUID, ftl::net::Peer*> peer_ids_;
ftl::UUID id_; ftl::UUID id_;
ftl::net::Dispatcher disp_; ftl::net::Dispatcher disp_;
...@@ -115,6 +128,13 @@ void Universe::bind(const std::string &name, F func) { ...@@ -115,6 +128,13 @@ void Universe::bind(const std::string &name, F func) {
typename ftl::internal::func_kind_info<F>::args_kind()); typename ftl::internal::func_kind_info<F>::args_kind());
} }
template <typename F>
bool Universe::subscribe(const std::string &res, F func) {
bind(res, func);
_subscribe(res);
return true;
}
template <typename... ARGS> template <typename... ARGS>
void Universe::broadcast(const std::string &name, ARGS... args) { void Universe::broadcast(const std::string &name, ARGS... args) {
for (auto p : peers_) { for (auto p : peers_) {
...@@ -122,6 +142,29 @@ void Universe::broadcast(const std::string &name, ARGS... args) { ...@@ -122,6 +142,29 @@ void Universe::broadcast(const std::string &name, ARGS... args) {
} }
} }
template <typename R, typename... ARGS>
std::optional<R> Universe::findOne(const std::string &name, ARGS... args) {
for (auto p : peers_) {
p->send(name, args...);
}
return {};
}
template <typename R, typename... ARGS>
R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) {
Peer *p = getPeer(pid);
if (p == nullptr) throw -1;
return p->call<R>(name, args...);
}
template <typename... ARGS>
void Universe::publish(const std::string &res, ARGS... args) {
auto subs = subscribers_[res];
for (auto p : subs) {
p->send(res, args...);
}
}
}; // namespace net }; // namespace net
}; // namespace ftl }; // namespace ftl
......
...@@ -37,6 +37,9 @@ namespace ftl { ...@@ -37,6 +37,9 @@ namespace ftl {
bool operator!=(const UUID &u) const { bool operator!=(const UUID &u) const {
return memcmp(&uuid_,&u.uuid_,16) != 0; return memcmp(&uuid_,&u.uuid_,16) != 0;
} }
bool operator<(const UUID &u) const {
return strncmp((const char*)uuid_, (const char *)u.uuid_, 16) < 0;
}
/** /**
* Get a raw data string. * Get a raw data string.
......
...@@ -51,6 +51,9 @@ using ftl::net::Dispatcher; ...@@ -51,6 +51,9 @@ using ftl::net::Dispatcher;
int Peer::rpcid__ = 0; int Peer::rpcid__ = 0;
// Global peer UUID
ftl::UUID ftl::net::this_peer;
static ctpl::thread_pool pool(5); static ctpl::thread_pool pool(5);
// TODO(nick) Move to tcp_internal.cpp // TODO(nick) Move to tcp_internal.cpp
...@@ -145,12 +148,13 @@ Peer::Peer(int s, Dispatcher *d) : sock_(s) { ...@@ -145,12 +148,13 @@ Peer::Peer(int s, Dispatcher *d) : sock_(s) {
} else { } else {
status_ = kConnected; status_ = kConnected;
version_ = version; version_ = version;
peerid_ = pid;
} }
}); });
ftl::UUID uuid; //ftl::UUID uuid;
send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, uuid); send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer);
} }
} }
...@@ -210,8 +214,8 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { ...@@ -210,8 +214,8 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) {
} else { } else {
status_ = kConnected; status_ = kConnected;
version_ = version; version_ = version;
ftl::UUID uuid; peerid_ = pid;
send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, uuid); send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer);
} }
}); });
} }
......
...@@ -12,8 +12,12 @@ using ftl::net::Peer; ...@@ -12,8 +12,12 @@ using ftl::net::Peer;
using ftl::net::Listener; using ftl::net::Listener;
using ftl::net::Universe; using ftl::net::Universe;
using nlohmann::json; using nlohmann::json;
using ftl::UUID;
using std::optional;
Universe::Universe() : active_(true), thread_(Universe::__start, this) {} Universe::Universe() : active_(true), thread_(Universe::__start, this) {
_installBindings();
}
Universe::Universe(nlohmann::json &config) : Universe::Universe(nlohmann::json &config) :
active_(true), config_(config), thread_(Universe::__start, this) { active_(true), config_(config), thread_(Universe::__start, this) {
...@@ -30,6 +34,8 @@ Universe::Universe(nlohmann::json &config) : ...@@ -30,6 +34,8 @@ Universe::Universe(nlohmann::json &config) :
connect(p); connect(p);
} }
} }
_installBindings();
} }
Universe::~Universe() { Universe::~Universe() {
...@@ -104,13 +110,41 @@ int Universe::_setDescriptors() { ...@@ -104,13 +110,41 @@ int Universe::_setDescriptors() {
} }
void Universe::_installBindings(Peer *p) { void Universe::_installBindings(Peer *p) {
p->bind("__subscribe__", [this](const string &uri) {
// Add this peer to subscription list for uri resource }
void Universe::_installBindings() {
bind("__subscribe__", [this](const UUID &id, const string &uri) -> bool {
}); });
} }
Peer *Universe::getPeer(const UUID &id) const {
auto ix = peer_ids_.find(id);
if (ix == peer_ids_.end()) return nullptr;
else return ix->second;
}
optional<UUID> Universe::_findOwner(const string &res) {
// TODO(nick) cache this information
return findOne<UUID>("__owner__", res);
}
bool Universe::_subscribe(const std::string &res) {
// Need to find who owns the resource
optional<UUID> pid = _findOwner(res);
if (pid) {
return call<bool>(*pid, "__subscribe__", id_, res);
} else {
// No resource found
LOG(WARNING) << "Subscribe to unknown resource: " << res;
return false;
}
}
void Universe::__start(Universe * u) { void Universe::__start(Universe * u) {
u->_run(); u->_run();
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment