diff --git a/include/ftl/handle.hpp b/include/ftl/handle.hpp index bbcaea46b87925345faaa519966d5b76cd6a5257..c30bae558a9f413d6ddd3421b9b5c30ce582da78 100644 --- a/include/ftl/handle.hpp +++ b/include/ftl/handle.hpp @@ -114,7 +114,7 @@ struct Handler : BaseHandler { for (auto i=callbacks_.begin(); i!=callbacks_.end(); ) { bool keep = true; try { - keep = i->second(std::forward<ARGS>(args)...); + keep = i->second(args...); } catch(...) { hadFault = true; } @@ -136,7 +136,7 @@ struct Handler : BaseHandler { for (auto i=callbacks_.begin(); i!=callbacks_.end(); ) { bool keep = true; try { - keep = i->second(std::forward<ARGS>(args)...); + keep = i->second(args...); } catch (...) { hadFault = true; } @@ -159,7 +159,7 @@ struct Handler : BaseHandler { for (auto i=callbacks_.begin(); i!=callbacks_.end(); ++i) { ftl::pool.push([this, f = i->second, args...](int id) { try { - f(std::forward<ARGS>(args)...); + f(args...); } catch (const ftl::exception &e) { --jobs_; throw e; diff --git a/include/ftl/protocol/error.hpp b/include/ftl/protocol/error.hpp new file mode 100644 index 0000000000000000000000000000000000000000..52575e4f845e266fb268c130e453f77eac45fc9b --- /dev/null +++ b/include/ftl/protocol/error.hpp @@ -0,0 +1,23 @@ +#pragma once + +namespace ftl { +namespace protocol { + +enum struct Error { + kNoError = 0, + kUnknown = 1, + kPacketFailure, + kDispatchFailed, + kMissingHandshake, + kRPCResponse, + kSocketError, + kBufferSize, + kReconnectionFailed, + kBadHandshake, + kConnectionFailed, + kSelfConnect, + kListen +}; + +} +} diff --git a/include/ftl/protocol/self.hpp b/include/ftl/protocol/self.hpp index 80e1273b12310361edbaecd531efabef954395d7..fcb697de5dff2c93e6066e9edebe1096f642c859 100644 --- a/include/ftl/protocol/self.hpp +++ b/include/ftl/protocol/self.hpp @@ -9,6 +9,7 @@ #include <ftl/uuid.hpp> #include <ftl/uri.hpp> #include <ftl/handle.hpp> +#include <ftl/protocol/error.hpp> #include <memory> #include <string> @@ -23,10 +24,6 @@ namespace protocol { class Node; class Stream; -struct Error { - int errno; -}; - class Self { public: /** Peer for outgoing connection: resolve address and connect */ @@ -75,7 +72,7 @@ class Self { ftl::Handle onConnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)>&); ftl::Handle onDisconnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)>&); - ftl::Handle onError(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&, const ftl::protocol::Error &)>&); + ftl::Handle onError(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&, ftl::protocol::Error, const std::string & )>&); protected: std::shared_ptr<ftl::net::Universe> universe_; diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 00dc234ef88931b74c3e9bde671f1b4efeaee993..038417d1f499006d2dabdc39f9d694a79ed35dd3 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -49,8 +49,7 @@ void ftl::net::Dispatcher::dispatch(Peer &s, const msgpack::object &msg) { case 4: dispatch_call(s, msg); break; default: - LOG(ERROR) << "Unrecognised msgpack : " << msg.via.array.size; - return; + throw FTL_Error("Unrecognised msgpack : " << msg.via.array.size); } } @@ -60,8 +59,7 @@ void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) { try { msg.convert(the_call); } catch(...) { - LOG(ERROR) << "Bad message format"; - return; + throw FTL_Error("Bad message format"); } // TODO: proper validation of protocol (and responding to it) @@ -72,7 +70,6 @@ void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) { // assert(type == 0); if (type == 1) { - //DLOG(INFO) << "RPC return for " << id; s._dispatchResponse(id, name, args); } else if (type == 0) { DLOG(2) << "RPC " << name << "() <- " << s.getURI(); @@ -85,15 +82,13 @@ void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) { auto result = (*func)(s, args); //->get(); s._sendResponse(id, name, result->get()); } catch (const std::exception &e) { - //throw; - LOG(ERROR) << "Exception when attempting to call RPC " << name << " (" << e.what() << ")"; + throw FTL_Error("Exception when attempting to call RPC " << name << " (" << e.what() << ")"); } } else { - LOG(WARNING) << "No binding found for " << name; + throw FTL_Error("No binding found for " << name); } } else { - // TODO(nick) Some error - LOG(ERROR) << "Unrecognised message type"; + throw FTL_Error("Unrecognised message type: " << type); } } @@ -131,7 +126,7 @@ void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const try { auto result = (*binding)(s, args); } catch (const int &e) { - LOG(ERROR) << "Exception in bound function"; + //throw "Exception in bound function"; throw &e; } catch (const std::bad_cast &e) { std::string args_str = ""; @@ -139,13 +134,13 @@ void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const args_str += object_type_to_string(args.via.array.ptr[i].type); if ((i + 1) != args.via.array.size) args_str += ", "; } - LOG(ERROR) << "Bad cast, got: " << args_str; + throw FTL_Error("Bad cast, got: " << args_str); } catch (const std::exception &e) { - LOG(ERROR) << "Exception for '" << name << "' - " << e.what(); + throw FTL_Error("Exception for '" << name << "' - " << e.what()); } } else { - LOG(ERROR) << "Missing handler for incoming message (" << name << ")"; + throw FTL_Error("Missing handler for incoming message (" << name << ")"); } } diff --git a/src/peer.cpp b/src/peer.cpp index b6725875660b99895aa3ec5a704cbdad5426b87a..8342b8f7099efefb2c60e202855cd1ef93b940f4 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -86,13 +86,12 @@ void Peer::_process_handshake(uint64_t magic, uint32_t version, UUID pid) { * (3). Listening side receives handshake and sets status to kConnected. */ if (magic != ftl::net::kMagic) { + net_->_notifyError(this, ftl::protocol::Error::kBadHandshake, "invalid magic during handshake"); _close(reconnect_on_protocol_error_); - LOG(ERROR) << "invalid magic during handshake"; - } else { if (version != ftl::net::kVersion) LOG(WARNING) << "net protocol using different versions!"; - LOG(INFO) << "(" << (outgoing_ ? "connecting" : "listening") + LOG(1) << "(" << (outgoing_ ? "connecting" : "listening") << " peer) handshake received from remote for " << pid.to_string(); status_ = NodeStatus::kConnected; @@ -117,7 +116,7 @@ void Peer::_bind_rpc() { bind("__disconnect__", [this]() { close(reconnect_on_remote_disconnect_); - LOG(INFO) << "peer elected to disconnect: " << id().to_string(); + LOG(1) << "peer elected to disconnect: " << id().to_string(); }); bind("__ping__", [this]() { @@ -177,14 +176,14 @@ bool Peer::reconnect() { URI uri(uri_); - LOG(INFO) << "Reconnecting to " << uri_.to_string() << " ..."; + LOG(1) << "Reconnecting to " << uri_.to_string() << " ..."; try { _connect(); return true; } catch(const std::exception& ex) { - LOG(ERROR) << "reconnect failed: " << ex.what(); + net_->_notifyError(this, ftl::protocol::Error::kReconnectionFailed, ex.what()); } return false; @@ -239,8 +238,7 @@ bool Peer::socketError() { // more socket errors... _close(reconnect_on_socket_error_); - - LOG(ERROR) << "Connection error: " << uri_.to_string() ; // << " - error " << err; + net_->_notifyError(this, ftl::protocol::Error::kSocketError, uri_.to_string()); return true; } @@ -267,7 +265,7 @@ void Peer::data() { recv_buf_.reserve_buffer(kMaxMessage); if (recv_buf_.buffer_capacity() < (kMaxMessage / 10)) { - LOG(WARNING) << "Net buffer at capacity"; + net_->_notifyError(this, ftl::protocol::Error::kBufferSize, "Buffer is at capacity"); return; } @@ -285,19 +283,17 @@ void Peer::data() { rc = sock_->recv(recv_buf_.buffer(), recv_buf_.buffer_capacity()); if (rc >= cap - 1) { - LOG(WARNING) << "More than buffers worth of data received"; + net_->_notifyError(this, ftl::protocol::Error::kBufferSize, "Too much data received"); } if (cap < (kMaxMessage / 10)) { - LOG(WARNING) << "NO BUFFER"; + net_->_notifyError(this, ftl::protocol::Error::kBufferSize, "Buffer is at capacity"); } - } catch (ftl::exception& ex) { - LOG(ERROR) << "connection error: " << ex.what() << ", disconnected"; + } catch (std::exception& ex) { + net_->_notifyError(this, ftl::protocol::Error::kSocketError, ex.what()); close(reconnect_on_protocol_error_); return; - } catch (...) { - LOG(FATAL) << "unknown exception from SocketConnection::recv()"; } if (rc == 0) { // retry later @@ -331,7 +327,7 @@ void Peer::data() { try { _data(); } catch (const std::exception &e) { - LOG(ERROR) << "Error processing packet: " << e.what(); + net_->_notifyError(this, ftl::protocol::Error::kUnknown, e.what()); } --job_count_; }); @@ -373,7 +369,7 @@ bool Peer::_data() { return false; } } catch (const std::exception& ex) { - LOG(ERROR) << "decoding error: " << ex.what() << ", disconnected"; + net_->_notifyError(this, ftl::protocol::Error::kPacketFailure, ex.what()); _close(reconnect_on_protocol_error_); return false; } @@ -394,13 +390,13 @@ bool Peer::_data() { obj.convert(hs); if (get<1>(hs) != "__handshake__") { - LOG(WARNING) << "Missing handshake - got '" << get<1>(hs) << "'"; + DLOG(WARNING) << "Missing handshake - got '" << get<1>(hs) << "'"; // Allow a small delay in case another thread is doing the handshake //lk.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(10)); if (status_ == NodeStatus::kConnecting) { - LOG(ERROR) << "failed to get handshake"; + net_->_notifyError(this, ftl::protocol::Error::kMissingHandshake, "failed to get handshake"); close(reconnect_on_protocol_error_); //lk.lock(); return false; @@ -413,13 +409,13 @@ bool Peer::_data() { //return true; } } catch(...) { - LOG(WARNING) << "Bad first message format... waiting"; + DLOG(WARNING) << "Bad first message format... waiting"; // Allow a small delay in case another thread is doing the handshake //lk.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(10)); if (status_ == NodeStatus::kConnecting) { - LOG(ERROR) << "failed to get handshake"; + net_->_notifyError(this, ftl::protocol::Error::kMissingHandshake, "failed to get handshake"); close(reconnect_on_protocol_error_); return false; } @@ -435,12 +431,16 @@ bool Peer::_data() { try { _data(); } catch (const std::exception &e) { - LOG(ERROR) << "Error processing packet: " << e.what(); + net_->_notifyError(this, ftl::protocol::Error::kUnknown, e.what()); } --job_count_; }); - disp_->dispatch(*this, obj); + try { + disp_->dispatch(*this, obj); + } catch (const std::exception &e) { + net_->_notifyError(this, ftl::protocol::Error::kDispatchFailed, e.what()); + } // Lock again before freeing msg_handle (destruction order). // msgpack::object_handle destructor modifies recv_buffer_ @@ -462,10 +462,10 @@ void Peer::_dispatchResponse(uint32_t id, const std::string &name, msgpack::obje try { (*cb)(res); } catch(std::exception &e) { - LOG(ERROR) << "Exception in RPC response: " << e.what(); + net_->_notifyError(this, ftl::protocol::Error::kRPCResponse, e.what()); } } else { - LOG(WARNING) << "Missing RPC callback for result - discarding: " << name; + net_->_notifyError(this, ftl::protocol::Error::kRPCResponse, "Missing RPC callback for result - discarding: " + name); } } @@ -536,7 +536,7 @@ int Peer::_send() { if (c <= 0) { // writev() should probably throw exception which is reported here // at the moment, error message is (should be) printed by writev() - LOG(ERROR) << "writev() failed"; + net_->_notifyError(this, ftl::protocol::Error::kSocketError, "writev() failed"); return c; } @@ -544,14 +544,14 @@ int Peer::_send() { sz += send_buf_.vector()[i].iov_len; } if (c != sz) { - LOG(ERROR) << "writev(): incomplete send"; + net_->_notifyError(this, ftl::protocol::Error::kSocketError, "writev(): incomplete send"); _close(reconnect_on_socket_error_); } send_buf_.clear(); } catch (std::exception& ex) { - LOG(ERROR) << "exception while sending data, closing connection"; + net_->_notifyError(this, ftl::protocol::Error::kSocketError, ex.what()); _close(reconnect_on_protocol_error_); } @@ -568,7 +568,6 @@ Peer::~Peer() { // Prevent deletion if there are any jobs remaining while (job_count_ > 0 && ftl::pool.size() > 0) { - LOG(INFO) << "waiting for peer jobs..."; std::this_thread::sleep_for(std::chrono::milliseconds(2)); } } diff --git a/src/self.cpp b/src/self.cpp index bab56a0f73361d1c586247d1aaf3a0c6109703a2..d7e5e8f73176b9654ad68647704ebce311c0cb74 100644 --- a/src/self.cpp +++ b/src/self.cpp @@ -90,9 +90,8 @@ ftl::Handle Self::onDisconnect(const std::function<bool(const std::shared_ptr<ft }); } -ftl::Handle Self::onError(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&, const ftl::protocol::Error &)> &cb) { - return universe_->onError([cb](const ftl::net::PeerPtr &p, const ftl::net::Error &err) { - ftl::protocol::Error perr = {}; - return cb(std::make_shared<ftl::protocol::Node>(p), perr); +ftl::Handle Self::onError(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&, ftl::protocol::Error, const std::string &)> &cb) { + return universe_->onError([cb](const ftl::net::PeerPtr &p, ftl::protocol::Error e, const std::string &estr) { + return cb(std::make_shared<ftl::protocol::Node>(p), e, estr); }); } diff --git a/src/universe.cpp b/src/universe.cpp index dcf5f01d12fb779dc99b826b70889a3f89c60261..d333395b9ade22867ef14451f2d8dda67cbc9362 100644 --- a/src/universe.cpp +++ b/src/universe.cpp @@ -160,7 +160,7 @@ void Universe::start() { void Universe::shutdown() { if (!active_) return; - LOG(INFO) << "Cleanup Network ..."; + DLOG(1) << "Cleanup Network ..."; { SHARED_LOCK(net_mutex_, lk); @@ -179,7 +179,6 @@ void Universe::shutdown() { // FIXME: This shouldn't be needed while (peer_instances_ > 0 && ftl::pool.size() > 0) { - LOG(INFO) << "waiting for peers to destroy..."; std::this_thread::sleep_for(std::chrono::milliseconds(2)); } } @@ -191,14 +190,15 @@ bool Universe::listen(const ftl::URI &addr) { { UNIQUE_LOCK(net_mutex_,lk); - LOG(INFO) << "listening on " << l->uri().to_string(); + DLOG(1) << "listening on " << l->uri().to_string(); listeners_.push_back(std::move(l)); } socket_cv_.notify_one(); return true; } catch (const std::exception &ex) { - LOG(ERROR) << "Can't listen " << addr.to_string() << ", " << ex.what(); + DLOG(INFO) << "Can't listen " << addr.to_string() << ", " << ex.what(); + _notifyError(nullptr, ftl::protocol::Error::kListen, ex.what()); return false; } } @@ -260,7 +260,7 @@ PeerPtr Universe::connect(const ftl::URI &u) { _insertPeer(p); } else { - LOG(ERROR) << "Peer in invalid state"; + DLOG(ERROR) << "Peer in invalid state"; } _installBindings(p); @@ -359,7 +359,7 @@ void Universe::_removePeer(PeerPtr &p) { p->status() == NodeStatus::kReconnecting || p->status() == NodeStatus::kDisconnected)) { - LOG(INFO) << "Removing disconnected peer: " << p->id().to_string(); + DLOG(1) << "Removing disconnected peer: " << p->id().to_string(); on_disconnect_.triggerAsync(p); auto ix = peer_ids_.find(p->id()); @@ -429,8 +429,7 @@ void Universe::_periodic() { if (u.getHost() == "localhost" || u.getHost() == "127.0.0.1") { for (const auto &l : listeners_) { if (l->port() == u.getPort()) { - // TODO: use UUID? - LOG(ERROR) << "Cannot connect to self"; + _notifyError(nullptr, ftl::protocol::Error::kSelfConnect, "Cannot connect to self"); garbage_.push_back((*i).peer); i = reconnects_.erase(i); removed = true; @@ -453,7 +452,6 @@ void Universe::_periodic() { else { garbage_.push_back((*i).peer); i = reconnects_.erase(i); - LOG(WARNING) << "Reconnection to peer failed"; } } } @@ -505,13 +503,13 @@ void Universe::_run() { int errNum = WSAGetLastError(); switch (errNum) { case WSAENOTSOCK : continue; // Socket was closed - default : LOG(WARNING) << "Unhandled poll error: " << errNum; + default : DLOG(WARNING) << "Unhandled poll error: " << errNum; } #else switch (errno) { case 9 : continue; // Bad file descriptor = socket closed case 4 : continue; // Interrupted system call ... no problem - default : LOG(WARNING) << "Unhandled poll error: " << strerror(errno) << "(" << errno << ")"; + default : DLOG(WARNING) << "Unhandled poll error: " << strerror(errno) << "(" << errno << ")"; } #endif continue; @@ -529,7 +527,7 @@ void Universe::_run() { try { csock = l->accept(); } catch (const std::exception &ex) { - LOG(ERROR) << "Connection failed: " << ex.what(); + _notifyError(nullptr, ftl::protocol::Error::kConnectionFailed, ex.what()); } lk.unlock(); @@ -600,7 +598,7 @@ ftl::Handle Universe::onDisconnect(const std::function<bool(const PeerPtr&)> &cb return on_disconnect_.on(cb); } -ftl::Handle Universe::onError(const std::function<bool(const PeerPtr&, const ftl::net::Error &)> &cb) { +ftl::Handle Universe::onError(const std::function<bool(const PeerPtr&, ftl::protocol::Error, const std::string &)> &cb) { return on_error_.on(cb); } @@ -633,6 +631,12 @@ void Universe::_notifyDisconnect(Peer *p) { on_disconnect_.triggerAsync(ptr); } -void Universe::_notifyError(Peer *p, const ftl::net::Error &e) { - // TODO(Nick) +void Universe::_notifyError(Peer *p, ftl::protocol::Error e, const std::string &errstr) { + DLOG(ERROR) << "Net Error (" << int(e) << "): " << errstr; + const auto ptr = (p) ? _findPeer(p) : nullptr; + + // Note: Net errors can have no peer + if (!ptr) return; + + on_error_.triggerAsync(ptr, e, errstr); } diff --git a/src/universe.hpp b/src/universe.hpp index 688292fa69755870e39ef21aaf791963f920d018..1930d21e417f5fc193f137a91dd66ed9a9e858e1 100644 --- a/src/universe.hpp +++ b/src/universe.hpp @@ -9,6 +9,7 @@ #include <msgpack.hpp> #include <ftl/protocol.hpp> +#include <ftl/protocol/error.hpp> #include "peer.hpp" #include "dispatcher.hpp" #include <ftl/uuid.hpp> @@ -163,7 +164,7 @@ public: ftl::Handle onConnect(const std::function<bool(const ftl::net::PeerPtr&)>&); ftl::Handle onDisconnect(const std::function<bool(const ftl::net::PeerPtr&)>&); - ftl::Handle onError(const std::function<bool(const ftl::net::PeerPtr&, const ftl::net::Error &)>&); + ftl::Handle onError(const std::function<bool(const ftl::net::PeerPtr&, ftl::protocol::Error, const std::string &)>&); size_t getSendBufferSize(ftl::URI::scheme_t s); size_t getRecvBufferSize(ftl::URI::scheme_t s); @@ -179,7 +180,7 @@ private: void _cleanupPeers(); void _notifyConnect(ftl::net::Peer *); void _notifyDisconnect(ftl::net::Peer *); - void _notifyError(ftl::net::Peer *, const ftl::net::Error &); + void _notifyError(ftl::net::Peer *, ftl::protocol::Error, const std::string &); void _periodic(); ftl::net::PeerPtr _findPeer(const ftl::net::Peer *p); void _removePeer(PeerPtr &p); @@ -214,7 +215,7 @@ private: ftl::Handler<const ftl::net::PeerPtr&> on_connect_; ftl::Handler<const ftl::net::PeerPtr&> on_disconnect_; - ftl::Handler<const ftl::net::PeerPtr&, const ftl::net::Error &> on_error_; + ftl::Handler<const ftl::net::PeerPtr&, ftl::protocol::Error, const std::string &> on_error_; static std::shared_ptr<Universe> instance_; @@ -236,7 +237,7 @@ void Universe::bind(const std::string &name, F func) { template <typename... ARGS> void Universe::broadcast(const std::string &name, ARGS... args) { SHARED_LOCK(net_mutex_,lk); - for (auto &p : peers_) { + for (const auto &p : peers_) { if (!p || !p->waitConnection()) continue; p->send(name, args...); } @@ -265,7 +266,7 @@ std::optional<R> Universe::findOne(const std::string &name, ARGS... args) { { SHARED_LOCK(net_mutex_,lk); - for (auto &p : peers_) { + for (const auto &p : peers_) { if (!p || !p->waitConnection()) continue; p->asyncCall<std::optional<R>>(name, handler, args...); } @@ -302,7 +303,7 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) { { SHARED_LOCK(net_mutex_,lk); - for (auto &p : peers_) { + for (const auto &p : peers_) { if (!p || !p->waitConnection()) continue; ++sdata->sentcount; p->asyncCall<std::vector<R>>(name, handler, args...);