Skip to content
Snippets Groups Projects
Commit 2e607bb6 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Refactor to broadcastCall and allow member functions to be RPC functions

parent 8136448d
No related branches found
No related tags found
No related merge requests found
...@@ -19,6 +19,7 @@ struct Blob { ...@@ -19,6 +19,7 @@ struct Blob {
char *data_; char *data_;
size_t size_; size_t size_;
std::string uri_; std::string uri_;
std::string owner_;
uint32_t blobid_; uint32_t blobid_;
void finished(); void finished();
......
...@@ -95,17 +95,51 @@ class Cluster { ...@@ -95,17 +95,51 @@ class Cluster {
/** /**
* Connect to a new peer node using the specified socket. * Connect to a new peer node using the specified socket.
*/ */
void addPeer(std::shared_ptr<ftl::net::Socket> &s); void addPeer(std::shared_ptr<ftl::net::Socket> &s, bool incoming=false);
/** /**
* Connect to a new peer using a URL string. * Connect to a new peer using a URL string.
*/ */
void addPeer(const char *url); void addPeer(const char *url);
std::string getOwner(const char *uri); /**
* Allow member functions to be used for RPC calls by binding with 'this'.
*/
template <typename R, typename C, typename ...Args>
auto bind(R(C::*f)(Args...)) {
return [this,f](Args... args) -> R { return (this->*f)(std::forward<Args>(args)...); };
}
std::string getOwner(const std::string &uri);
/**
* Make an RPC call to all connected peers and put into a results vector.
* This function blocks until all peers have responded or an error /
* timeout occurs. The return value indicates the number of failed peers, or
* is 0 if all returned.
*/
template <typename T, typename... ARGS>
int broadcastCall(const std::string &name, std::vector<T> &results,
ARGS... args) {
int count = 0;
auto f = [&count,&results](msgpack::object &r) {
count--;
results.push_back(r.as<T>());
};
for (auto p : peers_) {
count++;
p->async_call(name, f, std::forward<ARGS>(args)...);
}
// TODO Limit in case of no return.
while (count > 0) {
ftl::net::wait();
}
return count;
}
private: private:
static int rpcid_;
std::string root_; std::string root_;
std::shared_ptr<ftl::net::Listener> listener_; std::shared_ptr<ftl::net::Listener> listener_;
std::vector<std::shared_ptr<ftl::net::Socket>> peers_; std::vector<std::shared_ptr<ftl::net::Socket>> peers_;
...@@ -115,6 +149,7 @@ class Cluster { ...@@ -115,6 +149,7 @@ class Cluster {
ftl::rm::Blob *_lookup(const char *uri); ftl::rm::Blob *_lookup(const char *uri);
Blob *_create(const char *uri, char *addr, size_t size, size_t count, Blob *_create(const char *uri, char *addr, size_t size, size_t count,
ftl::rm::flags_t flags, const std::string &tname); ftl::rm::flags_t flags, const std::string &tname);
void _registerRPC(ftl::net::Socket &s);
}; };
}; };
......
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
#include <ftl/net/socket.hpp> #include <ftl/net/socket.hpp>
#include <ftl/p2p-rm/protocol.hpp> #include <ftl/p2p-rm/protocol.hpp>
#include <iostream>
struct SyncHeader { struct SyncHeader {
uint32_t blobid; uint32_t blobid;
uint32_t offset; uint32_t offset;
...@@ -10,7 +12,10 @@ struct SyncHeader { ...@@ -10,7 +12,10 @@ struct SyncHeader {
void ftl::rm::_sync(const Blob &blob, size_t offset, size_t size) { void ftl::rm::_sync(const Blob &blob, size_t offset, size_t size) {
// Sanity check // Sanity check
if (offset + size > blob.size_) throw -1; if (offset + size > blob.size_) {
LOG(ERROR) << "Memory overrun during sync";
return;
}
// TODO Delay send to collate many write operations? // TODO Delay send to collate many write operations?
......
...@@ -10,24 +10,25 @@ ...@@ -10,24 +10,25 @@
#include <map> #include <map>
#include <string> #include <string>
#include <iostream> #include <iostream>
#include <vector>
using ftl::rm::Cluster; using ftl::rm::Cluster;
using ftl::net::Listener; using ftl::net::Listener;
using std::map; using std::map;
using std::vector;
using std::shared_ptr; using std::shared_ptr;
using std::string;
using ftl::URI; using ftl::URI;
using ftl::rm::Blob; using ftl::rm::Blob;
using ftl::net::Socket; using ftl::net::Socket;
int Cluster::rpcid_ = 0;
Cluster::Cluster(const URI &uri, shared_ptr<Listener> l) : listener_(l) { Cluster::Cluster(const URI &uri, shared_ptr<Listener> l) : listener_(l) {
//auto me = this; //auto me = this;
root_ = uri.getHost(); root_ = uri.getHost();
if (l != nullptr) { if (l != nullptr) {
l->onConnection([&](shared_ptr<Socket> &s) { l->onConnection([&](shared_ptr<Socket> &s) {
addPeer(s); addPeer(s, true);
}); });
} }
} }
...@@ -43,21 +44,27 @@ void Cluster::reset() { ...@@ -43,21 +44,27 @@ void Cluster::reset() {
blobs_.clear(); blobs_.clear();
} }
void Cluster::addPeer(shared_ptr<Socket> &p) { void Cluster::_registerRPC(Socket &s) {
LOG(INFO) << "Peer added: " << p->getURI(); //s.bind("getowner", [this](const std::string &u) { getOwner(u.c_str()); });
//auto me = this; s.bind("getowner", bind(&Cluster::getOwner));
}
void Cluster::addPeer(shared_ptr<Socket> &p, bool incoming) {
LOG(INFO) << ((incoming) ? "Incoming peer added: " : "Peer added: ") << p->getURI();
peers_.push_back(p); peers_.push_back(p);
/*p->onMessage([&](int service, const std::string &data) { _registerRPC(*p);
std::cout << "MSG " << service << std::endl;
});*/
p->bind("getowner", [](const std::string &uri) -> std::string {
std::cout << "GETOWNER" << std::endl;
return "";
});
// TODO Check ownership of my blobs. if (!incoming) {
for (auto b : blobs_) { p->onConnect([this](Socket &s) {
getOwner(b.first.c_str()); for (auto b : blobs_) {
auto o = s.call<string>("getowner", b.first);
if (o.size() > 0) {
b.second->owner_ = o;
LOG(INFO) << "Lost ownership of " << b.first.c_str() << " to " << o;
}
}
});
} }
} }
...@@ -77,34 +84,23 @@ Blob *Cluster::_lookup(const char *uri) { ...@@ -77,34 +84,23 @@ Blob *Cluster::_lookup(const char *uri) {
std::cout << "Blob Found for " << u.getBaseURI() << " = " << (b != nullptr) << std::endl; std::cout << "Blob Found for " << u.getBaseURI() << " = " << (b != nullptr) << std::endl;
if (!b) { if (!b) {
// Whoops, need to map it first! LOG(WARNING) << "Unmapped memory requested: " << uri;
} }
return b; return b;
} }
std::string Cluster::getOwner(const char *uri) { std::string Cluster::getOwner(const std::string &uri) {
std::string result; vector<string> results;
int count = 0;
auto f = [&](msgpack::object &r) {
count--;
std::string res = r.as<std::string>();
if (res.size() > 0) result = res;
};
for (auto p : peers_) {
count++;
LOG(INFO) << "Request owner of " << uri << " from " << p->getURI();
p->async_call("getowner", f, std::string(uri));
}
// TODO Limit in case of no return. std::cout << "GETOWNER" << std::endl;
while (count > 0) { if (blobs_.count(uri) != 0) return blobs_[uri]->owner_;
ftl::net::wait();
} broadcastCall("getowner", results, uri);
return result; // TODO Verify all results are equal or empty
if (results.size() == 0) return "";
return results[0];
} }
Blob *Cluster::_create(const char *uri, char *addr, size_t size, size_t count, Blob *Cluster::_create(const char *uri, char *addr, size_t size, size_t count,
...@@ -121,14 +117,17 @@ Blob *Cluster::_create(const char *uri, char *addr, size_t size, size_t count, ...@@ -121,14 +117,17 @@ Blob *Cluster::_create(const char *uri, char *addr, size_t size, size_t count,
b->data_ = addr; b->data_ = addr;
b->size_ = size; b->size_ = size;
b->uri_ = std::string(uri); b->uri_ = std::string(uri);
b->owner_ = "";
blobs_[u.getBaseURI()] = b; blobs_[u.getBaseURI()] = b;
std::string o = getOwner(uri); std::string o = getOwner(uri);
if (o.size() == 0) { if (o.size() == 0) {
// I am the owner! // I am the owner!
std::cout << "I own " << uri << std::endl; std::cout << "I own " << uri << std::endl;
b->owner_ = "me";
} else { } else {
std::cout << "I do not own " << uri << std::endl; std::cout << "I do not own " << uri << std::endl;
b->owner_ = o;
} }
//std::cout << owners << std::endl; //std::cout << owners << std::endl;
......
...@@ -6,6 +6,7 @@ add_executable(mapped_ptr EXCLUDE_FROM_ALL ...@@ -6,6 +6,7 @@ add_executable(mapped_ptr EXCLUDE_FROM_ALL
./mapped_ptr.cpp ./mapped_ptr.cpp
../src/blob.cpp ../src/blob.cpp
) )
target_link_libraries(mapped_ptr gflags glog)
add_executable(p2p_rm EXCLUDE_FROM_ALL add_executable(p2p_rm EXCLUDE_FROM_ALL
./tests.cpp ./tests.cpp
...@@ -15,7 +16,7 @@ add_executable(p2p_rm EXCLUDE_FROM_ALL ...@@ -15,7 +16,7 @@ add_executable(p2p_rm EXCLUDE_FROM_ALL
./p2p-rm.cpp ./p2p-rm.cpp
) )
target_link_libraries(p2p_rm uriparser) target_link_libraries(p2p_rm uriparser ftlnet gflags glog)
add_executable(peer_test EXCLUDE_FROM_ALL add_executable(peer_test EXCLUDE_FROM_ALL
./peer.cpp ./peer.cpp
......
...@@ -22,6 +22,7 @@ SCENARIO( "Reading from a remote pointer", "[remote_ptr]" ) { ...@@ -22,6 +22,7 @@ SCENARIO( "Reading from a remote pointer", "[remote_ptr]" ) {
// Make a dummy blob // Make a dummy blob
auto blob = new ftl::rm::Blob(); auto blob = new ftl::rm::Blob();
blob->data_ = (char*)(new int[5]); blob->data_ = (char*)(new int[5]);
blob->size_ = 5*sizeof(int);
((int*)(blob->data_))[0] = 55; ((int*)(blob->data_))[0] = 55;
((int*)(blob->data_))[1] = 66; ((int*)(blob->data_))[1] = 66;
...@@ -37,6 +38,7 @@ SCENARIO( "Writing to a remote pointer", "[remote_ptr]" ) { ...@@ -37,6 +38,7 @@ SCENARIO( "Writing to a remote pointer", "[remote_ptr]" ) {
// Make a dummy blob // Make a dummy blob
auto blob = new ftl::rm::Blob(); auto blob = new ftl::rm::Blob();
blob->data_ = (char*)(new int[5]); blob->data_ = (char*)(new int[5]);
blob->size_ = 5*sizeof(int);
((int*)(blob->data_))[0] = 55; ((int*)(blob->data_))[0] = 55;
((int*)(blob->data_))[1] = 66; ((int*)(blob->data_))[1] = 66;
...@@ -63,6 +65,7 @@ SCENARIO( "Writing to readonly pointer fails", "[remote_ptr]" ) { ...@@ -63,6 +65,7 @@ SCENARIO( "Writing to readonly pointer fails", "[remote_ptr]" ) {
// Make a dummy blob // Make a dummy blob
auto blob = new ftl::rm::Blob(); auto blob = new ftl::rm::Blob();
blob->data_ = (char*)(new int[5]); blob->data_ = (char*)(new int[5]);
blob->size_ = 5*sizeof(int);
((int*)(blob->data_))[0] = 55; ((int*)(blob->data_))[0] = 55;
((int*)(blob->data_))[1] = 66; ((int*)(blob->data_))[1] = 66;
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
// --- Mock Socket Send // --- Mock Socket Send
static std::vector<uint32_t> msgs; /*static std::vector<uint32_t> msgs;
int ftl::net::Socket::send2(uint32_t service, const std::string &data1, const std::string &data2) { int ftl::net::Socket::send2(uint32_t service, const std::string &data1, const std::string &data2) {
msgs.push_back(service); msgs.push_back(service);
...@@ -29,12 +29,6 @@ int ftl::net::Socket::send(uint32_t service, const std::string &data) { ...@@ -29,12 +29,6 @@ int ftl::net::Socket::send(uint32_t service, const std::string &data) {
msgs.push_back(service); msgs.push_back(service);
std::cout << "SEND (" << service << ")" << std::endl; std::cout << "SEND (" << service << ")" << std::endl;
/*if (service == P2P_RPC_CALL) {
// UNPACK
// PACK RETURN
message(P2P_RPC_RETURN, rdata);
}*/
return 0; return 0;
} }
...@@ -44,7 +38,7 @@ bool ftl::net::wait() { ...@@ -44,7 +38,7 @@ bool ftl::net::wait() {
std::shared_ptr<ftl::net::Socket> ftl::net::connect(const char *url) { std::shared_ptr<ftl::net::Socket> ftl::net::connect(const char *url) {
return nullptr; return nullptr;
} }*/
// --- End Mock Socket Send // --- End Mock Socket Send
...@@ -103,7 +97,8 @@ SCENARIO( "Cluster::map()", "[map]" ) { ...@@ -103,7 +97,8 @@ SCENARIO( "Cluster::map()", "[map]" ) {
SCENARIO( "Getting a read_ref", "[get]" ) { SCENARIO( "Getting a read_ref", "[get]" ) {
auto cluster = ftl::rm::cluster("ftl://utu.fi", nullptr); auto cluster = ftl::rm::cluster("ftl://utu.fi", nullptr);
// Add fake peer // Add fake peer
cluster->addPeer(std::shared_ptr<ftl::net::Socket>(new ftl::net::Socket(0))); auto p = std::make_shared<ftl::net::Socket>(0);
cluster->addPeer(p);
int data = 89; int data = 89;
int data2 = 99; int data2 = 99;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment