From defbdd517064f08d4f35ecacc24219278a74b7bb Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nwpope@utu.fi> Date: Thu, 28 Feb 2019 14:39:41 +0200 Subject: [PATCH] Fix p2p unit test --- p2p-rm/include/ftl/p2p-rm/p2p.hpp | 47 ++++++++++- p2p-rm/test/p2p_unit.cpp | 130 +++++++++++++++++++++++++++--- 2 files changed, 164 insertions(+), 13 deletions(-) diff --git a/p2p-rm/include/ftl/p2p-rm/p2p.hpp b/p2p-rm/include/ftl/p2p-rm/p2p.hpp index badde7ca7..35cf3b970 100644 --- a/p2p-rm/include/ftl/p2p-rm/p2p.hpp +++ b/p2p-rm/include/ftl/p2p-rm/p2p.hpp @@ -36,10 +36,28 @@ class p2p : public ftl::net::Protocol { }); } - /*template <typename R, typename C, typename ...Args> - void bind_find_all(const std::string &name, R(C::*f)(Args...)) { - - }*/ + template <typename R, typename C, typename... Args> + void bind_find_all(const std::string &name, std::optional<R>(C::*f)(Args...)) { + bind(name, [this,name,f](const ftl::UUID &u, int ttl, Args... args) -> std::vector<R> { + std::vector<R> results; + + if (requests_.count(u) > 0) return results; + requests_[u] = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); + + auto result = (static_cast<C*>(this)->*f)(std::forward<Args>(args)...); + if (result) results.push_back(*result); + + // Otherwise we must search again + if (ttl == 0) return results; + + auto cres = _find_all<R>(name, u, ttl-1, args...); + if (cres.size() > 0) { + results.insert(results.end(), cres.begin(), cres.end()); + } + + return results; + }); + } template <typename R, typename... Args> std::optional<R> find_one(const std::string &name, Args... args) { @@ -50,6 +68,7 @@ class p2p : public ftl::net::Protocol { template <typename R, typename... Args> std::optional<R> _find_one(const std::string &name, const ftl::UUID &u, const int &ttl, Args... args) { + // TODO Use an async approach for (auto p : peers_) { auto res = p->call<std::optional<R>>(name, u, ttl, args...); if (res) return res; @@ -57,6 +76,26 @@ class p2p : public ftl::net::Protocol { return {}; } + template <typename R, typename... Args> + std::vector<R> find_all(const std::string &name, Args... args) { + ftl::UUID req; + int ttl = 10; + return _find_all<R>(name, req, ttl, std::forward<Args...>(args...)); + } + + template <typename R, typename... Args> + std::vector<R> _find_all(const std::string &name, const ftl::UUID &u, const int &ttl, Args... args) { + // TODO Use an async approach + std::vector<R> results; + for (auto p : peers_) { + auto res = p->call<std::vector<R>>(name, u, ttl, args...); + std::cout << "Result size = " << res.size() << std::endl; + if (res.size() > 0) + results.insert(results.end(), res.begin(), res.end()); + } + return results; + } + /*R find_all(const std::string &name, Args... args) { }*/ diff --git a/p2p-rm/test/p2p_unit.cpp b/p2p-rm/test/p2p_unit.cpp index 62c0732c2..99b2d2943 100644 --- a/p2p-rm/test/p2p_unit.cpp +++ b/p2p-rm/test/p2p_unit.cpp @@ -13,7 +13,8 @@ using ftl::net::Socket; // --- Mock -------------------------------------------------------------------- -static std::string last_send; +static std::vector<std::string> last_send; +static int last_pos = 0; using ftl::net::Socket; @@ -28,16 +29,21 @@ class MockSocket : public Socket { extern int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout) { - std::cout << "SELECT CALLED" << std::endl; + // std::cout << "SELECT CALLED" << std::endl; + FD_ZERO(exceptfds); + FD_ZERO(readfds); + if ((size_t)last_pos < last_send.size()) FD_SET(0, readfds); return 1; } extern ssize_t recv(int sd, void *buf, size_t n, int f) { - std::cout << "Recv called : " << last_send.size() << std::endl; - int l = last_send.size(); + if ((size_t)last_pos >= last_send.size()) return 0; + //std::cout << "Recv called : " << last_send[last_pos].size() << std::endl; + int l = last_send[last_pos].size(); if (l == 0) return 0; - std::memcpy(buf, last_send.c_str(), l); - last_send = ""; + std::memcpy(buf, last_send[last_pos].c_str(), l); + //last_send.erase(last_send.begin()); + last_pos++; return l; } @@ -52,7 +58,10 @@ extern ssize_t writev(int sd, const struct iovec *v, int cnt) { bufp += v[i].iov_len; } - last_send = std::string(&buf[0], len); + //std::cout << "WRITEV " << len << std::endl; + + //if (last_send.size() != 0) std::cout << "ERROR MISSED MESSAGE " << last_send.size() << std::endl; + last_send.push_back(std::string(&buf[0], len)); return len; } @@ -61,8 +70,8 @@ extern std::vector<std::shared_ptr<ftl::net::Socket>> sockets; // --- Support ----------------------------------------------------------------- Dispatcher::response_t get_response() { - auto h = (ftl::net::Header*)last_send.data(); - const char *data = last_send.data() + sizeof(ftl::net::Header); + auto h = (ftl::net::Header*)last_send[last_pos].data(); + const char *data = last_send[last_pos].data() + sizeof(ftl::net::Header); auto unpacked = msgpack::unpack(data, h->size-4); Dispatcher::response_t the_result; unpacked.get().convert(the_result); @@ -91,6 +100,7 @@ SCENARIO("p2p::bind_find_one()", "[find_one]") { Mock_p2p p; std::shared_ptr<MockSocket> s = std::shared_ptr<MockSocket>(new MockSocket()); s->setProtocol(&p); + sockets.push_back(s); p.addPeer(s); GIVEN("a query that expects a valid result") { @@ -130,6 +140,71 @@ SCENARIO("p2p::bind_find_one()", "[find_one]") { REQUIRE( err.type == 0 ); } + last_pos = 0; + last_send.clear(); + ftl::net::stop(); +} + +SCENARIO("p2p::bind_find_all()", "[find_one]") { + class Mock_p2p : public p2p { + public: + Mock_p2p() : p2p("mock://") { + bind_find_all("test", &Mock_p2p::test); + } + + std::optional<int> test(int a) { + if (a == 2) return 44; + else return {}; + } + }; + + Mock_p2p p,p2; + std::shared_ptr<MockSocket> s = std::shared_ptr<MockSocket>(new MockSocket()); + s->setProtocol(&p); + sockets.push_back(s); + p.addPeer(s); + + GIVEN("a query that expects valid results") { + // Create a mock RPC message with expected result + ftl::UUID req; + int ttl = 10; + auto args_obj = std::make_tuple(req, ttl, 2); + auto call_obj = std::make_tuple(0,0,"test",args_obj); + std::stringstream buf; + msgpack::pack(buf, call_obj); + + s->mock_dispatchRPC(buf.str()); + + // Make sure we get a response + auto [kind,id,err,res] = get_response(); + auto vec = res.as<std::vector<int>>(); + REQUIRE( vec.size() == 1 ); + REQUIRE( vec[0] == 44 ); + REQUIRE( kind == 1 ); + REQUIRE( id == 0 ); + REQUIRE( err.type == 0 ); + } + + GIVEN("a query that expects no result") { + // Create a mock RPC message with expected result + ftl::UUID req; + int ttl = 10; + auto args_obj = std::make_tuple(req, ttl, 3); + auto call_obj = std::make_tuple(0,0,"test",args_obj); + std::stringstream buf; + msgpack::pack(buf, call_obj); + s->mock_dispatchRPC(buf.str()); + + // Make sure we get a response + auto [kind,id,err,res] = get_response(); + REQUIRE( res.as<std::vector<int>>().size() == 0 ); + REQUIRE( kind == 1 ); + REQUIRE( id == 0 ); + REQUIRE( err.type == 0 ); + } + + last_pos = 0; + last_send.clear(); ftl::net::stop(); } @@ -163,6 +238,43 @@ SCENARIO("p2p::find_one()", "[find_one]") { REQUIRE( !res.has_value() ); } + last_pos = 0; + last_send.clear(); + ftl::net::stop(); +} + +SCENARIO("p2p::find_all()", "[find_one]") { + class Mock_p2p : public p2p { + public: + Mock_p2p() : p2p("mock://") { + bind_find_all("test", &Mock_p2p::test); + } + + std::optional<int> test(int a) { + if (a == 2) return 44; + else return {}; + } + }; + + Mock_p2p p; + std::shared_ptr<MockSocket> s = std::shared_ptr<MockSocket>(new MockSocket()); + sockets.push_back(s); + s->setProtocol(&p); + p.addPeer(s); + + GIVEN("a query that expects a valid result") { + auto res = p.find_all<int>("test", 2); + REQUIRE( res.size() == 1 ); + REQUIRE( res[0] == 44 ); + } + + GIVEN("a query that expects no result") { + auto res = p.find_all<int>("test", 3); + REQUIRE( res.size() == 0 ); + } + + last_pos = 0; + last_send.clear(); ftl::net::stop(); } -- GitLab