From 51e0b4ca7a79f4b0016f173d12adcbf2cbd92114 Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nwpope@utu.fi> Date: Fri, 1 Mar 2019 08:53:33 +0200 Subject: [PATCH] Move p2p code into net module --- net/include/ftl/net/p2p.hpp | 184 ++++++++++++++++++ net/include/ftl/net/socket.hpp | 4 + {p2p-rm => net}/include/ftl/uuid.hpp | 10 + net/src/p2p.cpp | 44 +++++ net/test/CMakeLists.txt | 13 ++ .../test/p2p_base_unit.cpp | 20 +- p2p-rm/include/ftl/p2p-rm/p2p.hpp | 114 ----------- 7 files changed, 265 insertions(+), 124 deletions(-) create mode 100644 net/include/ftl/net/p2p.hpp rename {p2p-rm => net}/include/ftl/uuid.hpp (81%) create mode 100644 net/src/p2p.cpp rename p2p-rm/test/p2p_unit.cpp => net/test/p2p_base_unit.cpp (95%) delete mode 100644 p2p-rm/include/ftl/p2p-rm/p2p.hpp diff --git a/net/include/ftl/net/p2p.hpp b/net/include/ftl/net/p2p.hpp new file mode 100644 index 000000000..76f2ca0ca --- /dev/null +++ b/net/include/ftl/net/p2p.hpp @@ -0,0 +1,184 @@ +#ifndef _FTL_NET_P2P_HPP_ +#define _FTL_NET_P2P_HPP_ + +#include <ftl/uuid.hpp> +#include <optional> +#include <string> +#include <map> +#include <chrono> +#include <vector> +#include <memory> +#include <ftl/net/protocol.hpp> +#include <ftl/net/socket.hpp> + +namespace ftl { +namespace net { + +/** + * Provides the base for p2p calls such as "find a single result" or "find all + * results" across the peer network. It wraps the underlying rpc mechanism, + * allowing a p2p rpc broadcast search strategy. It also enables calls to + * specific peers by peer id and manages the process of finding or initiating + * the required network connections. + */ +class P2P : public ftl::net::Protocol { + public: + P2P(const char *uri); + P2P(const std::string &uri); + + void addPeer(std::shared_ptr<ftl::net::Socket> s) { peers_.push_back(s); }; + + const UUID &id() const { return id_; } + + /** + * Bind a member function as an rpc "find one" across peers function. + * The function bound is the individual local case only, returning an + * optional value. The first peer to return an optional value with an + * actual value will be the one used. + */ + template <typename R, typename C, typename... Args> + void bind_find_one(const std::string &name, + std::optional<R>(C::*f)(Args...)); + + /** + * Bind a member function as an rpc "find all" across peers function. + */ + template <typename R, typename C, typename... Args> + void bind_find_all(const std::string &name, + std::optional<R>(C::*f)(Args...)); + + /** + * Call an rpc function on all peers (recursively if needed), until one + * provides a result. + */ + template <typename R, typename... Args> + std::optional<R> find_one(const std::string &name, Args... args); + + /** + * Call an rpc function on all peers (recursively), collating all + * results into a vector. + */ + template <typename R, typename... Args> + std::vector<R> find_all(const std::string &name, Args... args); + + /** + * Call an rpc function on a specific peer. + */ + template <typename R, typename... Args> + R call_peer(const ftl::UUID &peer, const std::string &name, Args... args); + + /** + * Send a raw message to a specific peer. It will attempt to make a direct + * connection to that peer if one does not exist (and if the data packet + * is sufficiently large, and there are enough connection slots). + */ + template <typename... Args> + void send_peer(const ftl::UUID &peer, uint32_t service, Args... args); + + std::vector<std::string> getAddresses(const ftl::UUID &peer); + std::optional<long int> ping(const ftl::UUID &peer); + + private: + std::unordered_map<ftl::UUID,long int> requests_; + std::vector<std::shared_ptr<ftl::net::Socket>> peers_; + + private: + template <typename R, typename... Args> + std::optional<R> _find_one(const std::string &name, const ftl::UUID &u, + const int &ttl, Args... args); + + template <typename R, typename... Args> + std::vector<R> _find_all(const std::string &name, const ftl::UUID &u, + const int &ttl, Args... args); + + std::optional<long int> _ping(const ftl::UUID &peer, long int time); + + void _registerRPC(); + + private: + ftl::UUID id_; +}; + +}; // namespace net +}; // namespace ftl + +// --- Template implementations ------------------------------------------------ + +template <typename R, typename C, typename... Args> +void ftl::net::P2P::bind_find_one(const std::string &name, std::optional<R>(C::*f)(Args...)) { + bind(name, [this,name,f](const ftl::UUID &u, int ttl, Args... args) -> std::optional<R> { + if (requests_.count(u) > 0) return {}; + requests_[u] = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); + + auto result = (static_cast<C*>(this)->*f)(std::forward<Args>(args)...); + if (result) return result; + + // Otherwise we must search again + if (ttl == 0) return {}; + + return _find_one<R>(name, u, ttl-1, args...); + }); +} + +template <typename R, typename C, typename... Args> +void ftl::net::P2P::bind_find_all(const std::string &name, std::optional<R>(C::*f)(Args...)) { + bind(name, [this,name,f](const ftl::UUID &u, int ttl, Args... args) -> std::vector<R> { + std::vector<R> results; + + if (requests_.count(u) > 0) return results; + requests_[u] = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); + + auto result = (static_cast<C*>(this)->*f)(std::forward<Args>(args)...); + if (result) results.push_back(*result); + + // Otherwise we must search again + if (ttl == 0) return results; + + auto cres = _find_all<R>(name, u, ttl-1, args...); + if (cres.size() > 0) { + results.insert(results.end(), cres.begin(), cres.end()); + } + + return results; + }); +} + +template <typename R, typename... Args> +std::optional<R> ftl::net::P2P::find_one(const std::string &name, Args... args) { + ftl::UUID req; + int ttl = 10; + return _find_one<R>(name, req, ttl, args...); +} + +template <typename R, typename... Args> +std::optional<R> ftl::net::P2P::_find_one(const std::string &name, const ftl::UUID &u, const int &ttl, Args... args) { + // TODO Use an async approach + for (auto p : peers_) { + auto res = p->call<std::optional<R>>(name, u, ttl, args...); + if (res) return res; + } + return {}; +} + +template <typename R, typename... Args> +std::vector<R> ftl::net::P2P::find_all(const std::string &name, Args... args) { + ftl::UUID req; + int ttl = 10; + return _find_all<R>(name, req, ttl, std::forward<Args...>(args...)); +} + +template <typename R, typename... Args> +std::vector<R> ftl::net::P2P::_find_all(const std::string &name, const ftl::UUID &u, const int &ttl, Args... args) { + // TODO Use an async approach + std::vector<R> results; + for (auto p : peers_) { + auto res = p->call<std::vector<R>>(name, u, ttl, args...); + std::cout << "Result size = " << res.size() << std::endl; + if (res.size() > 0) + results.insert(results.end(), res.begin(), res.end()); + } + return results; +} + +#endif // _FTL_NET_P2P_HPP_ + diff --git a/net/include/ftl/net/socket.hpp b/net/include/ftl/net/socket.hpp index a3efdfc12..c9d05e7e6 100644 --- a/net/include/ftl/net/socket.hpp +++ b/net/include/ftl/net/socket.hpp @@ -41,6 +41,10 @@ struct caller : virtual_caller { }; typedef std::tuple<const char*,size_t> array; +/*struct compress{}; +struct encrypt{}; +struct decompress{}; +struct decrypt{};*/ /** * A single socket connection object, to be constructed using the connect() diff --git a/p2p-rm/include/ftl/uuid.hpp b/net/include/ftl/uuid.hpp similarity index 81% rename from p2p-rm/include/ftl/uuid.hpp rename to net/include/ftl/uuid.hpp index 8ec0931e6..4726ebd3b 100644 --- a/p2p-rm/include/ftl/uuid.hpp +++ b/net/include/ftl/uuid.hpp @@ -8,6 +8,9 @@ #include <msgpack.hpp> namespace ftl { + /** + * C++ Wrapper for libuuid. The default constructor generates a new UUID. + */ class UUID { public: UUID() { uuid_generate(uuid_); }; @@ -17,15 +20,22 @@ namespace ftl { bool operator==(const UUID &u) const { return memcmp(uuid_,u.uuid_,16) == 0; } bool operator!=(const UUID &u) const { return memcmp(uuid_,u.uuid_,16) != 0; } + /** + * Get a raw data string. + */ std::string str() const { return std::string((char*)uuid_,16); }; const unsigned char *raw() const { return &uuid_[0]; } + /** + * Get a pretty string. + */ std::string to_string() const { char b[37]; uuid_unparse(uuid_, b); return std::string(b); } + /* Allow the UUID to be packed into an RPC message. */ MSGPACK_DEFINE(uuid_); private: diff --git a/net/src/p2p.cpp b/net/src/p2p.cpp new file mode 100644 index 000000000..fc12c2032 --- /dev/null +++ b/net/src/p2p.cpp @@ -0,0 +1,44 @@ +#include <ftl/net/p2p.hpp> + +using ftl::net::P2P; +using std::optional; +using std::tuple; +using ftl::UUID; +using std::get; +using namespace std::chrono; +using std::vector; +using std::string; + +P2P::P2P(const char *uri) : Protocol(uri) { + _registerRPC(); +} + +P2P::P2P(const string &uri) : Protocol(uri) { + _registerRPC(); +} + +void P2P::_registerRPC() { + bind_find_one("ping", &P2P::_ping); +} + +vector<string> P2P::getAddresses(const UUID &peer) { + vector<string> results; + return results; +} + +optional<long int> P2P::ping(const UUID &peer) { + long int time = duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count(); + auto p = find_one<long int>("ping", peer, time); + + if (!p) return {}; + return *p - time; +} + +optional<long int> P2P::_ping(const UUID &peer, long int time) { + if (id() == peer) { + return time; + } else { + return {}; + } +} + diff --git a/net/test/CMakeLists.txt b/net/test/CMakeLists.txt index f491fd685..ee774f88d 100644 --- a/net/test/CMakeLists.txt +++ b/net/test/CMakeLists.txt @@ -19,6 +19,19 @@ add_executable(uri_unit target_include_directories(uri_unit PUBLIC ${PROJECT_SOURCE_DIR}/include) target_link_libraries(uri_unit uriparser) +add_executable(p2p_base_unit + ./tests.cpp + ./p2p_base_unit.cpp + ../src/p2p.cpp + ../src/socket.cpp + ../src/dispatcher.cpp + ../src/protocol.cpp + ../src/net.cpp + ../src/listener.cpp +) +target_include_directories(p2p_base_unit PUBLIC ${PROJECT_SOURCE_DIR}/include) +target_link_libraries(p2p_base_unit uriparser gflags glog uuid) + add_executable(net_integration ./tests.cpp ./net_integration.cpp diff --git a/p2p-rm/test/p2p_unit.cpp b/net/test/p2p_base_unit.cpp similarity index 95% rename from p2p-rm/test/p2p_unit.cpp rename to net/test/p2p_base_unit.cpp index 99b2d2943..c07c5593d 100644 --- a/p2p-rm/test/p2p_unit.cpp +++ b/net/test/p2p_base_unit.cpp @@ -80,14 +80,14 @@ Dispatcher::response_t get_response() { // --- Tests ------------------------------------------------------------------- -#include <ftl/p2p-rm/p2p.hpp> +#include <ftl/net/p2p.hpp> -using ftl::net::p2p; +using ftl::net::P2P; SCENARIO("p2p::bind_find_one()", "[find_one]") { - class Mock_p2p : public p2p { + class Mock_p2p : public P2P { public: - Mock_p2p() : p2p("mock://") { + Mock_p2p() : P2P("mock://") { bind_find_one("test", &Mock_p2p::test); } @@ -146,9 +146,9 @@ SCENARIO("p2p::bind_find_one()", "[find_one]") { } SCENARIO("p2p::bind_find_all()", "[find_one]") { - class Mock_p2p : public p2p { + class Mock_p2p : public P2P { public: - Mock_p2p() : p2p("mock://") { + Mock_p2p() : P2P("mock://") { bind_find_all("test", &Mock_p2p::test); } @@ -209,9 +209,9 @@ SCENARIO("p2p::bind_find_all()", "[find_one]") { } SCENARIO("p2p::find_one()", "[find_one]") { - class Mock_p2p : public p2p { + class Mock_p2p : public P2P { public: - Mock_p2p() : p2p("mock://") { + Mock_p2p() : P2P("mock://") { bind_find_one("test", &Mock_p2p::test); } @@ -244,9 +244,9 @@ SCENARIO("p2p::find_one()", "[find_one]") { } SCENARIO("p2p::find_all()", "[find_one]") { - class Mock_p2p : public p2p { + class Mock_p2p : public P2P { public: - Mock_p2p() : p2p("mock://") { + Mock_p2p() : P2P("mock://") { bind_find_all("test", &Mock_p2p::test); } diff --git a/p2p-rm/include/ftl/p2p-rm/p2p.hpp b/p2p-rm/include/ftl/p2p-rm/p2p.hpp deleted file mode 100644 index 5b3264fb0..000000000 --- a/p2p-rm/include/ftl/p2p-rm/p2p.hpp +++ /dev/null @@ -1,114 +0,0 @@ -#ifndef _FTL_RM_P2P_HPP_ -#define _FTL_RM_P2P_HPP_ - -#include <ftl/uuid.hpp> -#include <optional> -#include <string> -#include <map> -#include <chrono> -#include <vector> -#include <memory> -#include <ftl/net/protocol.hpp> -#include <ftl/net/socket.hpp> -#include <iostream> - -namespace ftl { -namespace net { - -class p2p : public ftl::net::Protocol { - public: - p2p(const char *uri) : Protocol(uri) {} - p2p(const std::string &uri) : Protocol(uri) {} - - void addPeer(std::shared_ptr<ftl::net::Socket> s) { peers_.push_back(s); }; - - template <typename R, typename C, typename... Args> - void bind_find_one(const std::string &name, std::optional<R>(C::*f)(Args...)) { - bind(name, [this,name,f](const ftl::UUID &u, int ttl, Args... args) -> std::optional<R> { - if (requests_.count(u) > 0) return {}; - requests_[u] = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); - - auto result = (static_cast<C*>(this)->*f)(std::forward<Args>(args)...); - if (result) return result; - - // Otherwise we must search again - if (ttl == 0) return {}; - - return _find_one<R>(name, u, ttl-1, args...); - }); - } - - template <typename R, typename C, typename... Args> - void bind_find_all(const std::string &name, std::optional<R>(C::*f)(Args...)) { - bind(name, [this,name,f](const ftl::UUID &u, int ttl, Args... args) -> std::vector<R> { - std::vector<R> results; - - if (requests_.count(u) > 0) return results; - requests_[u] = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); - - auto result = (static_cast<C*>(this)->*f)(std::forward<Args>(args)...); - if (result) results.push_back(*result); - - // Otherwise we must search again - if (ttl == 0) return results; - - auto cres = _find_all<R>(name, u, ttl-1, args...); - if (cres.size() > 0) { - results.insert(results.end(), cres.begin(), cres.end()); - } - - return results; - }); - } - - template <typename R, typename... Args> - std::optional<R> find_one(const std::string &name, Args... args) { - ftl::UUID req; - int ttl = 10; - return _find_one<R>(name, req, ttl, args...); - } - - template <typename R, typename... Args> - std::optional<R> _find_one(const std::string &name, const ftl::UUID &u, const int &ttl, Args... args) { - // TODO Use an async approach - for (auto p : peers_) { - auto res = p->call<std::optional<R>>(name, u, ttl, args...); - if (res) return res; - } - return {}; - } - - template <typename R, typename... Args> - std::vector<R> find_all(const std::string &name, Args... args) { - ftl::UUID req; - int ttl = 10; - return _find_all<R>(name, req, ttl, std::forward<Args...>(args...)); - } - - template <typename R, typename... Args> - std::vector<R> _find_all(const std::string &name, const ftl::UUID &u, const int &ttl, Args... args) { - // TODO Use an async approach - std::vector<R> results; - for (auto p : peers_) { - auto res = p->call<std::vector<R>>(name, u, ttl, args...); - std::cout << "Result size = " << res.size() << std::endl; - if (res.size() > 0) - results.insert(results.end(), res.begin(), res.end()); - } - return results; - } - - /*R find_all(const std::string &name, Args... args) { - - }*/ - - private: - std::unordered_map<ftl::UUID,long int> requests_; - std::vector<std::shared_ptr<ftl::net::Socket>> peers_; -}; - -}; // namespace net -}; // namespace ftl - -#endif // _FTL_RM_P2P_HPP_ - -- GitLab