Skip to content
Snippets Groups Projects
Commit 51e0b4ca authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Move p2p code into net module

parent 0ce372d1
No related branches found
No related tags found
No related merge requests found
#ifndef _FTL_NET_P2P_HPP_
#define _FTL_NET_P2P_HPP_
#include <ftl/uuid.hpp>
#include <optional>
#include <string>
#include <map>
#include <chrono>
#include <vector>
#include <memory>
#include <ftl/net/protocol.hpp>
#include <ftl/net/socket.hpp>
namespace ftl {
namespace net {
/**
* Provides the base for p2p calls such as "find a single result" or "find all
* results" across the peer network. It wraps the underlying rpc mechanism,
* allowing a p2p rpc broadcast search strategy. It also enables calls to
* specific peers by peer id and manages the process of finding or initiating
* the required network connections.
*/
class P2P : public ftl::net::Protocol {
public:
P2P(const char *uri);
P2P(const std::string &uri);
void addPeer(std::shared_ptr<ftl::net::Socket> s) { peers_.push_back(s); };
const UUID &id() const { return id_; }
/**
* Bind a member function as an rpc "find one" across peers function.
* The function bound is the individual local case only, returning an
* optional value. The first peer to return an optional value with an
* actual value will be the one used.
*/
template <typename R, typename C, typename... Args>
void bind_find_one(const std::string &name,
std::optional<R>(C::*f)(Args...));
/**
* Bind a member function as an rpc "find all" across peers function.
*/
template <typename R, typename C, typename... Args>
void bind_find_all(const std::string &name,
std::optional<R>(C::*f)(Args...));
/**
* Call an rpc function on all peers (recursively if needed), until one
* provides a result.
*/
template <typename R, typename... Args>
std::optional<R> find_one(const std::string &name, Args... args);
/**
* Call an rpc function on all peers (recursively), collating all
* results into a vector.
*/
template <typename R, typename... Args>
std::vector<R> find_all(const std::string &name, Args... args);
/**
* Call an rpc function on a specific peer.
*/
template <typename R, typename... Args>
R call_peer(const ftl::UUID &peer, const std::string &name, Args... args);
/**
* Send a raw message to a specific peer. It will attempt to make a direct
* connection to that peer if one does not exist (and if the data packet
* is sufficiently large, and there are enough connection slots).
*/
template <typename... Args>
void send_peer(const ftl::UUID &peer, uint32_t service, Args... args);
std::vector<std::string> getAddresses(const ftl::UUID &peer);
std::optional<long int> ping(const ftl::UUID &peer);
private:
std::unordered_map<ftl::UUID,long int> requests_;
std::vector<std::shared_ptr<ftl::net::Socket>> peers_;
private:
template <typename R, typename... Args>
std::optional<R> _find_one(const std::string &name, const ftl::UUID &u,
const int &ttl, Args... args);
template <typename R, typename... Args>
std::vector<R> _find_all(const std::string &name, const ftl::UUID &u,
const int &ttl, Args... args);
std::optional<long int> _ping(const ftl::UUID &peer, long int time);
void _registerRPC();
private:
ftl::UUID id_;
};
}; // namespace net
}; // namespace ftl
// --- Template implementations ------------------------------------------------
template <typename R, typename C, typename... Args>
void ftl::net::P2P::bind_find_one(const std::string &name, std::optional<R>(C::*f)(Args...)) {
bind(name, [this,name,f](const ftl::UUID &u, int ttl, Args... args) -> std::optional<R> {
if (requests_.count(u) > 0) return {};
requests_[u] = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
auto result = (static_cast<C*>(this)->*f)(std::forward<Args>(args)...);
if (result) return result;
// Otherwise we must search again
if (ttl == 0) return {};
return _find_one<R>(name, u, ttl-1, args...);
});
}
template <typename R, typename C, typename... Args>
void ftl::net::P2P::bind_find_all(const std::string &name, std::optional<R>(C::*f)(Args...)) {
bind(name, [this,name,f](const ftl::UUID &u, int ttl, Args... args) -> std::vector<R> {
std::vector<R> results;
if (requests_.count(u) > 0) return results;
requests_[u] = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
auto result = (static_cast<C*>(this)->*f)(std::forward<Args>(args)...);
if (result) results.push_back(*result);
// Otherwise we must search again
if (ttl == 0) return results;
auto cres = _find_all<R>(name, u, ttl-1, args...);
if (cres.size() > 0) {
results.insert(results.end(), cres.begin(), cres.end());
}
return results;
});
}
template <typename R, typename... Args>
std::optional<R> ftl::net::P2P::find_one(const std::string &name, Args... args) {
ftl::UUID req;
int ttl = 10;
return _find_one<R>(name, req, ttl, args...);
}
template <typename R, typename... Args>
std::optional<R> ftl::net::P2P::_find_one(const std::string &name, const ftl::UUID &u, const int &ttl, Args... args) {
// TODO Use an async approach
for (auto p : peers_) {
auto res = p->call<std::optional<R>>(name, u, ttl, args...);
if (res) return res;
}
return {};
}
template <typename R, typename... Args>
std::vector<R> ftl::net::P2P::find_all(const std::string &name, Args... args) {
ftl::UUID req;
int ttl = 10;
return _find_all<R>(name, req, ttl, std::forward<Args...>(args...));
}
template <typename R, typename... Args>
std::vector<R> ftl::net::P2P::_find_all(const std::string &name, const ftl::UUID &u, const int &ttl, Args... args) {
// TODO Use an async approach
std::vector<R> results;
for (auto p : peers_) {
auto res = p->call<std::vector<R>>(name, u, ttl, args...);
std::cout << "Result size = " << res.size() << std::endl;
if (res.size() > 0)
results.insert(results.end(), res.begin(), res.end());
}
return results;
}
#endif // _FTL_NET_P2P_HPP_
......@@ -41,6 +41,10 @@ struct caller : virtual_caller {
};
typedef std::tuple<const char*,size_t> array;
/*struct compress{};
struct encrypt{};
struct decompress{};
struct decrypt{};*/
/**
* A single socket connection object, to be constructed using the connect()
......
......@@ -8,6 +8,9 @@
#include <msgpack.hpp>
namespace ftl {
/**
* C++ Wrapper for libuuid. The default constructor generates a new UUID.
*/
class UUID {
public:
UUID() { uuid_generate(uuid_); };
......@@ -17,15 +20,22 @@ namespace ftl {
bool operator==(const UUID &u) const { return memcmp(uuid_,u.uuid_,16) == 0; }
bool operator!=(const UUID &u) const { return memcmp(uuid_,u.uuid_,16) != 0; }
/**
* Get a raw data string.
*/
std::string str() const { return std::string((char*)uuid_,16); };
const unsigned char *raw() const { return &uuid_[0]; }
/**
* Get a pretty string.
*/
std::string to_string() const {
char b[37];
uuid_unparse(uuid_, b);
return std::string(b);
}
/* Allow the UUID to be packed into an RPC message. */
MSGPACK_DEFINE(uuid_);
private:
......
#include <ftl/net/p2p.hpp>
using ftl::net::P2P;
using std::optional;
using std::tuple;
using ftl::UUID;
using std::get;
using namespace std::chrono;
using std::vector;
using std::string;
P2P::P2P(const char *uri) : Protocol(uri) {
_registerRPC();
}
P2P::P2P(const string &uri) : Protocol(uri) {
_registerRPC();
}
void P2P::_registerRPC() {
bind_find_one("ping", &P2P::_ping);
}
vector<string> P2P::getAddresses(const UUID &peer) {
vector<string> results;
return results;
}
optional<long int> P2P::ping(const UUID &peer) {
long int time = duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
auto p = find_one<long int>("ping", peer, time);
if (!p) return {};
return *p - time;
}
optional<long int> P2P::_ping(const UUID &peer, long int time) {
if (id() == peer) {
return time;
} else {
return {};
}
}
......@@ -19,6 +19,19 @@ add_executable(uri_unit
target_include_directories(uri_unit PUBLIC ${PROJECT_SOURCE_DIR}/include)
target_link_libraries(uri_unit uriparser)
add_executable(p2p_base_unit
./tests.cpp
./p2p_base_unit.cpp
../src/p2p.cpp
../src/socket.cpp
../src/dispatcher.cpp
../src/protocol.cpp
../src/net.cpp
../src/listener.cpp
)
target_include_directories(p2p_base_unit PUBLIC ${PROJECT_SOURCE_DIR}/include)
target_link_libraries(p2p_base_unit uriparser gflags glog uuid)
add_executable(net_integration
./tests.cpp
./net_integration.cpp
......
......@@ -80,14 +80,14 @@ Dispatcher::response_t get_response() {
// --- Tests -------------------------------------------------------------------
#include <ftl/p2p-rm/p2p.hpp>
#include <ftl/net/p2p.hpp>
using ftl::net::p2p;
using ftl::net::P2P;
SCENARIO("p2p::bind_find_one()", "[find_one]") {
class Mock_p2p : public p2p {
class Mock_p2p : public P2P {
public:
Mock_p2p() : p2p("mock://") {
Mock_p2p() : P2P("mock://") {
bind_find_one("test", &Mock_p2p::test);
}
......@@ -146,9 +146,9 @@ SCENARIO("p2p::bind_find_one()", "[find_one]") {
}
SCENARIO("p2p::bind_find_all()", "[find_one]") {
class Mock_p2p : public p2p {
class Mock_p2p : public P2P {
public:
Mock_p2p() : p2p("mock://") {
Mock_p2p() : P2P("mock://") {
bind_find_all("test", &Mock_p2p::test);
}
......@@ -209,9 +209,9 @@ SCENARIO("p2p::bind_find_all()", "[find_one]") {
}
SCENARIO("p2p::find_one()", "[find_one]") {
class Mock_p2p : public p2p {
class Mock_p2p : public P2P {
public:
Mock_p2p() : p2p("mock://") {
Mock_p2p() : P2P("mock://") {
bind_find_one("test", &Mock_p2p::test);
}
......@@ -244,9 +244,9 @@ SCENARIO("p2p::find_one()", "[find_one]") {
}
SCENARIO("p2p::find_all()", "[find_one]") {
class Mock_p2p : public p2p {
class Mock_p2p : public P2P {
public:
Mock_p2p() : p2p("mock://") {
Mock_p2p() : P2P("mock://") {
bind_find_all("test", &Mock_p2p::test);
}
......
#ifndef _FTL_RM_P2P_HPP_
#define _FTL_RM_P2P_HPP_
#include <ftl/uuid.hpp>
#include <optional>
#include <string>
#include <map>
#include <chrono>
#include <vector>
#include <memory>
#include <ftl/net/protocol.hpp>
#include <ftl/net/socket.hpp>
#include <iostream>
namespace ftl {
namespace net {
class p2p : public ftl::net::Protocol {
public:
p2p(const char *uri) : Protocol(uri) {}
p2p(const std::string &uri) : Protocol(uri) {}
void addPeer(std::shared_ptr<ftl::net::Socket> s) { peers_.push_back(s); };
template <typename R, typename C, typename... Args>
void bind_find_one(const std::string &name, std::optional<R>(C::*f)(Args...)) {
bind(name, [this,name,f](const ftl::UUID &u, int ttl, Args... args) -> std::optional<R> {
if (requests_.count(u) > 0) return {};
requests_[u] = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
auto result = (static_cast<C*>(this)->*f)(std::forward<Args>(args)...);
if (result) return result;
// Otherwise we must search again
if (ttl == 0) return {};
return _find_one<R>(name, u, ttl-1, args...);
});
}
template <typename R, typename C, typename... Args>
void bind_find_all(const std::string &name, std::optional<R>(C::*f)(Args...)) {
bind(name, [this,name,f](const ftl::UUID &u, int ttl, Args... args) -> std::vector<R> {
std::vector<R> results;
if (requests_.count(u) > 0) return results;
requests_[u] = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
auto result = (static_cast<C*>(this)->*f)(std::forward<Args>(args)...);
if (result) results.push_back(*result);
// Otherwise we must search again
if (ttl == 0) return results;
auto cres = _find_all<R>(name, u, ttl-1, args...);
if (cres.size() > 0) {
results.insert(results.end(), cres.begin(), cres.end());
}
return results;
});
}
template <typename R, typename... Args>
std::optional<R> find_one(const std::string &name, Args... args) {
ftl::UUID req;
int ttl = 10;
return _find_one<R>(name, req, ttl, args...);
}
template <typename R, typename... Args>
std::optional<R> _find_one(const std::string &name, const ftl::UUID &u, const int &ttl, Args... args) {
// TODO Use an async approach
for (auto p : peers_) {
auto res = p->call<std::optional<R>>(name, u, ttl, args...);
if (res) return res;
}
return {};
}
template <typename R, typename... Args>
std::vector<R> find_all(const std::string &name, Args... args) {
ftl::UUID req;
int ttl = 10;
return _find_all<R>(name, req, ttl, std::forward<Args...>(args...));
}
template <typename R, typename... Args>
std::vector<R> _find_all(const std::string &name, const ftl::UUID &u, const int &ttl, Args... args) {
// TODO Use an async approach
std::vector<R> results;
for (auto p : peers_) {
auto res = p->call<std::vector<R>>(name, u, ttl, args...);
std::cout << "Result size = " << res.size() << std::endl;
if (res.size() > 0)
results.insert(results.end(), res.begin(), res.end());
}
return results;
}
/*R find_all(const std::string &name, Args... args) {
}*/
private:
std::unordered_map<ftl::UUID,long int> requests_;
std::vector<std::shared_ptr<ftl::net::Socket>> peers_;
};
}; // namespace net
}; // namespace ftl
#endif // _FTL_RM_P2P_HPP_
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment