diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp index a8b12288eff07d15c592ff381bb5a6261cb2130c..6ac41b2f7477fde4014f49a7e05ba06ba5724844 100644 --- a/components/net/cpp/include/ftl/net/peer.hpp +++ b/components/net/cpp/include/ftl/net/peer.hpp @@ -16,6 +16,7 @@ #include <ftl/uri.hpp> #include <ftl/uuid.hpp> #include <ftl/threads.hpp> +#include <ftl/timer.hpp> #include <iostream> #include <sstream> @@ -228,6 +229,8 @@ class Peer { void _updateURI(); int _send(); + + void _waitCall(int id, std::condition_variable &cv, bool &hasreturned, const std::string &name); template<typename... ARGS> void _trigger(const std::vector<std::function<void(Peer &, ARGS...)>> &hs, ARGS... args) { @@ -319,35 +322,17 @@ void Peer::bind(const std::string &name, F func) { template <typename R, typename... ARGS> R Peer::call(const std::string &name, ARGS... args) { bool hasreturned = false; - std::mutex m; + //std::mutex m; std::condition_variable cv; R result; int id = asyncCall<R>(name, [&](const R &r) { - //UNIQUE_LOCK(m,lk); - std::unique_lock<std::mutex> lk(m); - hasreturned = true; result = r; - lk.unlock(); + hasreturned = true; cv.notify_one(); }, std::forward<ARGS>(args)...); - // While waiting, do some other thread jobs... - /*std::function<void(int)> j; - while (!hasreturned && (bool)(j=ftl::pool.pop())) { - LOG(INFO) << "Doing job whilst waiting..."; - j(-1); - }*/ - - { // Block thread until async callback notifies us - std::unique_lock<std::mutex> lk(m); - cv.wait_for(lk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;}); - } - - if (!hasreturned) { - cancelCall(id); - throw FTL_Error("RPC failed with timeout: " << name); - } + _waitCall(id, cv, hasreturned, name); return result; } diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index 90c12419b83da749178bd1d4f80790a68bd45fb3..f5af2edef3d8a90cf4d2f9a0b53f25538b30e051 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -142,6 +142,21 @@ class Universe : public ftl::Configurable { template <typename R, typename... ARGS> R call(const UUID &pid, const std::string &name, ARGS... args); + + /** + * Non-blocking Remote Procedure Call using a callback function. + * + * @param pid Peer GUID + * @param name RPC Function name. + * @param cb Completion callback. + * @param args A variable number of arguments for RPC function. + * + * @return A call id for use with cancelCall() if needed. + */ + template <typename R, typename... ARGS> + int asyncCall(const UUID &pid, const std::string &name, + std::function<void(const R&)> cb, + ARGS... args); template <typename... ARGS> bool send(const UUID &pid, const std::string &name, ARGS... args); @@ -284,7 +299,8 @@ template <typename... ARGS> void Universe::broadcast(const std::string &name, ARGS... args) { SHARED_LOCK(net_mutex_,lk); for (auto p : peers_) { - if (p->isConnected()) p->send(name, args...); + if (!p->waitConnection()) continue; + p->send(name, args...); } } @@ -310,8 +326,9 @@ std::optional<R> Universe::findOne(const std::string &name, ARGS... args) { SHARED_LOCK(net_mutex_,lk); for (auto p : peers_) { + if (!p->waitConnection()) continue; count++; - if (p->isConnected()) record[p] = p->asyncCall<std::optional<R>>(name, handler, args...); + record[p] = p->asyncCall<std::optional<R>>(name, handler, args...); } lk.unlock(); @@ -356,9 +373,7 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) { std::map<Peer*, int> record; SHARED_LOCK(net_mutex_,lk); for (auto p : peers_) { - if (!p->isConnected()) { - continue; - } + if (!p->waitConnection()) continue; sentcount++; record[p] = p->asyncCall<std::vector<R>>(name, handler, args...); } @@ -392,6 +407,16 @@ R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) { return p->call<R>(name, args...); } +template <typename R, typename... ARGS> +int Universe::asyncCall(const ftl::UUID &pid, const std::string &name, std::function<void(const R&)> cb, ARGS... args) { + Peer *p = getPeer(pid); + if (p == nullptr || !p->isConnected()) { + if (p == nullptr) throw FTL_Error("Attempting to call an unknown peer : " << pid.to_string()); + else throw FTL_Error("Attempting to call an disconnected peer : " << pid.to_string()); + } + return p->asyncCall(name, cb, args...); +} + template <typename... ARGS> bool Universe::send(const ftl::UUID &pid, const std::string &name, ARGS... args) { Peer *p = getPeer(pid); diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index 24aac7bb3eb6076d8f662fc4ed610e421d357028..b86b3553b6213920b890794308380511d751294b 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -519,9 +519,16 @@ bool Peer::_data() { obj.convert(hs); if (get<1>(hs) != "__handshake__") { - _badClose(false); - LOG(ERROR) << "Missing handshake - got '" << get<1>(hs) << "'"; - return false; + LOG(WARNING) << "Missing handshake - got '" << get<1>(hs) << "'"; + + // Allow a small delay in case another thread is doing the handshake + lk.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + if (status_ == kConnecting) { + LOG(ERROR) << "Failed to get handshake"; + _badClose(false); + return false; + } } else { // Must handle immediately with no other thread able // to read next message before completion. @@ -530,9 +537,16 @@ bool Peer::_data() { return true; } } catch(...) { - _badClose(false); - LOG(ERROR) << "Bad first message format"; - return false; + LOG(WARNING) << "Bad first message format... waiting"; + + // Allow a small delay in case another thread is doing the handshake + lk.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + if (status_ == kConnecting) { + LOG(ERROR) << "Failed to get handshake"; + _badClose(false); + return false; + } } } } @@ -580,8 +594,33 @@ void Peer::_sendResponse(uint32_t id, const msgpack::object &res) { _send(); } +void Peer::_waitCall(int id, std::condition_variable &cv, bool &hasreturned, const std::string &name) { + std::mutex m; + + int64_t beginat = ftl::timer::get_time(); + std::function<void(int)> j; + while (!hasreturned) { + // Attempt to do a thread pool job if available + if ((bool)(j=ftl::pool.pop())) { + j(-1); + } else { + // Block for a little otherwise + std::unique_lock<std::mutex> lk(m); + cv.wait_for(lk, std::chrono::milliseconds(2), [&hasreturned]{return hasreturned;}); + } + + if (ftl::timer::get_time() - beginat > 1000) break; + } + + if (!hasreturned) { + cancelCall(id); + throw FTL_Error("RPC failed with timeout: " << name); + } +} + bool Peer::waitConnection() { if (status_ == kConnected) return true; + else if (status_ != kConnecting) return false; std::mutex m; //UNIQUE_LOCK(m,lk); @@ -594,7 +633,7 @@ bool Peer::waitConnection() { } }); - cv.wait_for(lk, seconds(5)); + cv.wait_for(lk, seconds(1), [this](){return status_ == kConnected;}); universe_->removeCallback(h); return status_ == kConnected; } diff --git a/components/net/cpp/test/net_integration.cpp b/components/net/cpp/test/net_integration.cpp index 7cca78a481a33f57e037765b206d39bb77745867..200e3c5f999e1428128dc2f382d92dee425b88ba 100644 --- a/components/net/cpp/test/net_integration.cpp +++ b/components/net/cpp/test/net_integration.cpp @@ -1,5 +1,6 @@ #include "catch.hpp" #include <ftl/net.hpp> +#include <ftl/timer.hpp> #include <thread> #include <chrono> @@ -102,7 +103,7 @@ TEST_CASE("Universe::onConnect()", "[net]") { }); b.connect("tcp://localhost:7077")->waitConnection(); - sleep_for(milliseconds(100)); + //sleep_for(milliseconds(100)); REQUIRE( done ); } } @@ -124,7 +125,7 @@ TEST_CASE("Universe::onDisconnect()", "[net]") { p->waitConnection(); sleep_for(milliseconds(100)); p->close(); - sleep_for(milliseconds(1100)); + sleep_for(milliseconds(100)); REQUIRE( done ); } @@ -139,7 +140,7 @@ TEST_CASE("Universe::onDisconnect()", "[net]") { p->waitConnection(); sleep_for(milliseconds(100)); p->close(); - sleep_for(milliseconds(1100)); + sleep_for(milliseconds(100)); REQUIRE( done ); } } @@ -158,7 +159,7 @@ TEST_CASE("Universe::broadcast()", "[net]") { b.broadcast("done"); - sleep_for(milliseconds(100)); + sleep_for(milliseconds(50)); REQUIRE( !done ); } @@ -172,7 +173,7 @@ TEST_CASE("Universe::broadcast()", "[net]") { b.broadcast("hello"); - sleep_for(milliseconds(100)); + while (!done) sleep_for(milliseconds(5)); REQUIRE( done ); } @@ -187,7 +188,7 @@ TEST_CASE("Universe::broadcast()", "[net]") { b.broadcast("hello", 676); - sleep_for(milliseconds(100)); + while (done == 0) sleep_for(milliseconds(5)); REQUIRE( done == 676 ); } @@ -209,7 +210,7 @@ TEST_CASE("Universe::broadcast()", "[net]") { }); REQUIRE( a.numberOfPeers() == 2 ); - sleep_for(milliseconds(100)); // NOTE: Binding might not be ready + //sleep_for(milliseconds(100)); // NOTE: Binding might not be ready a.broadcast("hello", 676); @@ -251,10 +252,54 @@ TEST_CASE("Universe::findAll()", "") { return {6,7,8}; }); - sleep_for(milliseconds(100)); // NOTE: Binding might not be ready + //sleep_for(milliseconds(100)); // NOTE: Binding might not be ready auto res = a.findAll<int>("test_all"); REQUIRE( (res.size() == 6) ); REQUIRE( (res[0] == 3 || res[0] == 6) ); } } + +TEST_CASE("Peer::call() __ping__", "") { + Universe a; + Universe b; + Universe c; + + a.listen("tcp://localhost:7077"); + auto *p = b.connect("tcp://localhost:7077"); + p->waitConnection(); + + SECTION("single ping") { + int64_t res = p->call<int64_t>("__ping__"); + REQUIRE((res <= ftl::timer::get_time() && res > 0)); + } + + SECTION("large number of pings") { + for (int i=0; i<100; ++i) { + int64_t res = p->call<int64_t>("__ping__"); + REQUIRE(res > 0); + } + } + + SECTION("large number of parallel pings") { + std::atomic<int> count = 0; + for (int i=0; i<100; ++i) { + ftl::pool.push([&count, p](int id) { + int64_t res = p->call<int64_t>("__ping__"); + count++; + }); + } + + while (count < 100) std::this_thread::sleep_for(milliseconds(5)); + } + + SECTION("single invalid rpc") { + bool errored = false; + try { + int64_t res = p->call<int64_t>("__ping2__"); + } catch (const ftl::exception &e) { + errored = true; + } + REQUIRE(errored); + } +} diff --git a/components/streams/src/netstream.cpp b/components/streams/src/netstream.cpp index c91ca80aa03e9f4fa1990e05d9ddf1a2931f4d88..b12480d0e5126feba87e60d49458a487158d3e00 100644 --- a/components/streams/src/netstream.cpp +++ b/components/streams/src/netstream.cpp @@ -271,26 +271,27 @@ bool Net::_sendRequest(Channel c, uint8_t frameset, uint8_t frames, uint8_t coun net_->send(peer_, uri_, (short)0, spkt, pkt); - if (true) { // TODO: Not every time + // FIXME: Find a way to use this for correct stream latency info + if (false) { // TODO: Not every time auto start = std::chrono::high_resolution_clock::now(); - int64_t mastertime; + //int64_t mastertime; try { - mastertime = net_->call<int64_t>(peer_, "__ping__"); + net_->asyncCall<int64_t>(peer_, "__ping__", [this, start](const int64_t &mastertime) { + auto elapsed = std::chrono::high_resolution_clock::now() - start; + int64_t latency = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count(); + clock_adjust_ = mastertime - (ftl::timer::get_time() + (latency/2)); + + if (clock_adjust_ > 0) { + LOG(INFO) << "Clock adjustment: " << clock_adjust_; + } + }); } catch (...) { LOG(ERROR) << "Ping failed"; // Reset time peer and remove timer time_peer_ = ftl::UUID(0); return false; } - - auto elapsed = std::chrono::high_resolution_clock::now() - start; - int64_t latency = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count(); - clock_adjust_ = mastertime - (ftl::timer::get_time() + (latency/2)); - - if (clock_adjust_ > 0) { - LOG(INFO) << "Clock adjustment: " << clock_adjust_; - } } return true; } @@ -347,27 +348,26 @@ bool Net::_processRequest(ftl::net::Peer &p, const ftl::codecs::Packet &pkt) { // Sync clocks! if (ftl::timer::isClockSlave() && p.id() == time_peer_) { auto start = std::chrono::high_resolution_clock::now(); - int64_t mastertime; try { - mastertime = net_->call<int64_t>(time_peer_, "__ping__"); + net_->asyncCall<int64_t>(time_peer_, "__ping__", [this, start](const int64_t &mastertime) { + auto elapsed = std::chrono::high_resolution_clock::now() - start; + int64_t latency = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count(); + auto clock_adjust = mastertime - (ftl::timer::get_time() + (latency/2)); + + if (clock_adjust > 0) { + LOG(INFO) << "Clock adjustment: " << clock_adjust; + //LOG(INFO) << "Latency: " << (latency / 2); + //LOG(INFO) << "Local: " << std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count() << ", master: " << mastertime; + ftl::timer::setClockAdjustment(clock_adjust); + } + }); } catch (...) { LOG(ERROR) << "Ping failed"; // Reset time peer and remove timer time_peer_ = ftl::UUID(0); return false; } - - auto elapsed = std::chrono::high_resolution_clock::now() - start; - int64_t latency = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count(); - auto clock_adjust = mastertime - (ftl::timer::get_time() + (latency/2)); - - if (clock_adjust > 0) { - LOG(INFO) << "Clock adjustment: " << clock_adjust; - //LOG(INFO) << "Latency: " << (latency / 2); - //LOG(INFO) << "Local: " << std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count() << ", master: " << mastertime; - ftl::timer::setClockAdjustment(clock_adjust); - } } return false;