Skip to content
Snippets Groups Projects
Commit 8b6c9189 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Enable async call cancel, findOne and hopefully a working pubsub

parent 6797a90e
No related branches found
No related tags found
No related merge requests found
......@@ -114,9 +114,11 @@ class Peer {
* Non-blocking Remote Procedure Call using a callback function.
*/
template <typename T, typename... ARGS>
void asyncCall(const std::string &name,
int asyncCall(const std::string &name,
std::function<void(const T&)> cb,
ARGS... args);
void cancelCall(int id);
/**
* Blocking Remote Procedure Call using a string name.
......@@ -248,7 +250,7 @@ R Peer::call(const std::string &name, ARGS... args) {
std::condition_variable cv;
R result;
asyncCall<R>(name, [&](const R &r) {
int id = asyncCall<R>(name, [&](const R &r) {
std::unique_lock<std::mutex> lk(m);
hasreturned = true;
result = r;
......@@ -262,7 +264,7 @@ R Peer::call(const std::string &name, ARGS... args) {
}
if (!hasreturned) {
// TODO(nick) remove callback
cancelCall(id);
throw 1;
}
......@@ -270,7 +272,7 @@ R Peer::call(const std::string &name, ARGS... args) {
}
template <typename T, typename... ARGS>
void Peer::asyncCall(
int Peer::asyncCall(
const std::string &name,
std::function<void(const T&)> cb,
ARGS... args) {
......@@ -286,6 +288,7 @@ void Peer::asyncCall(
callbacks_[rpcid] = std::make_unique<caller<T>>(cb);
_send();
return rpcid;
}
};
......
......@@ -9,6 +9,7 @@
#include <vector>
#include <string>
#include <thread>
#include <map>
namespace ftl {
namespace net {
......@@ -91,7 +92,11 @@ class Universe {
template <typename... ARGS>
void publish(const std::string &res, ARGS... args);
// TODO(nick) Add find_one, find_all, call_any ...
/**
* Register your ownership of a new resource. This must be called before
* publishing to this resource and before any peers attempt to subscribe.
*/
bool createResource(const std::string &uri);
private:
void _run();
......@@ -111,7 +116,8 @@ class Universe {
fd_set sfdread_;
std::vector<ftl::net::Listener*> listeners_;
std::vector<ftl::net::Peer*> peers_;
std::map<std::string, std::vector<ftl::net::Peer*>> subscribers_;
std::map<std::string, std::vector<ftl::UUID>> subscribers_;
std::unordered_set<std::string> owned_;
std::map<ftl::UUID, ftl::net::Peer*> peer_ids_;
ftl::UUID id_;
ftl::net::Dispatcher disp_;
......@@ -144,10 +150,40 @@ void Universe::broadcast(const std::string &name, ARGS... args) {
template <typename R, typename... ARGS>
std::optional<R> Universe::findOne(const std::string &name, ARGS... args) {
bool hasreturned = false;
std::mutex m;
std::condition_variable cv;
std::optional<R> result;
auto handler = [&](const std::optional<R> &r) {
std::unique_lock<std::mutex> lk(m);
if (hasreturned || !r) return;
hasreturned = true;
result = r;
lk.unlock();
cv.notify_one();
};
std::map<Peer*, int> record;
for (auto p : peers_) {
p->send(name, args...);
record[p] = p->asyncCall<std::optional<R>>(name, handler, std::forward<ARGS>(args)...);
}
return {};
{ // Block thread until async callback notifies us
std::unique_lock<std::mutex> lk(m);
cv.wait_for(lk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;});
// Cancel any further results
for (auto p : peers_) {
auto m = record.find(p);
if (m != record.end()) {
p->cancelCall(m->second);
}
}
}
return result;
}
template <typename R, typename... ARGS>
......@@ -161,7 +197,10 @@ template <typename... ARGS>
void Universe::publish(const std::string &res, ARGS... args) {
auto subs = subscribers_[res];
for (auto p : subs) {
p->send(res, args...);
auto peer = getPeer(p);
if (peer) {
*peer->send(res, args...);
}
}
}
......
......@@ -465,7 +465,13 @@ void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) {
(*callbacks_[id])(res);
callbacks_.erase(id);
} else {
LOG(ERROR) << "Missing RPC callback for result";
LOG(WARNING) << "Missing RPC callback for result - discarding";
}
}
void Peer::cancelCall(int id) {
if (callbacks_.count(id) > 0) {
callbacks_.erase(id);
}
}
......
......@@ -115,10 +115,13 @@ void Universe::_installBindings(Peer *p) {
void Universe::_installBindings() {
bind("__subscribe__", [this](const UUID &id, const string &uri) -> bool {
LOG(INFO) << "Subscription to " << uri << " by " << id.to_string();
});
bind("__owner__", [this](const std::string &res) -> optional<UUID> {
if (owned_.count(res) > 0) return ftl::net::this_peer;
else return {};
});
}
Peer *Universe::getPeer(const UUID &id) const {
......@@ -132,6 +135,11 @@ optional<UUID> Universe::_findOwner(const string &res) {
return findOne<UUID>("__owner__", res);
}
bool Universe::createResource(const std::string &uri) {
owned_.insert(uri);
return true;
}
bool Universe::_subscribe(const std::string &res) {
// Need to find who owns the resource
optional<UUID> pid = _findOwner(res);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment