From b8ac6d512ff7dbcde7d6ff50b906c152fca1ea5a Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nicolas.pope@utu.fi> Date: Sat, 1 Oct 2022 13:29:08 +0000 Subject: [PATCH] #28 RPC returns errors --- src/dispatcher.cpp | 28 +++++++++---- src/dispatcher.hpp | 2 +- src/exception.cpp | 4 +- src/peer.cpp | 19 ++++++--- src/peer.hpp | 69 ++++++++++++------------------ src/protocol/connection.cpp | 24 +++++------ src/protocol/tcp.cpp | 6 +-- src/protocol/tls.cpp | 4 +- src/protocol/websocket.cpp | 2 +- src/socket/socket_linux.cpp | 10 ++--- src/socket/socket_windows.cpp | 4 +- src/streams/filestream.cpp | 12 +++--- src/streams/netstream.cpp | 2 +- src/streams/packetmanager.cpp | 4 +- src/universe.cpp | 2 +- src/universe.hpp | 79 ++++++++++++----------------------- test/mocks/connection.cpp | 2 +- test/mocks/connection.hpp | 2 +- test/peer_unit.cpp | 42 ++++++++++++++++--- 19 files changed, 164 insertions(+), 153 deletions(-) diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index e668d1e..375e72f 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -9,6 +9,7 @@ #include "dispatcher.hpp" #include "peer.hpp" #include <ftl/exception.hpp> +#include <msgpack.hpp> using ftl::net::Peer; using ftl::net::Dispatcher; @@ -48,7 +49,23 @@ void ftl::net::Dispatcher::dispatch(Peer &s, const msgpack::object &msg) { dispatch_notification(s, msg); break; case 4: - dispatch_call(s, msg); + if (msg.via.array.ptr[0].via.i64 == 1) { + response_t response; + try { + msg.convert(response); + } catch(...) { + throw FTL_Error("Bad response format"); + } + auto &&err = std::get<2>(response); + auto &&args = std::get<3>(response); + + s._dispatchResponse( + std::get<1>(response), + err, + args); + } else { + dispatch_call(s, msg); + } break; default: throw FTL_Error("Unrecognised msgpack : " << msg.via.array.size); @@ -71,9 +88,7 @@ void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) { auto &&args = std::get<3>(the_call); // assert(type == 0); - if (type == 1) { - s._dispatchResponse(id, name, args); - } else if (type == 0) { + if (type == 0) { DLOG(2) << "RPC " << name << "() <- " << s.getURI(); auto func = _locateHandler(name); @@ -81,10 +96,9 @@ void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) { if (func) { try { auto result = (*func)(s, args); - s._sendResponse(id, name, result->get()); + s._sendResponse(id, result->get()); } catch (const std::exception &e) { - throw FTL_Error("Exception when attempting to call RPC " << name << " (" << e.what() << ")"); - // FIXME: Send the error in the response. + s._sendErrorResponse(id, msgpack::object(e.what())); } } else { throw FTL_Error("No binding found for " << name); diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index 333c77d..ab0dbc2 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -293,7 +293,7 @@ class Dispatcher { using notification_t = std::tuple<int8_t, std::string, msgpack::object>; using response_t = - std::tuple<uint32_t, uint32_t, std::string, msgpack::object>; + std::tuple<uint32_t, uint32_t, msgpack::object, msgpack::object>; private: Dispatcher *parent_; diff --git a/src/exception.cpp b/src/exception.cpp index 8ab442c..cdee668 100644 --- a/src/exception.cpp +++ b/src/exception.cpp @@ -96,9 +96,9 @@ exception::exception(const ftl::Formatter &msg) : msg_(msg.str()), processed_(fa exception::~exception() { if (!processed_) { // what() or ignore() have not been called. - LOG(ERROR) << "Unhandled exception: " << what(); + DLOG(ERROR) << "Unhandled exception: " << what(); #ifdef __GNUC__ - LOG(ERROR) << "Trace:\n" << decode_backtrace(); + DLOG(ERROR) << "Trace:\n" << decode_backtrace(); #endif } } diff --git a/src/peer.cpp b/src/peer.cpp index ba73a30..be479fd 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -92,7 +92,7 @@ void Peer::_process_handshake(uint64_t magic, uint32_t version, const UUID &pid) net_->_notifyError(this, ftl::protocol::Error::kBadHandshake, "invalid magic during handshake"); _close(reconnect_on_protocol_error_); } else { - if (version != ftl::net::kVersion) LOG(WARNING) << "net protocol using different versions!"; + if (version != ftl::net::kVersion) DLOG(WARNING) << "net protocol using different versions!"; DLOG(INFO) << "(" << (outgoing_ ? "connecting" : "listening") << " peer) handshake received from remote for " << pid.to_string(); @@ -451,7 +451,7 @@ bool Peer::_data() { return true; } -void Peer::_dispatchResponse(uint32_t id, const std::string &name, msgpack::object &res) { +void Peer::_dispatchResponse(uint32_t id, msgpack::object &err, msgpack::object &res) { UNIQUE_LOCK(cb_mtx_, lk); if (callbacks_.count(id) > 0) { // Allow for unlock before callback @@ -461,12 +461,12 @@ void Peer::_dispatchResponse(uint32_t id, const std::string &name, msgpack::obje // Call the callback with unpacked return value try { - (*cb)(res); + cb(res, err); } catch(std::exception &e) { net_->_notifyError(this, Error::kRPCResponse, e.what()); } } else { - net_->_notifyError(this, Error::kRPCResponse, "Missing RPC callback for result - discarding: " + name); + net_->_notifyError(this, Error::kRPCResponse, "Missing RPC callback for result - discarding"); } } @@ -477,8 +477,15 @@ void Peer::cancelCall(int id) { } } -void Peer::_sendResponse(uint32_t id, const std::string &name, const msgpack::object &res) { - Dispatcher::response_t res_obj = std::make_tuple(1, id, name, res); +void Peer::_sendResponse(uint32_t id, const msgpack::object &res) { + Dispatcher::response_t res_obj = std::make_tuple(1, id, msgpack::object(), res); + UNIQUE_LOCK(send_mtx_, lk); + msgpack::pack(send_buf_, res_obj); + _send(); +} + +void Peer::_sendErrorResponse(uint32_t id, const msgpack::object &res) { + Dispatcher::response_t res_obj = std::make_tuple(1, id, res, msgpack::object()); UNIQUE_LOCK(send_mtx_, lk); msgpack::pack(send_buf_, res_obj); _send(); diff --git a/src/peer.hpp b/src/peer.hpp index 7541d89..5058543 100644 --- a/src/peer.hpp +++ b/src/peer.hpp @@ -12,6 +12,7 @@ #include <tuple> #include <vector> +#include <future> #include <type_traits> #include <thread> #include <condition_variable> @@ -45,20 +46,6 @@ namespace net { class Universe; -struct virtual_caller { - virtual void operator()(msgpack::object &o) = 0; -}; - -template <typename T> -struct caller : virtual_caller { - explicit caller(const std::function<void(const T&)> &f) : f_(f) {} - void operator()(msgpack::object &o) override { - T r = o.as<T>(); - f_(r); - } - std::function<void(const T&)> f_; -}; - /** * To be constructed using the Universe::connect() method and not to be * created directly. @@ -155,9 +142,7 @@ class Peer { * @return A call id for use with cancelCall() if needed. */ template <typename T, typename... ARGS> - int asyncCall(const std::string &name, - std::function<void(const T&)> cb, - ARGS... args); + std::future<T> asyncCall(const std::string &name, ARGS... args); /** * Used to terminate an async call if the response is not required. @@ -232,8 +217,9 @@ class Peer { // close socket without sending disconnect message void _close(bool retry = true); - void _dispatchResponse(uint32_t id, const std::string &name, msgpack::object &obj); - void _sendResponse(uint32_t id, const std::string &name, const msgpack::object &obj); + void _dispatchResponse(uint32_t id, msgpack::object &err, msgpack::object &res); + void _sendResponse(uint32_t id, const msgpack::object &obj); + void _sendErrorResponse(uint32_t id, const msgpack::object &obj); /** * Get the internal OS dependent socket. @@ -287,7 +273,7 @@ class Peer { std::unique_ptr<internal::SocketConnection> sock_; std::unique_ptr<ftl::net::Dispatcher> disp_; // For RPC call dispatch - std::map<int, std::unique_ptr<virtual_caller>> callbacks_; + std::map<int, std::function<void(const msgpack::object&, const msgpack::object&)>> callbacks_; std::atomic_int job_count_ = 0; // Ensure threads are done before destructing std::atomic_int connection_count_ = 0; // Number of successful connections total @@ -325,36 +311,35 @@ void Peer::bind(const std::string &name, F func) { template <typename R, typename... ARGS> R Peer::call(const std::string &name, ARGS... args) { - bool hasreturned = false; - std::condition_variable cv; - - R result; - int id = asyncCall<R>(name, [&](const R &r) { - result = r; - hasreturned = true; - cv.notify_one(); - }, std::forward<ARGS>(args)...); - - _waitCall(id, cv, hasreturned, name); - - return result; + auto f = asyncCall<R>(name, std::forward<ARGS>(args)...); + if (f.wait_for(std::chrono::seconds(1)) != std::future_status::ready) { + throw FTL_Error("Call timeout: " << name); + } + return f.get(); } template <typename T, typename... ARGS> -int Peer::asyncCall( - const std::string &name, - // cppcheck-suppress * - std::function<void(const T&)> cb, - ARGS... args) { +std::future<T> Peer::asyncCall(const std::string &name, ARGS... args) { auto args_obj = std::make_tuple(args...); uint32_t rpcid = 0; + std::shared_ptr<std::promise<T>> promise = std::make_shared<std::promise<T>>(); + std::future<T> future = promise->get_future(); + { - // Could this be the problem???? UNIQUE_LOCK(cb_mtx_, lk); - // Register the CB rpcid = rpcid__++; - callbacks_[rpcid] = std::make_unique<caller<T>>(cb); + callbacks_[rpcid] = [promise](const msgpack::object &res, const msgpack::object &err) { + if (err.is_nil()) { + T value; + res.convert<T>(value); + promise->set_value(value); + } else { + std::string errmsg; + err.convert<std::string>(errmsg); + promise->set_exception(std::make_exception_ptr(ftl::exception(ftl::Formatter() << errmsg))); + } + }; } auto call_obj = std::make_tuple(0, rpcid, name, args_obj); @@ -362,7 +347,7 @@ int Peer::asyncCall( UNIQUE_LOCK(send_mtx_, lk); msgpack::pack(send_buf_, call_obj); _send(); - return rpcid; + return future; } using PeerPtr = std::shared_ptr<ftl::net::Peer>; diff --git a/src/protocol/connection.cpp b/src/protocol/connection.cpp index deab195..93d2d41 100644 --- a/src/protocol/connection.cpp +++ b/src/protocol/connection.cpp @@ -54,7 +54,7 @@ void SocketConnection::connect(const SocketAddress &address, int timeout) { ssize_t SocketConnection::recv(char *buffer, size_t len) { auto recvd = sock_.recv(buffer, len, 0); if (recvd == 0) { - LOG(3) << "recv(): read size 0"; + DLOG(3) << "recv(): read size 0"; return -1; // -1 means close, 0 means retry } if (recvd < 0) { @@ -75,7 +75,7 @@ ssize_t SocketConnection::writev(const struct iovec *iov, int iovcnt) { for (int i = 0; i < iovcnt; i++) { requested += iov[i].iov_len; } if (sent < 0) { - LOG(ERROR) << "writev(): " << sock_.get_error_string(); + DLOG(ERROR) << "writev(): " << sock_.get_error_string(); if (sock_.is_fatal()) { return sent; } @@ -109,7 +109,7 @@ ssize_t SocketConnection::writev(const struct iovec *iov, int iovcnt) { writev_calls++; if (sent < 0) { - LOG(ERROR) << "writev(): " << sock_.get_error_string(); + DLOG(ERROR) << "writev(): " << sock_.get_error_string(); if (sock_.is_fatal()) { return sent; } @@ -119,25 +119,25 @@ ssize_t SocketConnection::writev(const struct iovec *iov, int iovcnt) { sent_total += sent; } - LOG(2) << "message required " << writev_calls << " writev() calls"; + DLOG(2) << "message required " << writev_calls << " writev() calls"; if (can_increase_sock_buffer_) { auto send_buf_size = sock_.get_send_buffer_size(); auto send_buf_size_new = size_t(sock_.get_send_buffer_size() * 1.5); - LOG(WARNING) << "Send buffer size " + DLOG(WARNING) << "Send buffer size " << (send_buf_size >> 10) << " KiB. " << "Increasing socket buffer size to " << (send_buf_size_new >> 10) << "KiB."; if (!sock_.set_send_buffer_size(send_buf_size_new)) { - LOG(ERROR) << "could not increase send buffer size, " + DLOG(ERROR) << "could not increase send buffer size, " << "set_send_buffer_size() failed"; can_increase_sock_buffer_ = false; } else { send_buf_size = sock_.get_send_buffer_size(); bool error = send_buf_size < send_buf_size_new; - LOG_IF(WARNING, error) + DLOG_IF(WARNING, error) << "could not increase send buffer size " << "(buffer size: " << send_buf_size << ")"; can_increase_sock_buffer_ &= !error; @@ -156,7 +156,7 @@ std::string SocketConnection::host() { } int SocketConnection::port() { - LOG(ERROR) << "port() not implemented"; + DLOG(ERROR) << "port() not implemented"; return -1; } @@ -164,11 +164,11 @@ bool SocketConnection::set_recv_buffer_size(size_t sz) { auto old = get_recv_buffer_size(); auto ok = sock_.set_recv_buffer_size(sz); if (!ok) { - LOG(ERROR) << "setting socket send buffer size failed:" + DLOG(ERROR) << "setting socket send buffer size failed:" << sock_.get_error_string(); } if (get_recv_buffer_size() == old) { - LOG(ERROR) << "recv buffer size was not changed"; + DLOG(ERROR) << "recv buffer size was not changed"; } return ok; } @@ -177,11 +177,11 @@ bool SocketConnection::set_send_buffer_size(size_t sz) { auto old = get_send_buffer_size(); auto ok = sock_.set_send_buffer_size(sz); if (!ok) { - LOG(ERROR) << "setting socket send buffer size failed:" + DLOG(ERROR) << "setting socket send buffer size failed:" << sock_.get_error_string(); } if (get_send_buffer_size() == old) { - LOG(ERROR) << "send buffer size was not changed"; + DLOG(ERROR) << "send buffer size was not changed"; } return ok; diff --git a/src/protocol/tcp.cpp b/src/protocol/tcp.cpp index 06460f7..122759b 100644 --- a/src/protocol/tcp.cpp +++ b/src/protocol/tcp.cpp @@ -20,13 +20,13 @@ using ftl::net::internal::Socket; Connection_TCP::Connection_TCP(Socket sock, SocketAddress addr) : SocketConnection(sock, addr) { if (!sock_.set_nodelay(true) || !sock_.get_nodelay()) { - LOG(ERROR) << "Could not set TCP_NODELAY"; + DLOG(ERROR) << "Could not set TCP_NODELAY"; } } Connection_TCP::Connection_TCP() : SocketConnection(create_tcp_socket(), {}) { if (!sock_.set_nodelay(true) || !sock_.get_nodelay()) { - LOG(ERROR) << "Could not set TCP_NODELAY"; + DLOG(ERROR) << "Could not set TCP_NODELAY"; } } @@ -59,7 +59,7 @@ Server_TCP::Server_TCP(const std::string &hostname, int port) : int enable = 1; if (sock_.setsockopt(SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&enable), sizeof(int)) < 0) { - LOG(ERROR) << "Setting SO_REUSEADDR failed"; + DLOG(ERROR) << "Setting SO_REUSEADDR failed"; } } diff --git a/src/protocol/tls.cpp b/src/protocol/tls.cpp index cbc1738..abb8af3 100644 --- a/src/protocol/tls.cpp +++ b/src/protocol/tls.cpp @@ -97,7 +97,7 @@ bool Connection_TLS::connect(const std::string& hostname, int port, int timeout) check_gnutls_error_(gnutls_handshake(session_)); - LOG(INFO) << "TLS connection established: " + DLOG(INFO) << "TLS connection established: " << gnutls_session_get_desc(session_) << "; " << get_cert_info(session_); @@ -117,7 +117,7 @@ bool Connection_TLS::close() { ssize_t Connection_TLS::recv(char *buffer, size_t len) { auto recvd = gnutls_record_recv(session_, buffer, len); if (recvd == 0) { - LOG(1) << "recv returned 0 (buffer size " << len << "), closing connection"; + DLOG(1) << "recv returned 0 (buffer size " << len << "), closing connection"; close(); } diff --git a/src/protocol/websocket.cpp b/src/protocol/websocket.cpp index 696b474..6a157f0 100644 --- a/src/protocol/websocket.cpp +++ b/src/protocol/websocket.cpp @@ -225,7 +225,7 @@ void WebSocketBase<SocketT>::connect(const ftl::URI& uri, int timeout) { http += base64_encode(uri.getUserInfo()) + "\r\n"; // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Authorization if (uri.getProtocol() != URI::scheme_t::SCHEME_WSS) { - LOG(WARNING) << "HTTP Basic Auth is being sent without TLS"; + DLOG(WARNING) << "HTTP Basic Auth is being sent without TLS"; } } diff --git a/src/socket/socket_linux.cpp b/src/socket/socket_linux.cpp index d9b8b8f..daefdab 100644 --- a/src/socket/socket_linux.cpp +++ b/src/socket/socket_linux.cpp @@ -52,7 +52,7 @@ Socket::Socket(int domain, int type, int protocol) : if (retval > 0) { fd_ = retval; } else { - LOG(ERROR) << ("socket() failed"); + DLOG(ERROR) << ("socket() failed"); throw FTL_Error("socket: " + get_error_string()); } } @@ -92,7 +92,7 @@ Socket Socket::accept(SocketAddress &addr) { socket.fd_ = retval; socket.family_ = family_; } else { - LOG(ERROR) << "accept returned error: " << strerror(errno); + DLOG(ERROR) << "accept returned error: " << strerror(errno); socket.status_ = STATUS::INVALID; } return socket; @@ -152,7 +152,7 @@ int Socket::connect(const SocketAddress &address, int timeout) { if (rc < 0) { ::close(fd_); status_ = STATUS::CLOSED; - LOG(ERROR) << "socket error: " << strerror(errno); + DLOG(ERROR) << "socket error: " << strerror(errno); return rc; } @@ -165,7 +165,7 @@ bool Socket::close() { status_ = STATUS::CLOSED; return ::close(fd_) == 0; } else if (status_ != STATUS::CLOSED) { - LOG(INFO) << "close() on non-valid socket"; + DLOG(INFO) << "close() on non-valid socket"; } return false; } @@ -216,7 +216,7 @@ std::string ftl::net::internal::get_host(const SocketAddress& addr) { if (err == 0) { return std::string(hbuf); } else if (err == EAI_NONAME) return ftl::net::internal::get_ip(addr); else - LOG(WARNING) << "getnameinfo(): " << gai_strerror(err) << " (" << err << ")"; + DLOG(WARNING) << "getnameinfo(): " << gai_strerror(err) << " (" << err << ")"; return "unknown"; } diff --git a/src/socket/socket_windows.cpp b/src/socket/socket_windows.cpp index 8334b5c..dabcc4f 100644 --- a/src/socket/socket_windows.cpp +++ b/src/socket/socket_windows.cpp @@ -131,7 +131,7 @@ Socket Socket::accept(SocketAddress& addr) { socket.family_ = family_; } else { err_ = WSAGetLastError(); - LOG(ERROR) << "accept returned error: " << get_error_string(); + DLOG(ERROR) << "accept returned error: " << get_error_string(); socket.status_ = STATUS::INVALID; } return socket; @@ -188,7 +188,7 @@ int Socket::getsockopt(int level, int optname, void* optval, socklen_t* optlen) } void Socket::set_blocking(bool val) { - LOG(ERROR) << "TODO: set blocking/non-blocking"; + DLOG(ERROR) << "TODO: set blocking/non-blocking"; } std::string Socket::get_error_string(int code) { diff --git a/src/streams/filestream.cpp b/src/streams/filestream.cpp index ff09e65..335e9f2 100644 --- a/src/streams/filestream.cpp +++ b/src/streams/filestream.cpp @@ -193,7 +193,7 @@ bool File::readPacket(Packet &data) { obj.convert(pack); } } catch (std::exception &e) { - LOG(INFO) << "Corrupt message: " << buffer_in_.nonparsed_size() << " - " << e.what(); + DLOG(INFO) << "Corrupt message: " << buffer_in_.nonparsed_size() << " - " << e.what(); // active_ = false; return false; } @@ -232,7 +232,7 @@ void File::_patchPackets(StreamPacket *spkt, DataPacket *pkt) { bool File::tick(int64_t ts) { if (!active_) return false; if (mode_ != Mode::Read) { - LOG(ERROR) << "Cannot read from a write only file"; + DLOG(ERROR) << "Cannot read from a write only file"; return false; } @@ -371,7 +371,7 @@ bool File::tick(int64_t ts) { auto &fsdata = framesets_[data.streamID]; if (fsdata.first_ts < 0) { - LOG(WARNING) << "Bad first timestamp " << fsdata.first_ts << ", " << data.timestamp; + DLOG(WARNING) << "Bad first timestamp " << fsdata.first_ts << ", " << data.timestamp; } // Adjust timestamp @@ -438,7 +438,7 @@ bool File::_open() { istream_->open(uri_.toFilePath(), std::ifstream::in | std::ifstream::binary); if (!istream_->good()) { - LOG(ERROR) << "Could not open file: " << uri_.toFilePath(); + DLOG(ERROR) << "Could not open file: " << uri_.toFilePath(); return false; } } @@ -492,7 +492,7 @@ bool File::begin() { if (mode_ == Mode::Read) { if (!checked_) { if (!_checkFile()) { - LOG(ERROR) << "Could not open file: " << uri_.toFilePath(); + DLOG(ERROR) << "Could not open file: " << uri_.toFilePath(); return false; } } @@ -511,7 +511,7 @@ bool File::begin() { ostream_->open(uri_.toFilePath(), std::ifstream::out | std::ifstream::binary); if (!ostream_->good()) { - LOG(ERROR) << "Could not open file: '" << uri_.toFilePath() << "'"; + DLOG(ERROR) << "Could not open file: '" << uri_.toFilePath() << "'"; return false; } diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp index c450135..1525892 100644 --- a/src/streams/netstream.cpp +++ b/src/streams/netstream.cpp @@ -559,7 +559,7 @@ bool Net::_processRequest(ftl::net::Peer *p, const StreamPacket *spkt, const Dat try { connect_cb_.trigger(p); } catch (const ftl::exception &e) { - LOG(ERROR) << "Exception in stream connect callback: " << e.what(); + DLOG(ERROR) << "Exception in stream connect callback: " << e.what(); } } diff --git a/src/streams/packetmanager.cpp b/src/streams/packetmanager.cpp index f964263..241cb7f 100644 --- a/src/streams/packetmanager.cpp +++ b/src/streams/packetmanager.cpp @@ -73,7 +73,7 @@ void PacketManager::submit(PacketPair &packets, const std::function<void(const P } } } else if (state.timestamp > packets.first.timestamp) { - LOG(WARNING) << "Old packet received"; + DLOG(WARNING) << "Old packet received"; // Note: not ideal but still better than discarding cb(packets); return; @@ -104,7 +104,7 @@ void PacketManager::submit(PacketPair &packets, const std::function<void(const P } if (state.bufferedEndFrames > 4) { - LOG(WARNING) << "Discarding incomplete frame: " << state.timestamp; + DLOG(WARNING) << "Discarding incomplete frame: " << state.timestamp; UNIQUE_LOCK(state.mtx, lk); if (state.bufferedEndFrames > 4) { state.processed = 0; diff --git a/src/universe.cpp b/src/universe.cpp index b770b46..1b98f7b 100644 --- a/src/universe.cpp +++ b/src/universe.cpp @@ -672,7 +672,7 @@ void Universe::_notifyDisconnect(Peer *p) { } void Universe::_notifyError(Peer *p, ftl::protocol::Error e, const std::string &errstr) { - LOG(ERROR) << "Net Error (" << int(e) << "): " << errstr; + DLOG(ERROR) << "Net Error (" << int(e) << "): " << errstr; const auto ptr = (p) ? _findPeer(p) : nullptr; on_error_.triggerAsync(ptr, e, errstr); diff --git a/src/universe.hpp b/src/universe.hpp index 5ad13c3..30b0ace 100644 --- a/src/universe.hpp +++ b/src/universe.hpp @@ -141,9 +141,7 @@ class Universe { * @return A call id for use with cancelCall() if needed. */ template <typename R, typename... ARGS> - int asyncCall(const UUID &pid, const std::string &name, - std::function<void(const R&)> cb, - ARGS... args); + std::future<R> asyncCall(const UUID &pid, const std::string &name, ARGS... args); template <typename... ARGS> bool send(const UUID &pid, const std::string &name, ARGS... args); @@ -261,74 +259,49 @@ void Universe::broadcast(const std::string &name, ARGS... args) { template <typename R, typename... ARGS> std::optional<R> Universe::findOne(const std::string &name, ARGS... args) { - struct SharedData { - std::atomic_bool hasreturned = false; - std::mutex m; - std::condition_variable cv; - std::optional<R> result; - }; - - auto sdata = std::make_shared<SharedData>(); - - auto handler = [sdata](const std::optional<R> &r) { - std::unique_lock<std::mutex> lk(sdata->m); - if (r && !sdata->hasreturned) { - sdata->hasreturned = true; - sdata->result = r; - } - lk.unlock(); - sdata->cv.notify_one(); - }; + std::vector<std::future<std::optional<R>>> futures; { SHARED_LOCK(net_mutex_, lk); for (const auto &p : peers_) { if (!p || !p->waitConnection()) continue; - p->asyncCall<std::optional<R>>(name, handler, args...); + futures.push_back(std::move(p->asyncCall<std::optional<R>>(name, args...))); } } - // Block thread until async callback notifies us - std::unique_lock<std::mutex> llk(sdata->m); - sdata->cv.wait_for(llk, std::chrono::seconds(1), [sdata] { - return static_cast<bool>(sdata->hasreturned); - }); + for (auto &f : futures) { + if (f.wait_for(std::chrono::seconds(1)) != std::future_status::ready) { + continue; + } + return f.get(); + } - return sdata->result; + return {}; } template <typename R, typename... ARGS> std::vector<R> Universe::findAll(const std::string &name, ARGS... args) { - struct SharedData { - std::atomic_int returncount = 0; - std::atomic_int sentcount = 0; - std::mutex m; - std::condition_variable cv; - std::vector<R> results; - }; - - auto sdata = std::make_shared<SharedData>(); - - auto handler = [sdata](const std::vector<R> &r) { - std::unique_lock<std::mutex> lk(sdata->m); - ++sdata->returncount; - sdata->results.insert(sdata->results.end(), r.begin(), r.end()); - lk.unlock(); - sdata->cv.notify_one(); - }; + std::vector<std::future<std::vector<R>>> futures; { SHARED_LOCK(net_mutex_, lk); for (const auto &p : peers_) { if (!p || !p->waitConnection()) continue; - ++sdata->sentcount; - p->asyncCall<std::vector<R>>(name, handler, args...); + futures.push_back(std::move(p->asyncCall<std::vector<R>>(name, args...))); + } + } + + std::vector<R> results; + + for (auto &f : futures) { + if (f.wait_for(std::chrono::seconds(1)) != std::future_status::ready) { + continue; } + auto v = f.get(); + results.insert(results.end(), v.begin(), v.end()); } - std::unique_lock<std::mutex> llk(sdata->m); - sdata->cv.wait_for(llk, std::chrono::seconds(1), [sdata]{return sdata->returncount == sdata->sentcount; }); - return sdata->results; + return results; } template <typename R, typename... ARGS> @@ -343,21 +316,21 @@ R Universe::call(const ftl::UUID &pid, const std::string &name, ARGS... args) { } template <typename R, typename... ARGS> -int Universe::asyncCall(const ftl::UUID &pid, const std::string &name, std::function<void(const R&)> cb, ARGS... args) { +std::future<R> Universe::asyncCall(const ftl::UUID &pid, const std::string &name, ARGS... args) { PeerPtr p = getPeer(pid); if (p == nullptr || !p->isConnected()) { if (p == nullptr) throw FTL_Error("Attempting to call an unknown peer : " << pid.to_string()); else throw FTL_Error("Attempting to call an disconnected peer : " << pid.to_string()); } - return p->asyncCall(name, cb, args...); + return p->asyncCall(name, args...); } template <typename... ARGS> bool Universe::send(const ftl::UUID &pid, const std::string &name, ARGS... args) { PeerPtr p = getPeer(pid); if (p == nullptr) { - LOG(WARNING) << "Attempting to call an unknown peer : " << pid.to_string(); + DLOG(WARNING) << "Attempting to call an unknown peer : " << pid.to_string(); return false; } diff --git a/test/mocks/connection.cpp b/test/mocks/connection.cpp index 98de3ef..1c79a7c 100644 --- a/test/mocks/connection.cpp +++ b/test/mocks/connection.cpp @@ -99,7 +99,7 @@ void provideResponses(const ftl::net::PeerPtr &p, int c, const std::vector<std:: if (name != expname) return; if (!notif) { - auto res_obj = std::make_tuple(1,id,name, resdata); + auto res_obj = std::make_tuple(1,id,msgpack::object(), resdata); std::stringstream buf; msgpack::pack(buf, res_obj); fakedata[c] = buf.str(); diff --git a/test/mocks/connection.hpp b/test/mocks/connection.hpp index 5700c8e..60ab22a 100644 --- a/test/mocks/connection.hpp +++ b/test/mocks/connection.hpp @@ -60,7 +60,7 @@ std::tuple<uint8_t, std::string, T> readNotifFull(int s) { template <typename T> T readRPCReturn(int s) { msgpack::object_handle msg = msgpack::unpack(fakedata[s].data(), fakedata[s].size()); - std::tuple<uint8_t, uint32_t, std::string, T> req; + std::tuple<uint8_t, uint32_t, msgpack::object, T> req; msg.get().convert(req); return std::get<3>(req); } diff --git a/test/peer_unit.cpp b/test/peer_unit.cpp index 3679e6d..ffc1151 100644 --- a/test/peer_unit.cpp +++ b/test/peer_unit.cpp @@ -114,7 +114,7 @@ TEST_CASE("Peer::call()", "[rpc]") { while (fakedata[c].size() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20)); auto [id,value] = readRPC<tuple<int>>(c); - auto res_obj = std::make_tuple(1,id,"__return__",get<0>(value)+22); + auto res_obj = std::make_tuple(1,id,msgpack::object(),get<0>(value)+22); std::stringstream buf; msgpack::pack(buf, res_obj); fakedata[c] = buf.str(); @@ -137,7 +137,7 @@ TEST_CASE("Peer::call()", "[rpc]") { while (fakedata[c].size() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20)); auto res = readRPC<tuple<>>(c); - auto res_obj = std::make_tuple(1,std::get<0>(res),"__return__",77); + auto res_obj = std::make_tuple(1,std::get<0>(res),msgpack::object(),77); std::stringstream buf; msgpack::pack(buf, res_obj); fakedata[c] = buf.str(); @@ -152,6 +152,38 @@ TEST_CASE("Peer::call()", "[rpc]") { REQUIRE( (res == 77) ); } + SECTION("exception call") { + REQUIRE( s->isConnected() ); + + fakedata[c] = ""; + + // Thread to provide response to otherwise blocking call + std::thread thr([&s, c]() { + while (fakedata[c].size() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20)); + + auto res = readRPC<tuple<>>(c); + auto res_obj = std::make_tuple(1,std::get<0>(res),"some error",msgpack::object()); + std::stringstream buf; + msgpack::pack(buf, res_obj); + fakedata[c] = buf.str(); + s->data(); + sleep_for(milliseconds(50)); + }); + + bool hadException = false; + + try { + s->call<int>("test1"); + } catch(const std::exception &e) { + LOG(INFO) << "Expected exception: " << e.what(); + hadException = true; + } + + thr.join(); + + REQUIRE(hadException); + } + SECTION("vector return from call") { REQUIRE( s->isConnected() ); @@ -163,7 +195,7 @@ TEST_CASE("Peer::call()", "[rpc]") { auto res = readRPC<tuple<>>(c); vector<int> data = {44,55,66}; - auto res_obj = std::make_tuple(1,std::get<0>(res),"__return__",data); + auto res_obj = std::make_tuple(1,std::get<0>(res),msgpack::object(),data); std::stringstream buf; msgpack::pack(buf, res_obj); fakedata[c] = buf.str(); @@ -241,7 +273,7 @@ TEST_CASE("Peer::bind()", "[rpc]") { return a; }); - s->asyncCall<int>("hello", [](int a){}, 55); + s->asyncCall<int>("hello", 55); s->data(); // Force it to read the fake send... sleep_for(milliseconds(50)); @@ -258,7 +290,7 @@ TEST_CASE("Peer::bind()", "[rpc]") { return b; }); - s->asyncCall<int>("hello", [](int a){}, 55); + s->asyncCall<int>("hello", 55); s->data(); // Force it to read the fake send... sleep_for(milliseconds(50)); -- GitLab