diff --git a/p2p-rm/include/ftl/p2p-rm/cluster.hpp b/p2p-rm/include/ftl/p2p-rm/cluster.hpp index dfeb5ffcd66aa3c51b01d2ddbd0fb758e066142a..4df425a6743713c92c96a29407f53e7fa12ca856 100644 --- a/p2p-rm/include/ftl/p2p-rm/cluster.hpp +++ b/p2p-rm/include/ftl/p2p-rm/cluster.hpp @@ -6,6 +6,7 @@ #include <ftl/p2p-rm/protocol.hpp> #include <ftl/uri.hpp> +#include <ftl/uuid.hpp> #include <ftl/net/socket.hpp> #include <ftl/net/protocol.hpp> @@ -102,7 +103,7 @@ class Cluster : public ftl::net::Protocol { /** * Connect to a new peer using a URL string. */ - void addPeer(const char *url); + std::shared_ptr<ftl::net::Socket> addPeer(const char *url); /** * Allow member functions to be used for RPC calls by binding with 'this'. @@ -145,10 +146,16 @@ class Cluster : public ftl::net::Protocol { std::map<std::string, ftl::rm::Blob*> blobs_; std::map<int,std::vector<std::tuple<std::shared_ptr<ftl::net::Socket>,std::string>>> rpc_results_; + // Cache of seen requests. + std::unordered_map<ftl::UUID,long int> requests_; + ftl::rm::Blob *_lookup(const char *uri); Blob *_create(const char *uri, char *addr, size_t size, size_t count, ftl::rm::flags_t flags, const std::string &tname); void _registerRPC(ftl::net::Socket &s); + + private: + std::tuple<std::string,uint32_t> getOwner_RPC(const ftl::UUID &u, int ttl, const std::string &uri); }; }; diff --git a/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp b/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp index ef3419d6650033abaa71ee3f8d8634c841d61392..3aae5f0ad919924113c50b8251aad9b1be541909 100644 --- a/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp +++ b/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp @@ -23,6 +23,8 @@ namespace ftl { T *get() { return blob->data_; } size_t size() const { return blob->size_; } + bool is_owner() const { return false; } + write_ref<T> operator*(); write_ref<T> operator[](ptrdiff_t idx); write_ref<T> writable() { return ftl::write_ref<T>(*this); } diff --git a/p2p-rm/include/ftl/p2p-rm/protocol.hpp b/p2p-rm/include/ftl/p2p-rm/protocol.hpp index 743e48dc03bfab755f2981a10f92f3b02e4b2968..392154f3689add50eec4dcd1c45f7a0b2bdc40f4 100644 --- a/p2p-rm/include/ftl/p2p-rm/protocol.hpp +++ b/p2p-rm/include/ftl/p2p-rm/protocol.hpp @@ -12,5 +12,19 @@ #define P2P_PEERSEARCH (FTL_PROTOCOL_FREE + 6) #define P2P_RPC_CALL (FTL_PROTOCOL_FREE + 7) +namespace ftl { +namespace rm { + struct P2PQuery { + char guid[16]; + uint8_t ttl; + }; + + struct MemOwner { + char peer[16]; + uint64_t age; + }; +}; +}; + #endif // _FTL_P2P_RM_PROTOCOL_HPP_ diff --git a/p2p-rm/include/ftl/uuid.hpp b/p2p-rm/include/ftl/uuid.hpp new file mode 100644 index 0000000000000000000000000000000000000000..674c274e22e52a8f7d8e55b1f283019a2ed82246 --- /dev/null +++ b/p2p-rm/include/ftl/uuid.hpp @@ -0,0 +1,38 @@ +#ifndef _FTL_UUID_HPP_ +#define _FTL_UUID_HPP_ + +#include <uuid/uuid.h> +#include <memory> +#include <string> +#include <functional> +#include <msgpack.hpp> + +namespace ftl { + class UUID { + public: + UUID() { uuid_generate(uuid_); }; + UUID(const UUID &u) { memcpy(uuid_,u.uuid_,16); } + + bool operator==(const UUID &u) const { return memcmp(uuid_,u.uuid_,16) == 0; } + bool operator!=(const UUID &u) const { return memcmp(uuid_,u.uuid_,16) != 0; } + + std::string str() const { return std::string((char*)uuid_,16); }; + const unsigned char *raw() const { return &uuid_[0]; } + + MSGPACK_DEFINE(uuid_); + + private: + unsigned char uuid_[16]; + }; +}; + +namespace std { + template <> struct hash<ftl::UUID> { + size_t operator()(const ftl::UUID & x) const { + return std::hash<std::string>{}(x.str()); + } + }; +}; + +#endif // _FTL_UUID_HPP_ + diff --git a/p2p-rm/src/blob.cpp b/p2p-rm/src/blob.cpp index e4b3b60b1f97cc32163f01003f8ae0d29872e267..c322aeefc01919c875d925471d0320347b26c6c5 100644 --- a/p2p-rm/src/blob.cpp +++ b/p2p-rm/src/blob.cpp @@ -1,9 +1,12 @@ +#include <glog/logging.h> #include <ftl/p2p-rm/blob.hpp> #include <ftl/net/socket.hpp> #include <ftl/p2p-rm/protocol.hpp> #include <iostream> +using ftl::net::array; + struct SyncHeader { uint32_t blobid; uint32_t offset; @@ -18,6 +21,8 @@ void ftl::rm::_sync(const Blob &blob, size_t offset, size_t size) { } // TODO Delay send to collate many write operations? + + LOG(INFO) << "Synchronise blob " << blob.blobid_; if (blob.sockets_.size() > 0) { SyncHeader header{blob.blobid_,static_cast<uint32_t>(offset),static_cast<uint32_t>(size)}; @@ -25,7 +30,7 @@ void ftl::rm::_sync(const Blob &blob, size_t offset, size_t size) { for (auto s : blob.sockets_) { // Send over network s->send(P2P_SYNC, std::string((const char*)&header,sizeof(header)), - std::string(&blob.data_[offset],size)); + array{&blob.data_[offset],size}); } } } diff --git a/p2p-rm/src/cluster.cpp b/p2p-rm/src/cluster.cpp index 44063b861564c6984060c65a8e92acb1f358c64f..f309b2be924dc263aa8becf3a4b4ce98dbbe2533 100644 --- a/p2p-rm/src/cluster.cpp +++ b/p2p-rm/src/cluster.cpp @@ -1,3 +1,4 @@ +#include <glog/logging.h> #include "ftl/p2p-rm.hpp" #include "ftl/p2p-rm/blob.hpp" #include "ftl/p2p-rm/protocol.hpp" @@ -11,16 +12,20 @@ #include <string> #include <iostream> #include <vector> +#include <chrono> using ftl::rm::Cluster; using ftl::net::Listener; using std::map; using std::vector; +using std::tuple; using std::shared_ptr; using std::string; using ftl::URI; using ftl::rm::Blob; using ftl::net::Socket; +using ftl::UUID; +using namespace std::chrono; Cluster::Cluster(const URI &uri, shared_ptr<Listener> l) : Protocol(uri.getBaseURI()), listener_(l) { //auto me = this; @@ -62,6 +67,12 @@ void Cluster::reset() { void Cluster::_registerRPC(Socket &s) { //s.bind("getowner", [this](const std::string &u) { getOwner(u.c_str()); }); bind("getowner", member(&Cluster::getOwner)); + + bind("nop", []() { return true; }); + + bind(P2P_SYNC, [this](uint32_t msg, Socket &s) { + LOG(INFO) << "Receive blob sync"; + }); } void Cluster::addPeer(shared_ptr<Socket> &p, bool incoming) { @@ -85,9 +96,10 @@ void Cluster::addPeer(shared_ptr<Socket> &p, bool incoming) { } } -void Cluster::addPeer(const char *url) { +shared_ptr<Socket> Cluster::addPeer(const char *url) { auto sock = ftl::net::connect(url); addPeer(sock); + return sock; } Blob *Cluster::_lookup(const char *uri) { @@ -107,19 +119,28 @@ Blob *Cluster::_lookup(const char *uri) { return b; } -std::string Cluster::getOwner(const std::string &uri) { - vector<string> results; +tuple<string,uint32_t> Cluster::getOwner_RPC(const UUID &u, int ttl, const std::string &uri) { + if (requests_.count(u) > 0) return {"",0}; + requests_[u] = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count(); std::cout << "GETOWNER" << std::endl; - if (blobs_.count(uri) != 0) return blobs_[uri]->owner_; - - broadcastCall("getowner", results, uri); + if (blobs_.count(uri) != 0) return {blobs_[uri]->owner_,0}; + + vector<tuple<string,uint32_t>> results; + broadcastCall("getowner", results, u, ttl-1, uri); // TODO Verify all results are equal or empty - if (results.size() == 0) return ""; + if (results.size() == 0) return {"",0}; return results[0]; } +std::string Cluster::getOwner(const std::string &uri) { + UUID u; + int ttl = 10; + + return std::get<0>(getOwner_RPC(u, ttl, uri)); +} + Blob *Cluster::_create(const char *uri, char *addr, size_t size, size_t count, ftl::rm::flags_t flags, const std::string &tname) { URI u(uri); diff --git a/p2p-rm/test/CMakeLists.txt b/p2p-rm/test/CMakeLists.txt index 9cb7a804966d4f00f4a27d9fede1b36b3f1a87ec..659c5234162281101b4ff51a7653c4bd07e735c8 100644 --- a/p2p-rm/test/CMakeLists.txt +++ b/p2p-rm/test/CMakeLists.txt @@ -1,18 +1,27 @@ -add_executable(mapped_ptr_unit EXCLUDE_FROM_ALL +add_executable(mapped_ptr_unit ./tests.cpp ./mapped_ptr_unit.cpp ../src/blob.cpp ) target_link_libraries(mapped_ptr_unit gflags glog) -add_executable(cluster_unit EXCLUDE_FROM_ALL +add_executable(cluster_unit ./tests.cpp ../src/cluster.cpp ./cluster_unit.cpp ) -target_link_libraries(cluster_unit uriparser ftlnet gflags glog) +target_link_libraries(cluster_unit uriparser ftlnet gflags glog uuid) -add_executable(peer_test EXCLUDE_FROM_ALL +add_executable(p2p_integration + ./tests.cpp + ../src/cluster.cpp + ../src/p2prm.cpp + ../src/blob.cpp + ./p2p_integration.cpp +) +target_link_libraries(p2p_integration uriparser ftlnet gflags glog uuid) + +add_executable(peer_test ./peer_cli.cpp ../src/p2prm.cpp ../src/cluster.cpp @@ -24,5 +33,5 @@ add_test(Mapped_ptrUnitTest mapped_ptr_unit) add_test(ClusterUnitTest cluster_unit) add_custom_target(tests) -add_dependencies(tests mapped_ptr_unit cluster_unit peer_test) +add_dependencies(tests mapped_ptr_unit cluster_unit peer_test p2p_integration) diff --git a/p2p-rm/test/p2p_integration.cpp b/p2p-rm/test/p2p_integration.cpp index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..4b4a4e8fb861f815aeee3fdfdd114216e6c8941b 100644 --- a/p2p-rm/test/p2p_integration.cpp +++ b/p2p-rm/test/p2p_integration.cpp @@ -0,0 +1,105 @@ +#include "catch.hpp" +#include <ftl/p2p-rm/cluster.hpp> +#include <ftl/net/socket.hpp> +#include <ftl/net/listener.hpp> +#include <ftl/net.hpp> +#include <vector> +#include <iostream> + +using ftl::rm::Cluster; + +SCENARIO( "Pre-connection ownership resolution", "[ownership]" ) { + Cluster c1("ftl://utu.fi", nullptr); + auto l = ftl::net::listen("tcp://localhost:9000"); + l->setProtocol(&c1); + Cluster c2("ftl://utu.fi", l); + + auto s = c1.addPeer("tcp://localhost:9000"); + + int data1 = 89; + int data2 = 99; + + auto m1 = c1.map<int>("ftl://utu.fi/memory/r1", &data1); + auto m2 = c2.map<int>("ftl://utu.fi/memory/r1", &data2); + REQUIRE( m1.is_valid() ); + REQUIRE( m2.is_valid() ); + + ftl::net::wait([&s]() { return s->isConnected(); }); + + REQUIRE( m2.is_owner() ); + REQUIRE( !m1.is_owner() ); + + l->close(); + ftl::net::stop(); +} + + +SCENARIO( "Post-connection ownership resolution", "[ownership]" ) { + Cluster c1("ftl://utu.fi", nullptr); + auto l = ftl::net::listen("tcp://localhost:9000"); + l->setProtocol(&c1); + Cluster c2("ftl://utu.fi", l); + + auto s = c1.addPeer("tcp://localhost:9000"); + + int data1 = 89; + int data2 = 99; + + ftl::net::wait([&s]() { return s->isConnected(); }); + + auto m1 = c1.map<int>("ftl://utu.fi/memory/r1", &data1); + auto m2 = c2.map<int>("ftl://utu.fi/memory/r1", &data2); + REQUIRE( m1.is_valid() ); + REQUIRE( m2.is_valid() ); + + REQUIRE( !m2.is_owner() ); + REQUIRE( m1.is_owner() ); + + l->close(); + ftl::net::stop(); +} + +SCENARIO( "Write change ownership", "[ownership]" ) { + Cluster c1("ftl://utu.fi", nullptr); + auto l = ftl::net::listen("tcp://localhost:9000"); + l->setProtocol(&c1); + Cluster c2("ftl://utu.fi", l); + + auto s = c1.addPeer("tcp://localhost:9000"); + + int data1 = 89; + int data2 = 99; + + ftl::net::wait([&s]() { return s->isConnected(); }); + + auto m1 = c1.map<int>("ftl://utu.fi/memory/r1", &data1); + auto m2 = c2.map<int>("ftl://utu.fi/memory/r1", &data2); + REQUIRE( m1.is_valid() ); + REQUIRE( m2.is_valid() ); + + REQUIRE( !m2.is_owner() ); + REQUIRE( m1.is_owner() ); + + *m2 = 676; + + REQUIRE( m2.is_owner() ); + REQUIRE( !m1.is_owner() ); + + l->close(); + ftl::net::stop(); +} + + + /* + + REQUIRE( *m2 == 99 ); + REQUIRE( *m1 == 89 ); + + *m2 = 77; + ftl::net::wait(); + REQUIRE( *m2 == 77 ); + REQUIRE( *m1 == 77 ); + + *m1 = 66; + REQUIRE( *m2 == 66 ); + REQUIRE( *m1 == 66 );*/