diff --git a/p2p-rm/include/ftl/p2p-rm.hpp b/p2p-rm/include/ftl/p2p-rm.hpp index 00ea8d20eec849dd2d6ec3d36e06cec575cd07c1..7ea03199f052c3c9d9e634a6d282586ea6e3aaa0 100644 --- a/p2p-rm/include/ftl/p2p-rm.hpp +++ b/p2p-rm/include/ftl/p2p-rm.hpp @@ -2,11 +2,16 @@ #define _FTL_P2P_RM_HPP_ #include <ftl/p2p-rm/cluster.hpp> +#include <memory> namespace ftl { +namespace net { +class Listener; +}; + namespace rm { - std::shared_ptr<Cluster> cluster(const char *uri); + std::shared_ptr<Cluster> cluster(const char *uri, std::shared_ptr<ftl::net::Listener> l); } } diff --git a/p2p-rm/include/ftl/p2p-rm/cluster.hpp b/p2p-rm/include/ftl/p2p-rm/cluster.hpp index b60de3f99655e6d315cf52485929d13ca24be4dd..f4e7d4860d7e493999e36e408e874757ea5e5285 100644 --- a/p2p-rm/include/ftl/p2p-rm/cluster.hpp +++ b/p2p-rm/include/ftl/p2p-rm/cluster.hpp @@ -4,9 +4,12 @@ #include "ftl/p2p-rm/mapped_ptr.hpp" #include "ftl/p2p-rm/internal.hpp" +#include <ftl/uri.hpp> + #include <type_traits> #include <memory> #include <vector> +#include <map> namespace ftl { namespace net { @@ -16,9 +19,13 @@ namespace net { namespace rm { +class Blob; + +void _sync(const Blob &blob, size_t offset, size_t size); + class Cluster { public: - Cluster(const char *uri, std::shared_ptr<ftl::net::Listener> l); + Cluster(const ftl::URI &uri, std::shared_ptr<ftl::net::Listener> l); ~Cluster(); void reset(); @@ -65,7 +72,7 @@ class Cluster { if (addr == NULL) return ftl::null_ptr<T>; - return ftl::mapped_ptr<T>{_create(this, uri, (char*)addr, sizeof(T), size, + return ftl::mapped_ptr<T>{_create(uri, (char*)addr, sizeof(T), size, static_cast<flags_t>(std::is_integral<T>::value * ftl::rm::FLAG_INTEGER | std::is_signed<T>::value * ftl::rm::FLAG_SIGNED | std::is_trivial<T>::value * ftl::rm::FLAG_TRIVIAL), @@ -94,10 +101,14 @@ class Cluster { void addPeer(const char *url); private: - std::string uri_; std::string root_; std::shared_ptr<ftl::net::Listener> listener_; std::vector<std::shared_ptr<ftl::net::Socket>> peers_; + std::map<std::string, ftl::rm::Blob*> blobs_; + + ftl::rm::Blob *_lookup(const char *uri); + Blob *_create(const char *uri, char *addr, size_t size, size_t count, + ftl::rm::flags_t flags, const std::string &tname); }; }; diff --git a/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp b/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp index c6cc0422443ea108d4081d2dee1a08634cfab80e..38843ea5d59ac579cba43343d5327a6198211fe1 100644 --- a/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp +++ b/p2p-rm/include/ftl/p2p-rm/mapped_ptr.hpp @@ -52,7 +52,9 @@ namespace ftl { mapped_ptr<T> ptr_; // Constructor - read_ref(mapped_ptr<T> ptr) : ptr_(ptr), rlock_(ptr.blob->mutex_) {} + read_ref(mapped_ptr<T> ptr) : ptr_(ptr) { + if (ptr_.is_valid()) rlock_ = std::shared_lock<std::shared_mutex>(ptr.blob->mutex_); + } bool is_valid() const { return !ptr_.is_null(); } mapped_ptr<T> pointer() const { return ptr_; } diff --git a/p2p-rm/src/cluster.cpp b/p2p-rm/src/cluster.cpp index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..91f6002d841c27b92b9e44183e4c53eab9796f34 100644 --- a/p2p-rm/src/cluster.cpp +++ b/p2p-rm/src/cluster.cpp @@ -0,0 +1,102 @@ +#include "ftl/p2p-rm.hpp" +#include "ftl/p2p-rm/blob.hpp" +#include "ftl/p2p-rm/protocol.hpp" +#include <ftl/p2p-rm/cluster.hpp> + +#include <ftl/net.hpp> +#include <ftl/net/listener.hpp> +#include <ftl/net/socket.hpp> + +#include <map> +#include <string> +#include <iostream> + +using ftl::rm::Cluster; +using ftl::net::Listener; +using std::map; +using std::shared_ptr; +using ftl::URI; +using ftl::rm::Blob; +using ftl::net::Socket; + +Cluster::Cluster(const URI &uri, shared_ptr<Listener> l) : listener_(l) { + auto me = this; + root_ = uri.getHost(); + + if (l != nullptr) { + l->onConnection([&](shared_ptr<Socket> s) { + me->addPeer(s); + }); + } +} + +Cluster::~Cluster() { + reset(); +} + +void Cluster::reset() { + for (auto x : blobs_) { + delete x.second; + } + blobs_.clear(); +} + +void Cluster::addPeer(shared_ptr<Socket> p) { + +} + +struct SyncHeader { + uint32_t blobid; + uint32_t offset; + uint32_t size; +}; + +void ftl::rm::_sync(const Blob &blob, size_t offset, size_t size) { + // Sanity check + if (offset + size > blob.size_) throw -1; + + // TODO Delay send to collate many write operations? + + if (blob.sockets_.size() > 0) { + SyncHeader header{blob.blobid_,static_cast<uint32_t>(offset),static_cast<uint32_t>(size)}; + + for (auto s : blob.sockets_) { + // Send over network + s->send2(P2P_SYNC, std::string((const char*)&header,sizeof(header)), + std::string(&blob.data_[offset],size)); + } + } +} + +Blob *Cluster::_lookup(const char *uri) { + URI u(uri); + if (!u.isValid()) return NULL; + if (u.getScheme() != ftl::URI::SCHEME_FTL) return NULL; + //if (u.getPathSegment(0) != "memory") return NULL; + if (u.getHost() != root_) { std::cerr << "Non matching host : " << u.getHost() << " - " << root_ << std::endl; return NULL; } + + auto b = blobs_[u.getBaseURI()]; + std::cout << "Blob Found for " << u.getBaseURI() << " = " << (b != nullptr) << std::endl; + return b; +} + +Blob *Cluster::_create(const char *uri, char *addr, size_t size, size_t count, + ftl::rm::flags_t flags, const std::string &tname) { + URI u(uri); + if (!u.isValid()) return NULL; + if (u.getScheme() != ftl::URI::SCHEME_FTL) return NULL; + if (u.getHost() != root_) { std::cerr << "Non matching host : " << u.getHost() << " - " << root_ << std::endl; return NULL; } + + if (blobs_[u.getBaseURI()] != NULL) return NULL; + + Blob *b = new Blob; + + b->data_ = addr; + b->size_ = size; + b->uri_ = std::string(uri); + blobs_[u.getBaseURI()] = b; + + // TODO : Perhaps broadcast this new allocation? + return b; +} + diff --git a/p2p-rm/src/p2prm.cpp b/p2p-rm/src/p2prm.cpp index 6f91b1da4431e9ba5332cf235b3b4bffcb2b31f1..596d510fd77b05dc53d4ce3a214b4cff6137deb9 100644 --- a/p2p-rm/src/p2prm.cpp +++ b/p2p-rm/src/p2prm.cpp @@ -8,64 +8,18 @@ #include <map> #include <string> -static std::map<std::string, ftl::rm::Blob*> blobs; +using std::shared_ptr; +using ftl::rm::Cluster; +using ftl::URI; +using ftl::net::Listener; -void ftl::rm::reset() { - for (auto x : blobs) { - delete x.second; - } - blobs.clear(); -} - -struct SyncHeader { - uint32_t blobid; - uint32_t offset; - uint32_t size; -}; - -void ftl::rm::_sync(const ftl::rm::Blob &blob, size_t offset, size_t size) { - // Sanity check - if (offset + size > size_) throw -1; - - // TODO Delay send to collate many write operations? - - if (blob.sockets_.size() > 0) { - SyncHeader header{blob.blobid_,static_cast<uint32_t>(offset),static_cast<uint32_t>(size)}; - - for (auto s : blob.sockets_) { - // Send over network - s->send2(P2P_SYNC, std::string((const char*)&header,sizeof(header)), - std::string(&data_[offset],size)); - } - } -} - -ftl::rm::Blob *ftl::rm::_lookup(const char *uri) { +shared_ptr<Cluster> ftl::rm::cluster(const char *uri, shared_ptr<Listener> l) { URI u(uri); - if (!u.isValid()) return NULL; - if (u.getScheme() != ftl::URI::SCHEME_FTL) return NULL; - if (u.getPathSegment(0) != "memory") return NULL; - - return blobs[u.getBaseURI()]; -} - -ftl::rm::Blob *ftl::rm::_create(const char *uri, char *addr, size_t size, size_t count, - ftl::rm::flags_t flags, const std::string &tname) { - URI u(uri); - if (!u.isValid()) return NULL; - if (u.getScheme() != ftl::URI::SCHEME_FTL) return NULL; - if (u.getPathSegment(0) != "memory") return NULL; - - if (blobs[u.getBaseURI()] != NULL) return NULL; - - ftl::rm::Blob *b = new ftl::rm::Blob; - - b->data_ = addr; - b->size_ = size; - b->uri_ = std::string(uri); - blobs[u.getBaseURI()] = b; + if (!u.isValid()) return nullptr; + if (u.getScheme() != ftl::URI::SCHEME_FTL) return nullptr; + if (u.getPath().size() > 0) return nullptr; - // TODO : Perhaps broadcast this new allocation? - return b; + shared_ptr<Cluster> c(new Cluster(u, l)); + return c; } diff --git a/p2p-rm/test/p2p-rm.cpp b/p2p-rm/test/p2p-rm.cpp index 663e3574d8cde97a060e025f515e3897334c8c6d..8aae7527901ecc581387f35d568b0176bdb4b5cc 100644 --- a/p2p-rm/test/p2p-rm.cpp +++ b/p2p-rm/test/p2p-rm.cpp @@ -1,85 +1,88 @@ #include "catch.hpp" #include <ftl/p2p-rm.hpp> +#include <ftl/net/socket.hpp> #include <vector> +#include <iostream> // --- Mock Socket Send static std::vector<uint32_t> msgs; -int ftl::net::raw::Socket::send2(uint32_t service, const std::string &data1, const std::string &data2) { +int ftl::net::Socket::send2(uint32_t service, const std::string &data1, const std::string &data2) { msgs.push_back(service); + std::cout << "SEND2 (" << service << ")" << std::endl; return 0; } // --- End Mock Socket Send -SCENARIO( "ftl::rm::map()", "[map]" ) { - ftl::rm::reset(); +SCENARIO( "Cluster::map()", "[map]" ) { + auto cluster = ftl::rm::cluster("ftl://utu.fi", nullptr); GIVEN( "a valid URI and array datatype" ) { int data[10]; - auto m = ftl::rm::map<int[10]>("ftl://uti.fi/memory/test0", &data); + auto m = cluster->map<int[10]>("ftl://utu.fi/memory/test0", &data); REQUIRE( m.is_valid() ); - auto r = ftl::rm::get<int[10]>("ftl://uti.fi/memory/test0"); + auto r = cluster->get<int[10]>("ftl://utu.fi/memory/test0"); REQUIRE( r.is_valid() ); REQUIRE( r.size() == 10*sizeof(int) ); REQUIRE( r.is_local() ); } GIVEN( "a valid URI and invalid data" ) { - auto m = ftl::rm::map<int>("ftl://uti.fi/memory/test0", NULL); + auto m = cluster->map<int>("ftl://utu.fi/memory/test0", NULL); REQUIRE( !m.is_valid() ); } GIVEN( "an empty URI" ) { int data; - auto m = ftl::rm::map<int>("", &data); + auto m = cluster->map<int>("", &data); REQUIRE( !m.is_valid() ); } GIVEN( "an invalid URI" ) { int data; - auto m = ftl::rm::map<int>("noschema/test", &data); + auto m = cluster->map<int>("noschema/test", &data); REQUIRE( !m.is_valid() ); } GIVEN( "an invalid URI schema" ) { int data; - auto m = ftl::rm::map<int>("http://uti.fi/memory/test0", &data); + auto m = cluster->map<int>("http://utu.fi/memory/test0", &data); REQUIRE( !m.is_valid() ); } - GIVEN( "an invalid URI path segment" ) { + GIVEN( "an invalid URI host" ) { int data; - auto m = ftl::rm::map<int>("ftl://uti.fi/wrong/test0", &data); + auto m = cluster->map<int>("ftl://yle.fi/wrong/test0", &data); REQUIRE( !m.is_valid() ); } GIVEN( "a duplicate URI" ) { int data; - auto a = ftl::rm::map<int>("ftl://uti.fi/memory/test0", &data); - auto b = ftl::rm::map<int>("ftl://uti.fi/memory/test0", &data); + auto a = cluster->map<int>("ftl://utu.fi/memory/test0", &data); + auto b = cluster->map<int>("ftl://utu.fi/memory/test0", &data); REQUIRE( !b.is_valid() ); REQUIRE( a.is_valid() ); } } SCENARIO( "Getting a read_ref", "[get]" ) { - ftl::rm::reset(); + auto cluster = ftl::rm::cluster("ftl://utu.fi", nullptr); int data = 89; - auto m = ftl::rm::map<int>("ftl://uti.fi/memory/test1", &data); + auto m = cluster->map<int>("ftl://utu.fi/memory/test1", &data); REQUIRE( m.is_valid() ); GIVEN( "a valid URI to local memory" ) { - const auto r = ftl::rm::getReadable<int>("ftl://uti.fi/memory/test1"); + const auto r = cluster->getReadable<int>("ftl://utu.fi/memory/test1"); REQUIRE( r.is_valid() ); REQUIRE( r.pointer().is_local() ); REQUIRE( r == 89 ); } GIVEN( "a valid URI to remote memory" ) { - const auto r = ftl::rm::getReadable<int>("ftl://uti.fi/memory/remote0"); + const auto r = cluster->getReadable<int>("ftl://utu.fi/memory/remote0"); REQUIRE( r.is_valid() ); REQUIRE( !r.pointer().is_local() ); REQUIRE( r == 888 );