Skip to content
Snippets Groups Projects
Commit 86d7dfc6 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

P2P Protocol and unit tests

parent 48ed2181
No related branches found
No related tags found
No related merge requests found
#ifndef _FTL_RM_P2P_HPP_
#define _FTL_RM_P2P_HPP_
#include <ftl/uuid.hpp>
#include <optional>
#include <string>
#include <map>
#include <chrono>
#include <vector>
#include <memory>
#include <ftl/net/protocol.hpp>
#include <iostream>
namespace ftl {
namespace net {
class p2p : public ftl::net::Protocol {
public:
p2p(const char *uri) : Protocol(uri) {}
void addPeer(std::shared_ptr<ftl::net::Socket> s) { peers_.push_back(s); };
template <typename R, typename C, typename... Args>
void bind_find_one(const std::string &name, std::optional<R>(C::*f)(Args...)) {
bind(name, [this,name,f](const ftl::UUID &u, int ttl, Args... args) -> std::optional<R> {
if (requests_.count(u) > 0) return {};
requests_[u] = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
auto result = (static_cast<C*>(this)->*f)(std::forward<Args>(args)...);
if (result) return result;
// Otherwise we must search again
if (ttl == 0) return {};
return _find_one<R>(name, u, ttl-1, args...);
});
}
/*template <typename R, typename C, typename ...Args>
void bind_find_all(const std::string &name, R(C::*f)(Args...)) {
}*/
template <typename R, typename... Args>
std::optional<R> find_one(const std::string &name, Args... args) {
ftl::UUID req;
int ttl = 10;
return _find_one<R>(name, req, ttl, args...);
}
template <typename R, typename... Args>
std::optional<R> _find_one(const std::string &name, const ftl::UUID &u, const int &ttl, Args... args) {
for (auto p : peers_) {
auto res = p->call<std::optional<R>>(name, u, ttl, args...);
if (res) return res;
}
return {};
}
/*R find_all(const std::string &name, Args... args) {
}*/
private:
std::unordered_map<ftl::UUID,long int> requests_;
std::vector<std::shared_ptr<ftl::net::Socket>> peers_;
};
}; // namespace net
}; // namespace ftl
#endif // _FTL_RM_P2P_HPP_
...@@ -74,8 +74,7 @@ void Cluster::reset() { ...@@ -74,8 +74,7 @@ void Cluster::reset() {
} }
void Cluster::_registerRPC() { void Cluster::_registerRPC() {
bind("getowner", [this](const UUID &u, int ttl, const std::string &uri) { return getOwner_RPC(u,ttl,uri); }); bind("getowner", member(&Cluster::getOwner_RPC));
//bind("getowner", member(&Cluster::getOwner_RPC));
bind("nop", []() { return true; }); bind("nop", []() { return true; });
...@@ -90,6 +89,7 @@ void Cluster::addPeer(shared_ptr<Socket> &p, bool incoming) { ...@@ -90,6 +89,7 @@ void Cluster::addPeer(shared_ptr<Socket> &p, bool incoming) {
//p.setProtocol(this); //p.setProtocol(this);
peers_.push_back(p); peers_.push_back(p);
//p2p::addPeer(p);
if (!incoming) { if (!incoming) {
p->onConnect([this](Socket &s) { p->onConnect([this](Socket &s) {
...@@ -119,7 +119,7 @@ Blob *Cluster::_lookup(const char *uri) { ...@@ -119,7 +119,7 @@ Blob *Cluster::_lookup(const char *uri) {
if (!u.isValid()) return NULL; if (!u.isValid()) return NULL;
if (u.getScheme() != ftl::URI::SCHEME_FTL) return NULL; if (u.getScheme() != ftl::URI::SCHEME_FTL) return NULL;
//if (u.getPathSegment(0) != "memory") return NULL; //if (u.getPathSegment(0) != "memory") return NULL;
if (u.getHost() != root_) { LOG(ERROR) << "Non matching host : " << u.getHost() << " - " << root_ << std::endl; return NULL; } if (u.getHost() != root_) { LOG(ERROR) << "Non matching URI base : " << u.getHost() << " - " << root_ << std::endl; return NULL; }
auto b = blobs_[u.getBaseURI()]; auto b = blobs_[u.getBaseURI()];
std::cout << "Blob Found for " << u.getBaseURI() << " = " << (b != nullptr) << std::endl; std::cout << "Blob Found for " << u.getBaseURI() << " = " << (b != nullptr) << std::endl;
......
...@@ -3,7 +3,7 @@ add_executable(mapped_ptr_unit ...@@ -3,7 +3,7 @@ add_executable(mapped_ptr_unit
./mapped_ptr_unit.cpp ./mapped_ptr_unit.cpp
../src/blob.cpp ../src/blob.cpp
) )
target_link_libraries(mapped_ptr_unit gflags glog) target_link_libraries(mapped_ptr_unit gflags glog uuid)
add_executable(cluster_unit add_executable(cluster_unit
./tests.cpp ./tests.cpp
...@@ -12,6 +12,12 @@ add_executable(cluster_unit ...@@ -12,6 +12,12 @@ add_executable(cluster_unit
) )
target_link_libraries(cluster_unit uriparser ftlnet gflags glog uuid) target_link_libraries(cluster_unit uriparser ftlnet gflags glog uuid)
add_executable(p2p_unit
./tests.cpp
./p2p_unit.cpp
)
target_link_libraries(p2p_unit ftlnet uriparser gflags glog uuid)
add_executable(p2p_integration add_executable(p2p_integration
./tests.cpp ./tests.cpp
../src/cluster.cpp ../src/cluster.cpp
...@@ -27,10 +33,11 @@ add_executable(peer_test ...@@ -27,10 +33,11 @@ add_executable(peer_test
../src/cluster.cpp ../src/cluster.cpp
../src/blob.cpp ../src/blob.cpp
) )
target_link_libraries(peer_test uriparser ftlnet gflags glog) target_link_libraries(peer_test uriparser ftlnet gflags glog uuid)
add_test(Mapped_ptrUnitTest mapped_ptr_unit) add_test(Mapped_ptrUnitTest mapped_ptr_unit)
add_test(ClusterUnitTest cluster_unit) add_test(ClusterUnitTest cluster_unit)
add_test(P2PUnitTest p2p_unit)
add_custom_target(tests) add_custom_target(tests)
add_dependencies(tests mapped_ptr_unit cluster_unit peer_test p2p_integration) add_dependencies(tests mapped_ptr_unit cluster_unit peer_test p2p_integration)
......
#include "catch.hpp"
#include <ftl/net/dispatcher.hpp>
#include <ftl/net/protocol.hpp>
#include <ftl/net/socket.hpp>
#include <memory>
#include <iostream>
#include <sys/select.h>
using ftl::net::Dispatcher;
using ftl::net::Protocol;
using ftl::net::Socket;
// --- Mock --------------------------------------------------------------------
static std::string last_send;
using ftl::net::Socket;
class MockSocket : public Socket {
public:
MockSocket() : Socket(0) {}
void mock_dispatchRPC(const std::string &d) { protocol()->dispatchRPC(*this,d); }
void mock_data() { data(); }
};
extern int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout) {
std::cout << "SELECT CALLED" << std::endl;
return 1;
}
extern ssize_t recv(int sd, void *buf, size_t n, int f) {
std::cout << "Recv called : " << last_send.size() << std::endl;
int l = last_send.size();
if (l == 0) return 0;
std::memcpy(buf, last_send.c_str(), l);
last_send = "";
return l;
}
extern ssize_t writev(int sd, const struct iovec *v, int cnt) {
size_t len = 0; //v[0].iov_len+v[1].iov_len;
char buf[1000];
char *bufp = &buf[0];
for (auto i=0; i<cnt; i++) {
std::memcpy(bufp,v[i].iov_base,v[i].iov_len);
len += v[i].iov_len;
bufp += v[i].iov_len;
}
last_send = std::string(&buf[0], len);
return len;
}
extern std::vector<std::shared_ptr<ftl::net::Socket>> sockets;
// --- Support -----------------------------------------------------------------
Dispatcher::response_t get_response() {
auto h = (ftl::net::Header*)last_send.data();
const char *data = last_send.data() + sizeof(ftl::net::Header);
auto unpacked = msgpack::unpack(data, h->size-4);
Dispatcher::response_t the_result;
unpacked.get().convert(the_result);
return the_result;
}
// --- Tests -------------------------------------------------------------------
#include <ftl/p2p-rm/p2p.hpp>
using ftl::net::p2p;
SCENARIO("p2p::bind_find_one()", "[find_one]") {
class Mock_p2p : public p2p {
public:
Mock_p2p() : p2p("mock://") {
bind_find_one("test", &Mock_p2p::test);
}
std::optional<int> test(int a) {
if (a == 2) return 44;
else return {};
}
};
Mock_p2p p;
std::shared_ptr<MockSocket> s = std::shared_ptr<MockSocket>(new MockSocket());
s->setProtocol(&p);
p.addPeer(s);
GIVEN("a query that expects a valid result") {
// Create a mock RPC message with expected result
ftl::UUID req;
int ttl = 10;
auto args_obj = std::make_tuple(req, ttl, 2);
auto call_obj = std::make_tuple(0,0,"test",args_obj);
std::stringstream buf;
msgpack::pack(buf, call_obj);
s->mock_dispatchRPC(buf.str());
// Make sure we get a response
auto [kind,id,err,res] = get_response();
REQUIRE( *(res.as<std::optional<int>>()) == 44 );
REQUIRE( kind == 1 );
REQUIRE( id == 0 );
REQUIRE( err.type == 0 );
}
GIVEN("a query that expects no result") {
// Create a mock RPC message with expected result
ftl::UUID req;
int ttl = 10;
auto args_obj = std::make_tuple(req, ttl, 3);
auto call_obj = std::make_tuple(0,0,"test",args_obj);
std::stringstream buf;
msgpack::pack(buf, call_obj);
s->mock_dispatchRPC(buf.str());
// Make sure we get a response
auto [kind,id,err,res] = get_response();
REQUIRE( !res.as<std::optional<int>>() );
REQUIRE( kind == 1 );
REQUIRE( id == 0 );
REQUIRE( err.type == 0 );
}
ftl::net::stop();
}
SCENARIO("p2p::find_one()", "[find_one]") {
class Mock_p2p : public p2p {
public:
Mock_p2p() : p2p("mock://") {
bind_find_one("test", &Mock_p2p::test);
}
std::optional<int> test(int a) {
if (a == 2) return 44;
else return {};
}
};
Mock_p2p p;
std::shared_ptr<MockSocket> s = std::shared_ptr<MockSocket>(new MockSocket());
sockets.push_back(s);
s->setProtocol(&p);
p.addPeer(s);
GIVEN("a query that expects a valid result") {
auto res = p.find_one<int>("test", 2);
REQUIRE( res.has_value() );
REQUIRE( *res == 44 );
}
GIVEN("a query that expects no result") {
auto res = p.find_one<int>("test", 3);
REQUIRE( !res.has_value() );
}
ftl::net::stop();
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment