Skip to content
Snippets Groups Projects
Commit 5688da11 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

First attempt at peer 2 peer remote memory

parent 3a1bd82a
No related branches found
No related tags found
No related merge requests found
net/build
*/build
*/node_modules
......@@ -6,7 +6,7 @@
namespace ftl {
namespace net {
raw::Socket *connect(const char *uri) { return raw::connect(uri); }
inline raw::Socket *connect(const char *uri) { return raw::connect(uri); }
}
}
......
......@@ -86,6 +86,7 @@ namespace ftl {
std::string &getPath() { return m_path; };
std::string &getQuery() { return m_query; };
std::string &getBaseURI() { return m_base; };
std::string &getPathSegment(int n) { return m_pathseg[n]; };
private:
bool m_valid;
......
cmake_minimum_required (VERSION 2.8.11)
include (CheckIncludeFile)
include (CheckIncludeFileCXX)
include (CheckFunctionExists)
project (ftlp2prm)
#find_package(PkgConfig)
#pkg_check_modules(GTKMM gtkmm-3.0)
# Need to include staged files and libs
include_directories(${PROJECT_SOURCE_DIR}/include)
include_directories(${PROJECT_BINARY_DIR})
set(ftl_VERSION_MAJOR "1")
set(ftl_VERSION_MINOR "0")
set(ftl_VERSION_PATCH "0")
set(CMAKE_CXX_FLAGS "-pthread -fopenmp -std=c++17 -Wall -Wno-deprecated -Werror -Wno-psabi")
set(CMAKE_CXX_FLAGS_DEBUG "-D_DEBUG -pg -Wall -Werror")
set(CMAKE_CXX_FLAGS_RELEASE "-O3")
SET(CMAKE_USE_RELATIVE_PATHS ON)
set(FTLSOURCE
src/blob.cpp
src/p2prm.cpp
)
add_library(ftl-p2prm ${FTLSOURCE})
check_include_file("uriparser/Uri.h" HAVE_URI_H)
if(NOT HAVE_URI_H)
message(FATAL_ERROR "Uriparser is required")
endif()
check_function_exists(uriParseSingleUriA HAVE_URIPARSESINGLE)
# Look for FTL Net
find_path(FTL_NET "ftl/net.hpp")
if (NOT FTL_NET)
message(STATUS "FTL Net is not installed")
find_path(FTL_NET "ftl/net.hpp" PATHS ../net/include)
if (NOT FTL_NET)
message(FATAL_ERROR "FTL Net is required")
endif()
#TODO Ensure this is built
message(STATUS ${FTL_NET})
include_directories(${FTL_NET})
endif()
target_include_directories(ftl-p2prm PUBLIC ${PROJECT_SOURCE_DIR}/include)
#target_link_libraries(libftlp2pra pthread)
ADD_SUBDIRECTORY(test)
File moved
#ifndef _FTL_P2P_RA_HPP_
#define _FTL_P2P_RA_HPP_
#include "ftl/p2p-rm/remote_ptr.hpp"
namespace ftl {
namespace rm {
void reset();
ftl::rm::Blob *_lookupBlob(const char *uri);
ftl::rm::Blob *_createBlob(const char *uri, size_t size);
template <typename T>
ftl::remote_ptr<T> getPointer(const char *uri) {
auto b = _lookupBlob(uri);
// TODO Verify type and size
return ftl::remote_ptr<T>{b,0};
}
template <typename T>
ftl::read_ref<T> getReadable(const char *uri) {
return getPointer<T>(uri).readable();
}
template <typename T>
ftl::write_ref<T> getWritable(const char *uri) {
return getPointer<T>(uri).writable();
}
//template <typename T>
//ftl::rw_ref<T> getReadWrite(const char *uri);
/**
* Get a remote write only reference filled with a provided empty value
* rather than first performing a remote read to populate.
*/
//template <typename T>
//ftl::write_ref<T> getWritable(const char *uri, T nullvalue);
/**
* Make a new memory allocation locally mapped to a given URI. The URI
* must not already exist within the peer group, otherwise a nullptr is
* returned.
*/
template <typename T>
ftl::remote_ptr<T> alloc(const char *uri, size_t size) {
auto b = _createBlob(uri, size*sizeof(T));
return ftl::remote_ptr<T>{b,0};
}
void free(const char *uri);
template <typename T>
void free(ftl::remote_ptr<T> p) {
}
/**
* Obtain a list or URI memory blocks in the current peer group that match
* the provided base URI.
*/
std::vector<std::string> list(const char *partial_uri);
void addPeer(ftl::net::raw::Socket *s);
}
}
#endif // _FTL_P2P_RA_HPP_
#ifndef _FTL_P2P_RA_BLOB_HPP_
#define _FTL_P2P_RA_BLOB_HPP_
#include <mutex>
#include <shared_mutex>
#include <ftl/net.hpp>
namespace ftl {
namespace rm {
/* NOT TO BE USED DIRECTLY */
struct Blob {
ftl::net::raw::Socket *socket_;
char *data_;
size_t size_;
void finished();
void write(size_t offset, const char *data, size_t size);
void read(size_t offset, char *data, size_t size);
mutable std::shared_mutex mutex_;
};
}
}
#endif // _FTL_P2P_RA_CACHE_HPP_
#ifndef _FTL_P2P_RA_REMOTE_PTR_HPP_
#define _FTL_P2P_RA_REMOTE_PTR_HPP_
#include "ftl/p2p-rm/blob.hpp"
namespace ftl {
template <typename T> struct write_ref;
template <typename T> struct read_ref;
template <typename T> struct write_ptr;
template <typename T> struct read_ptr;
template <typename T>
struct remote_ptr {
rm::Blob *blob;
size_t offset;
bool is_null() const { return blob == NULL; }
bool is_local() const { return blob->data_ != NULL; }
bool is_valid() const {
return !is_null() && offset+sizeof(T) <= blob->size_;
}
T *get() { return blob->data_; }
size_t size() const { return blob->size_; }
write_ref<T> operator*();
write_ref<T> operator[](ptrdiff_t idx);
write_ref<T> writable() { return ftl::write_ref<T>(*this); }
read_ref<T> operator*() const;
read_ref<T> operator[](ptrdiff_t idx) const;
read_ref<T> readable() { return ftl::read_ref<T>(*this); }
remote_ptr<T> operator+(std::ptrdiff_t diff) const {
size_t new_offset = offset + sizeof(T)*diff;
return remote_ptr<T>{blob, new_offset};
}
/** Allow pointer casting if a local blob */
template <typename U>
operator U*() {
if (is_local()) return (U*)blob->data_;
return NULL;
}
};
template <typename T>
struct read_ref {
remote_ptr<T> ptr_;
// Constructor
read_ref(remote_ptr<T> ptr) : ptr_(ptr), rlock_(ptr.blob->mutex_) {}
bool is_valid() const { return !ptr_.is_null(); }
remote_ptr<T> pointer() const { return ptr_; }
void reset() { rlock_.unlock(); }
void finish() { reset(); }
operator T() const {
//return static_cast<T>(ptr_.blob->data_[ptr_.offset]);
T t;
ptr_.blob->read(ptr_.offset, (char*)&t, sizeof(T));
return t;
}
read_ref &operator=(const T &value) {
// silent fail!
return *this;
}
std::shared_lock<std::shared_mutex> rlock_;
};
template <typename T>
struct write_ref {
remote_ptr<T> ptr_;
// Constructor
write_ref(remote_ptr<T> ptr) : ptr_(ptr), wlock_(ptr.blob->mutex_) {}
~write_ref() { ptr_.blob->finished(); }
bool is_valid() const { return !ptr_.is_null(); }
remote_ptr<T> pointer() const { return ptr_; }
void reset() { ptr_.blob->finished(); wlock_.unlock(); }
void finish() { reset(); }
/** Cast to type reads the value */
operator T() const {
//return static_cast<T>(ptr_.blob->data_[ptr_.offset]);
T t;
ptr_.blob->read(ptr_.offset, (char*)&t, sizeof(T));
return t;
}
write_ref &operator=(const T &value) {
ptr_.blob->write(ptr_.offset, (char*)(&value), sizeof(T));
return *this;
}
std::unique_lock<std::shared_mutex> wlock_;
};
}
template <typename T>
ftl::read_ref<T> ftl::remote_ptr<T>::operator*() const {
return ftl::read_ref<T>(*this);
}
template <typename T>
ftl::read_ref<T> ftl::remote_ptr<T>::operator[](ptrdiff_t idx) const {
return ftl::read_ref<T>(*this + idx);
}
template <typename T>
ftl::write_ref<T> ftl::remote_ptr<T>::operator*() {
return ftl::write_ref<T>(*this);
}
template <typename T>
ftl::write_ref<T> ftl::remote_ptr<T>::operator[](ptrdiff_t idx) {
return ftl::write_ref<T>(*this + idx);
}
#endif // _FTL_P2P_RA_REMOTE_PTR_HPP_
#include <memory.h>
#include <ftl/net.hpp>
#include "ftl/p2p-rm/blob.hpp"
void ftl::rm::Blob::write(size_t offset, const char *data, size_t size) {
// Sanity check
if (offset + size > size_) throw -1;
// If local, write direct to data_, otherwise send over network
if (socket_ != NULL) {
// Send over network
//socket_->send(ftl::rm::MEMORY_WRITE, std::string(data,size));
} else {
// Copy locally
memcpy(data_+offset, data, size);
}
}
void ftl::rm::Blob::read(size_t offset, char *data, size_t size) {
// Sanity check
if (offset + size > size_) throw -1;
// If local, write direct to data_, otherwise send over network
if (socket_ != NULL) {
// Send over network
//socket_->send(ftl::rm::MEMORY_WRITE, std::string(data,size));
} else {
// Copy locally
memcpy(data,data_+offset, size);
}
}
#include "ftl/p2p-rm.hpp"
#include <ftl/uri.hpp>
#include <map>
static std::map<std::string, ftl::rm::Blob*> blobs;
void ftl::rm::reset() {
// TODO Loop delete
blobs.clear();
}
ftl::rm::Blob *ftl::rm::_lookupBlob(const char *uri) {
URI u(uri);
if (!u.isValid()) return NULL;
if (u.getScheme() != ftl::URI::SCHEME_FTL) return NULL;
if (u.getPathSegment(0) != "memory") return NULL;
return blobs[u.getBaseURI()];
}
ftl::rm::Blob *ftl::rm::_createBlob(const char *uri, size_t size) {
URI u(uri);
if (!u.isValid()) return NULL;
if (u.getScheme() != ftl::URI::SCHEME_FTL) return NULL;
if (u.getPathSegment(0) != "memory") return NULL;
if (blobs[u.getBaseURI()] != NULL) return NULL;
ftl::rm::Blob *b = new ftl::rm::Blob;
b->data_ = new char[size];
b->size_ = size;
b->socket_ = NULL;
blobs[u.getBaseURI()] = b;
return b;
}
include(CTest)
enable_testing()
add_executable(remote_ptr EXCLUDE_FROM_ALL
./tests.cpp
./remote_ptr.cpp
)
add_executable(p2p_rm EXCLUDE_FROM_ALL
./tests.cpp
../src/p2prm.cpp
../src/blob.cpp
./p2p-rm.cpp
)
target_link_libraries(p2p_rm uriparser)
add_test(Remote_ptr remote_ptr)
add_test(RM_API p2p_rm)
add_custom_target(tests)
add_dependencies(tests remote_ptr p2p_rm)
#include "catch.hpp"
#include <ftl/p2p-ra.hpp>
// ---- MOCK THE SOCKET --------------------------------------------------------
namespace ftl {
namespace net {
namespace raw {
class Socket {
public:
int close();
int send(uint32_t service, std::string &data);
//int send(uint32_t service, std::ostringstream &data);
//int send(uint32_t service, void *data, int length);
bool isConnected() { return true; };
void onMessage(sockdatahandler_t handler) { m_handler = handler; }
//void onError(sockerrorhandler_t handler) {}
//void onConnect(sockconnecthandler_t handler) {}
//void onDisconnect(sockdisconnecthandler_t handler) {}
};
}
}
}
// -----------------------------------------------------------------------------
SCENARIO( "Can get a remote array object", "[array]" ) {
GIVEN( "a valid uri" ) {
Array a = ftl::p2p::get("ftl://utu.fi/array/test1");
REQUIRE( a.isValid() );
}
}
This diff is collapsed.
#include "catch.hpp"
#include <ftl/p2p-rm.hpp>
SCENARIO( "ftl::rm::alloc()", "[alloc]" ) {
ftl::rm::reset();
GIVEN( "a valid URI and size" ) {
auto r = ftl::rm::alloc<int>("ftl://uti.fi/memory/test0", 10);
REQUIRE( r.is_valid() );
REQUIRE( r.size() == 10*sizeof(int) );
REQUIRE( r.is_local() );
}
GIVEN( "a valid URI and invalid size" ) {
auto r = ftl::rm::alloc<int>("ftl://uti.fi/memory/test0", 0);
REQUIRE( !r.is_valid() );
}
GIVEN( "an empty URI" ) {
auto r = ftl::rm::alloc<int>("", 10);
REQUIRE( !r.is_valid() );
}
GIVEN( "an invalid URI" ) {
auto r = ftl::rm::alloc<int>("noschema/test", 10);
REQUIRE( !r.is_valid() );
}
GIVEN( "an invalid URI schema" ) {
auto r = ftl::rm::alloc<int>("http://uti.fi/memory/test0", 10);
REQUIRE( !r.is_valid() );
}
GIVEN( "an invalid URI path segment" ) {
auto r = ftl::rm::alloc<int>("ftl://uti.fi/wrong/test0", 10);
REQUIRE( !r.is_valid() );
}
GIVEN( "a duplicate URI" ) {
auto a = ftl::rm::alloc<int>("ftl://uti.fi/memory/test0", 10);
auto b = ftl::rm::alloc<int>("ftl://uti.fi/memory/test0", 10);
REQUIRE( a.is_valid() );
REQUIRE( !b.is_valid() );
}
}
SCENARIO( "Getting a read_ref", "[get]" ) {
ftl::rm::reset();
auto a = ftl::rm::alloc<int>("ftl://uti.fi/memory/test1", 10);
REQUIRE( a.is_valid() );
GIVEN( "a valid URI to local memory" ) {
const auto r = ftl::rm::getReadable<int>("ftl://uti.fi/memory/test1");
REQUIRE( r.is_valid() );
REQUIRE( r.pointer().is_local() );
REQUIRE( r.pointer().blob == a.blob );
}
}
#include "catch.hpp"
#include <ftl/p2p-rm/remote_ptr.hpp>
#include <memory.h>
// Mock the BLOB
static bool is_finished = false;
void ftl::rm::Blob::finished() {
is_finished = true;
}
static bool blob_size;
static const int *blob_data;
void ftl::rm::Blob::write(size_t offset, const char *data, size_t size) {
blob_size = size;
blob_data = (const int*)data;
memcpy(data_+offset,data,size);
}
void ftl::rm::Blob::read(size_t offset, char *data, size_t size) {
memcpy(data,data_+offset,size);
}
SCENARIO( "Reading from a remote pointer", "[remote_ptr]" ) {
// Make a dummy blob
auto blob = new ftl::rm::Blob();
blob->data_ = (char*)(new int[5]);
((int*)(blob->data_))[0] = 55;
((int*)(blob->data_))[1] = 66;
GIVEN( "a valid POD const remote pointer" ) {
const ftl::remote_ptr<int> pa{blob,0};
REQUIRE( *pa == 55 );
REQUIRE( pa[0] == 55 );
REQUIRE( pa[1] == 66 );
}
}
SCENARIO( "Writing to a remote pointer", "[remote_ptr]" ) {
// Make a dummy blob
auto blob = new ftl::rm::Blob();
blob->data_ = (char*)(new int[5]);
((int*)(blob->data_))[0] = 55;
((int*)(blob->data_))[1] = 66;
GIVEN( "a valid POD remote pointer" ) {
ftl::remote_ptr<int> pa{blob,0};
is_finished = false;
*pa = 23;
REQUIRE( *pa == 23 );
REQUIRE( is_finished );
REQUIRE( pa[0] == 23 );
pa[1] = 25;
REQUIRE( pa[1] == 25 );
}
GIVEN( "a persistent write_ref" ) {
ftl::remote_ptr<int> pa{blob,0};
is_finished = false;
auto ra = *pa;
ra = 23;
REQUIRE( ra == 23 );
REQUIRE( !is_finished );
ra.reset();
REQUIRE( is_finished );
}
}
SCENARIO( "Writing to readonly pointer fails", "[remote_ptr]" ) {
// Make a dummy blob
auto blob = new ftl::rm::Blob();
blob->data_ = (char*)(new int[5]);
((int*)(blob->data_))[0] = 55;
((int*)(blob->data_))[1] = 66;
GIVEN( "a valid POD const remote pointer" ) {
const ftl::remote_ptr<int> pa{blob,0};
*pa = 23;
REQUIRE( *pa == 55 );
}
}
#define CATCH_CONFIG_MAIN
#include "catch.hpp"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment