diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index b9ded39e2ff4919fe4f82a7f772d17ba17b064e1..f5af2edef3d8a90cf4d2f9a0b53f25538b30e051 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -142,6 +142,21 @@ class Universe : public ftl::Configurable { template <typename R, typename... ARGS> R call(const UUID &pid, const std::string &name, ARGS... args); + + /** + * Non-blocking Remote Procedure Call using a callback function. + * + * @param pid Peer GUID + * @param name RPC Function name. + * @param cb Completion callback. + * @param args A variable number of arguments for RPC function. + * + * @return A call id for use with cancelCall() if needed. + */ + template <typename R, typename... ARGS> + int asyncCall(const UUID &pid, const std::string &name, + std::function<void(const R&)> cb, + ARGS... args); template <typename... ARGS> bool send(const UUID &pid, const std::string &name, ARGS... args); @@ -392,6 +407,16 @@ R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) { return p->call<R>(name, args...); } +template <typename R, typename... ARGS> +int Universe::asyncCall(const ftl::UUID &pid, const std::string &name, std::function<void(const R&)> cb, ARGS... args) { + Peer *p = getPeer(pid); + if (p == nullptr || !p->isConnected()) { + if (p == nullptr) throw FTL_Error("Attempting to call an unknown peer : " << pid.to_string()); + else throw FTL_Error("Attempting to call an disconnected peer : " << pid.to_string()); + } + return p->asyncCall(name, cb, args...); +} + template <typename... ARGS> bool Universe::send(const ftl::UUID &pid, const std::string &name, ARGS... args) { Peer *p = getPeer(pid); diff --git a/components/streams/src/netstream.cpp b/components/streams/src/netstream.cpp index c91ca80aa03e9f4fa1990e05d9ddf1a2931f4d88..b12480d0e5126feba87e60d49458a487158d3e00 100644 --- a/components/streams/src/netstream.cpp +++ b/components/streams/src/netstream.cpp @@ -271,26 +271,27 @@ bool Net::_sendRequest(Channel c, uint8_t frameset, uint8_t frames, uint8_t coun net_->send(peer_, uri_, (short)0, spkt, pkt); - if (true) { // TODO: Not every time + // FIXME: Find a way to use this for correct stream latency info + if (false) { // TODO: Not every time auto start = std::chrono::high_resolution_clock::now(); - int64_t mastertime; + //int64_t mastertime; try { - mastertime = net_->call<int64_t>(peer_, "__ping__"); + net_->asyncCall<int64_t>(peer_, "__ping__", [this, start](const int64_t &mastertime) { + auto elapsed = std::chrono::high_resolution_clock::now() - start; + int64_t latency = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count(); + clock_adjust_ = mastertime - (ftl::timer::get_time() + (latency/2)); + + if (clock_adjust_ > 0) { + LOG(INFO) << "Clock adjustment: " << clock_adjust_; + } + }); } catch (...) { LOG(ERROR) << "Ping failed"; // Reset time peer and remove timer time_peer_ = ftl::UUID(0); return false; } - - auto elapsed = std::chrono::high_resolution_clock::now() - start; - int64_t latency = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count(); - clock_adjust_ = mastertime - (ftl::timer::get_time() + (latency/2)); - - if (clock_adjust_ > 0) { - LOG(INFO) << "Clock adjustment: " << clock_adjust_; - } } return true; } @@ -347,27 +348,26 @@ bool Net::_processRequest(ftl::net::Peer &p, const ftl::codecs::Packet &pkt) { // Sync clocks! if (ftl::timer::isClockSlave() && p.id() == time_peer_) { auto start = std::chrono::high_resolution_clock::now(); - int64_t mastertime; try { - mastertime = net_->call<int64_t>(time_peer_, "__ping__"); + net_->asyncCall<int64_t>(time_peer_, "__ping__", [this, start](const int64_t &mastertime) { + auto elapsed = std::chrono::high_resolution_clock::now() - start; + int64_t latency = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count(); + auto clock_adjust = mastertime - (ftl::timer::get_time() + (latency/2)); + + if (clock_adjust > 0) { + LOG(INFO) << "Clock adjustment: " << clock_adjust; + //LOG(INFO) << "Latency: " << (latency / 2); + //LOG(INFO) << "Local: " << std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count() << ", master: " << mastertime; + ftl::timer::setClockAdjustment(clock_adjust); + } + }); } catch (...) { LOG(ERROR) << "Ping failed"; // Reset time peer and remove timer time_peer_ = ftl::UUID(0); return false; } - - auto elapsed = std::chrono::high_resolution_clock::now() - start; - int64_t latency = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count(); - auto clock_adjust = mastertime - (ftl::timer::get_time() + (latency/2)); - - if (clock_adjust > 0) { - LOG(INFO) << "Clock adjustment: " << clock_adjust; - //LOG(INFO) << "Latency: " << (latency / 2); - //LOG(INFO) << "Local: " << std::chrono::time_point_cast<std::chrono::milliseconds>(start).time_since_epoch().count() << ", master: " << mastertime; - ftl::timer::setClockAdjustment(clock_adjust); - } } return false;