Skip to content
Snippets Groups Projects
Commit 32795d0c authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Add subscribe tests and get working. Fix for onConnect trigger in peer

parent d4153fb2
No related branches found
No related tags found
No related merge requests found
......@@ -140,9 +140,9 @@ class Peer {
template <typename F>
void bind(const std::string &name, F func);
//void onError(std::function<void(Socket&, int err, const char *msg)> &f) {}
void onConnect(std::function<void()> &f);
void onDisconnect(std::function<void()> &f) {}
// void onError(std::function<void(Peer &, int err, const char *msg)> &f) {}
void onConnect(const std::function<void(Peer &)> &f);
void onDisconnect(std::function<void(Peer &)> &f) {}
bool isWaiting() const { return is_waiting_; }
......@@ -180,6 +180,13 @@ class Peer {
int _send();
template<typename... ARGS>
void _trigger(const std::vector<std::function<void(Peer &, ARGS...)>> &hs, ARGS... args) {
for (auto h : hs) {
h(*this, args...);
}
}
/*template <typename... ARGS>
int _send(const std::string &t, ARGS... args);
......@@ -215,9 +222,9 @@ class Peer {
ftl::UUID peerid_;
ftl::net::Dispatcher *disp_;
std::vector<std::function<void()>> open_handlers_;
std::vector<std::function<void(Peer &)>> open_handlers_;
//std::vector<std::function<void(const ftl::net::Error &)>> error_handlers_
std::vector<std::function<void()>> close_handlers_;
std::vector<std::function<void(Peer &)>> close_handlers_;
std::map<int, std::unique_ptr<virtual_caller>> callbacks_;
static int rpcid__;
......
......@@ -57,6 +57,8 @@ class Universe {
Peer *getPeer(const ftl::UUID &pid) const;
int numberOfSubscribers(const std::string &res) const;
/**
* Bind a function to an RPC or service call name. This will implicitely
* be called by any peer making the request.
......@@ -138,8 +140,7 @@ void Universe::bind(const std::string &name, F func) {
template <typename F>
bool Universe::subscribe(const std::string &res, F func) {
bind(res, func);
_subscribe(res);
return true;
return _subscribe(res);
}
template <typename... ARGS>
......@@ -190,7 +191,10 @@ std::optional<R> Universe::findOne(const std::string &name, ARGS... args) {
template <typename R, typename... ARGS>
R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) {
Peer *p = getPeer(pid);
if (p == nullptr) throw -1;
if (p == nullptr) {
LOG(ERROR) << "Attempting to call an unknown peer : " << pid.to_string();
throw -1;
}
return p->call<R>(name, args...);
}
......
......@@ -149,6 +149,8 @@ Peer::Peer(int s, Dispatcher *d) : sock_(s) {
status_ = kConnected;
version_ = version;
peerid_ = pid;
_trigger(open_handlers_);
}
});
......@@ -216,6 +218,8 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) {
version_ = version;
peerid_ = pid;
send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer);
_trigger(open_handlers_);
}
});
}
......@@ -262,6 +266,8 @@ void Peer::close(bool retry) {
//auto i = find(sockets.begin(),sockets.end(),this);
//sockets.erase(i);
_trigger(close_handlers_);
}
}
......@@ -481,9 +487,9 @@ void Peer::_sendResponse(uint32_t id, const msgpack::object &res) {
_send();
}
void Peer::onConnect(std::function<void()> &f) {
void Peer::onConnect(const std::function<void(Peer&)> &f) {
if (status_ == kConnected) {
f();
f(*this);
} else {
open_handlers_.push_back(f);
}
......@@ -491,10 +497,7 @@ void Peer::onConnect(std::function<void()> &f) {
void Peer::_connected() {
status_ = kConnected;
for (auto h : open_handlers_) {
h();
}
//connect_handlers_.clear();
}
int Peer::_send() {
......
......@@ -72,6 +72,10 @@ bool Universe::connect(const string &addr) {
_installBindings(p);
p->onConnect([this](Peer &p) {
peer_ids_[p.id()] = &p;
});
return p->status() == Peer::kConnecting;
}
......@@ -116,6 +120,8 @@ void Universe::_installBindings(Peer *p) {
void Universe::_installBindings() {
bind("__subscribe__", [this](const UUID &id, const string &uri) -> bool {
LOG(INFO) << "Subscription to " << uri << " by " << id.to_string();
subscribers_[uri].push_back(id);
return true;
});
bind("__owner__", [this](const std::string &res) -> optional<UUID> {
......@@ -138,15 +144,25 @@ optional<UUID> Universe::findOwner(const string &res) {
bool Universe::createResource(const std::string &uri) {
owned_.insert(uri);
subscribers_[uri];
return true;
}
int Universe::numberOfSubscribers(const std::string &res) const {
auto s = subscribers_.find(res);
if (s != subscribers_.end()) {
return s->second.size();
} else {
return -1;
}
}
bool Universe::_subscribe(const std::string &res) {
// Need to find who owns the resource
optional<UUID> pid = findOwner(res);
if (pid) {
return call<bool>(*pid, "__subscribe__", id_, res);
return call<bool>(*pid, "__subscribe__", ftl::net::this_peer, res);
} else {
// No resource found
LOG(WARNING) << "Subscribe to unknown resource: " << res;
......@@ -199,8 +215,10 @@ void Universe::_run() {
if (csock != INVALID_SOCKET) {
auto p = new Peer(csock, &disp_);
peers_.push_back(p);
_installBindings(p);
p->onConnect([this](Peer &p) {
peer_ids_[p.id()] = &p;
});
}
//}
}
......
......@@ -162,6 +162,34 @@ TEST_CASE("Universe::findOwner()", "") {
a.createResource("ftl://test");
REQUIRE( *(b.findOwner("ftl://test")) == ftl::net::this_peer );
}
SECTION("three peers and one owner") {
Universe c;
c.connect("tcp://localhost:7077");
while (a.numberOfPeers() < 2) sleep_for(milliseconds(20));
b.createResource("ftl://test");
REQUIRE( *(a.findOwner("ftl://test")) == ftl::net::this_peer );
}
}
TEST_CASE("Universe::subscribe()", "") {
Universe a;
Universe b;
a.listen("tcp://localhost:7077");
b.connect("tcp://localhost:7077");
while (a.numberOfPeers() == 0) sleep_for(milliseconds(20));
SECTION("no resource exists") {
REQUIRE( !b.subscribe("ftl://test", []() {}) );
}
SECTION("one resource exists") {
a.createResource("ftl://test");
REQUIRE( b.subscribe("ftl://test", []() {}) );
sleep_for(milliseconds(50));
REQUIRE( a.numberOfSubscribers("ftl://test") == 1);
}
}
/*TEST_CASE("net::listen()", "[net]") {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment