Skip to content
Snippets Groups Projects
Commit 1cbe69b1 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Change from log to error callbacks

parent f4bc7a98
No related branches found
No related tags found
No related merge requests found
......@@ -114,7 +114,7 @@ struct Handler : BaseHandler {
for (auto i=callbacks_.begin(); i!=callbacks_.end(); ) {
bool keep = true;
try {
keep = i->second(std::forward<ARGS>(args)...);
keep = i->second(args...);
} catch(...) {
hadFault = true;
}
......@@ -136,7 +136,7 @@ struct Handler : BaseHandler {
for (auto i=callbacks_.begin(); i!=callbacks_.end(); ) {
bool keep = true;
try {
keep = i->second(std::forward<ARGS>(args)...);
keep = i->second(args...);
} catch (...) {
hadFault = true;
}
......@@ -159,7 +159,7 @@ struct Handler : BaseHandler {
for (auto i=callbacks_.begin(); i!=callbacks_.end(); ++i) {
ftl::pool.push([this, f = i->second, args...](int id) {
try {
f(std::forward<ARGS>(args)...);
f(args...);
} catch (const ftl::exception &e) {
--jobs_;
throw e;
......
#pragma once
namespace ftl {
namespace protocol {
enum struct Error {
kNoError = 0,
kUnknown = 1,
kPacketFailure,
kDispatchFailed,
kMissingHandshake,
kRPCResponse,
kSocketError,
kBufferSize,
kReconnectionFailed,
kBadHandshake,
kConnectionFailed,
kSelfConnect,
kListen
};
}
}
......@@ -9,6 +9,7 @@
#include <ftl/uuid.hpp>
#include <ftl/uri.hpp>
#include <ftl/handle.hpp>
#include <ftl/protocol/error.hpp>
#include <memory>
#include <string>
......@@ -23,10 +24,6 @@ namespace protocol {
class Node;
class Stream;
struct Error {
int errno;
};
class Self {
public:
/** Peer for outgoing connection: resolve address and connect */
......@@ -75,7 +72,7 @@ class Self {
ftl::Handle onConnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)>&);
ftl::Handle onDisconnect(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&)>&);
ftl::Handle onError(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&, const ftl::protocol::Error &)>&);
ftl::Handle onError(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&, ftl::protocol::Error, const std::string & )>&);
protected:
std::shared_ptr<ftl::net::Universe> universe_;
......
......@@ -49,8 +49,7 @@ void ftl::net::Dispatcher::dispatch(Peer &s, const msgpack::object &msg) {
case 4:
dispatch_call(s, msg); break;
default:
LOG(ERROR) << "Unrecognised msgpack : " << msg.via.array.size;
return;
throw FTL_Error("Unrecognised msgpack : " << msg.via.array.size);
}
}
......@@ -60,8 +59,7 @@ void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) {
try {
msg.convert(the_call);
} catch(...) {
LOG(ERROR) << "Bad message format";
return;
throw FTL_Error("Bad message format");
}
// TODO: proper validation of protocol (and responding to it)
......@@ -72,7 +70,6 @@ void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) {
// assert(type == 0);
if (type == 1) {
//DLOG(INFO) << "RPC return for " << id;
s._dispatchResponse(id, name, args);
} else if (type == 0) {
DLOG(2) << "RPC " << name << "() <- " << s.getURI();
......@@ -85,15 +82,13 @@ void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) {
auto result = (*func)(s, args); //->get();
s._sendResponse(id, name, result->get());
} catch (const std::exception &e) {
//throw;
LOG(ERROR) << "Exception when attempting to call RPC " << name << " (" << e.what() << ")";
throw FTL_Error("Exception when attempting to call RPC " << name << " (" << e.what() << ")");
}
} else {
LOG(WARNING) << "No binding found for " << name;
throw FTL_Error("No binding found for " << name);
}
} else {
// TODO(nick) Some error
LOG(ERROR) << "Unrecognised message type";
throw FTL_Error("Unrecognised message type: " << type);
}
}
......@@ -131,7 +126,7 @@ void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const
try {
auto result = (*binding)(s, args);
} catch (const int &e) {
LOG(ERROR) << "Exception in bound function";
//throw "Exception in bound function";
throw &e;
} catch (const std::bad_cast &e) {
std::string args_str = "";
......@@ -139,13 +134,13 @@ void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const
args_str += object_type_to_string(args.via.array.ptr[i].type);
if ((i + 1) != args.via.array.size) args_str += ", ";
}
LOG(ERROR) << "Bad cast, got: " << args_str;
throw FTL_Error("Bad cast, got: " << args_str);
} catch (const std::exception &e) {
LOG(ERROR) << "Exception for '" << name << "' - " << e.what();
throw FTL_Error("Exception for '" << name << "' - " << e.what());
}
} else {
LOG(ERROR) << "Missing handler for incoming message (" << name << ")";
throw FTL_Error("Missing handler for incoming message (" << name << ")");
}
}
......
......@@ -86,13 +86,12 @@ void Peer::_process_handshake(uint64_t magic, uint32_t version, UUID pid) {
* (3). Listening side receives handshake and sets status to kConnected.
*/
if (magic != ftl::net::kMagic) {
net_->_notifyError(this, ftl::protocol::Error::kBadHandshake, "invalid magic during handshake");
_close(reconnect_on_protocol_error_);
LOG(ERROR) << "invalid magic during handshake";
} else {
if (version != ftl::net::kVersion) LOG(WARNING) << "net protocol using different versions!";
LOG(INFO) << "(" << (outgoing_ ? "connecting" : "listening")
LOG(1) << "(" << (outgoing_ ? "connecting" : "listening")
<< " peer) handshake received from remote for " << pid.to_string();
status_ = NodeStatus::kConnected;
......@@ -117,7 +116,7 @@ void Peer::_bind_rpc() {
bind("__disconnect__", [this]() {
close(reconnect_on_remote_disconnect_);
LOG(INFO) << "peer elected to disconnect: " << id().to_string();
LOG(1) << "peer elected to disconnect: " << id().to_string();
});
bind("__ping__", [this]() {
......@@ -177,14 +176,14 @@ bool Peer::reconnect() {
URI uri(uri_);
LOG(INFO) << "Reconnecting to " << uri_.to_string() << " ...";
LOG(1) << "Reconnecting to " << uri_.to_string() << " ...";
try {
_connect();
return true;
} catch(const std::exception& ex) {
LOG(ERROR) << "reconnect failed: " << ex.what();
net_->_notifyError(this, ftl::protocol::Error::kReconnectionFailed, ex.what());
}
return false;
......@@ -239,8 +238,7 @@ bool Peer::socketError() {
// more socket errors...
_close(reconnect_on_socket_error_);
LOG(ERROR) << "Connection error: " << uri_.to_string() ; // << " - error " << err;
net_->_notifyError(this, ftl::protocol::Error::kSocketError, uri_.to_string());
return true;
}
......@@ -267,7 +265,7 @@ void Peer::data() {
recv_buf_.reserve_buffer(kMaxMessage);
if (recv_buf_.buffer_capacity() < (kMaxMessage / 10)) {
LOG(WARNING) << "Net buffer at capacity";
net_->_notifyError(this, ftl::protocol::Error::kBufferSize, "Buffer is at capacity");
return;
}
......@@ -285,19 +283,17 @@ void Peer::data() {
rc = sock_->recv(recv_buf_.buffer(), recv_buf_.buffer_capacity());
if (rc >= cap - 1) {
LOG(WARNING) << "More than buffers worth of data received";
net_->_notifyError(this, ftl::protocol::Error::kBufferSize, "Too much data received");
}
if (cap < (kMaxMessage / 10)) {
LOG(WARNING) << "NO BUFFER";
net_->_notifyError(this, ftl::protocol::Error::kBufferSize, "Buffer is at capacity");
}
} catch (ftl::exception& ex) {
LOG(ERROR) << "connection error: " << ex.what() << ", disconnected";
} catch (std::exception& ex) {
net_->_notifyError(this, ftl::protocol::Error::kSocketError, ex.what());
close(reconnect_on_protocol_error_);
return;
} catch (...) {
LOG(FATAL) << "unknown exception from SocketConnection::recv()";
}
if (rc == 0) { // retry later
......@@ -331,7 +327,7 @@ void Peer::data() {
try {
_data();
} catch (const std::exception &e) {
LOG(ERROR) << "Error processing packet: " << e.what();
net_->_notifyError(this, ftl::protocol::Error::kUnknown, e.what());
}
--job_count_;
});
......@@ -373,7 +369,7 @@ bool Peer::_data() {
return false;
}
} catch (const std::exception& ex) {
LOG(ERROR) << "decoding error: " << ex.what() << ", disconnected";
net_->_notifyError(this, ftl::protocol::Error::kPacketFailure, ex.what());
_close(reconnect_on_protocol_error_);
return false;
}
......@@ -394,13 +390,13 @@ bool Peer::_data() {
obj.convert(hs);
if (get<1>(hs) != "__handshake__") {
LOG(WARNING) << "Missing handshake - got '" << get<1>(hs) << "'";
DLOG(WARNING) << "Missing handshake - got '" << get<1>(hs) << "'";
// Allow a small delay in case another thread is doing the handshake
//lk.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
if (status_ == NodeStatus::kConnecting) {
LOG(ERROR) << "failed to get handshake";
net_->_notifyError(this, ftl::protocol::Error::kMissingHandshake, "failed to get handshake");
close(reconnect_on_protocol_error_);
//lk.lock();
return false;
......@@ -413,13 +409,13 @@ bool Peer::_data() {
//return true;
}
} catch(...) {
LOG(WARNING) << "Bad first message format... waiting";
DLOG(WARNING) << "Bad first message format... waiting";
// Allow a small delay in case another thread is doing the handshake
//lk.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
if (status_ == NodeStatus::kConnecting) {
LOG(ERROR) << "failed to get handshake";
net_->_notifyError(this, ftl::protocol::Error::kMissingHandshake, "failed to get handshake");
close(reconnect_on_protocol_error_);
return false;
}
......@@ -435,12 +431,16 @@ bool Peer::_data() {
try {
_data();
} catch (const std::exception &e) {
LOG(ERROR) << "Error processing packet: " << e.what();
net_->_notifyError(this, ftl::protocol::Error::kUnknown, e.what());
}
--job_count_;
});
disp_->dispatch(*this, obj);
try {
disp_->dispatch(*this, obj);
} catch (const std::exception &e) {
net_->_notifyError(this, ftl::protocol::Error::kDispatchFailed, e.what());
}
// Lock again before freeing msg_handle (destruction order).
// msgpack::object_handle destructor modifies recv_buffer_
......@@ -462,10 +462,10 @@ void Peer::_dispatchResponse(uint32_t id, const std::string &name, msgpack::obje
try {
(*cb)(res);
} catch(std::exception &e) {
LOG(ERROR) << "Exception in RPC response: " << e.what();
net_->_notifyError(this, ftl::protocol::Error::kRPCResponse, e.what());
}
} else {
LOG(WARNING) << "Missing RPC callback for result - discarding: " << name;
net_->_notifyError(this, ftl::protocol::Error::kRPCResponse, "Missing RPC callback for result - discarding: " + name);
}
}
......@@ -536,7 +536,7 @@ int Peer::_send() {
if (c <= 0) {
// writev() should probably throw exception which is reported here
// at the moment, error message is (should be) printed by writev()
LOG(ERROR) << "writev() failed";
net_->_notifyError(this, ftl::protocol::Error::kSocketError, "writev() failed");
return c;
}
......@@ -544,14 +544,14 @@ int Peer::_send() {
sz += send_buf_.vector()[i].iov_len;
}
if (c != sz) {
LOG(ERROR) << "writev(): incomplete send";
net_->_notifyError(this, ftl::protocol::Error::kSocketError, "writev(): incomplete send");
_close(reconnect_on_socket_error_);
}
send_buf_.clear();
} catch (std::exception& ex) {
LOG(ERROR) << "exception while sending data, closing connection";
net_->_notifyError(this, ftl::protocol::Error::kSocketError, ex.what());
_close(reconnect_on_protocol_error_);
}
......@@ -568,7 +568,6 @@ Peer::~Peer() {
// Prevent deletion if there are any jobs remaining
while (job_count_ > 0 && ftl::pool.size() > 0) {
LOG(INFO) << "waiting for peer jobs...";
std::this_thread::sleep_for(std::chrono::milliseconds(2));
}
}
......@@ -90,9 +90,8 @@ ftl::Handle Self::onDisconnect(const std::function<bool(const std::shared_ptr<ft
});
}
ftl::Handle Self::onError(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&, const ftl::protocol::Error &)> &cb) {
return universe_->onError([cb](const ftl::net::PeerPtr &p, const ftl::net::Error &err) {
ftl::protocol::Error perr = {};
return cb(std::make_shared<ftl::protocol::Node>(p), perr);
ftl::Handle Self::onError(const std::function<bool(const std::shared_ptr<ftl::protocol::Node>&, ftl::protocol::Error, const std::string &)> &cb) {
return universe_->onError([cb](const ftl::net::PeerPtr &p, ftl::protocol::Error e, const std::string &estr) {
return cb(std::make_shared<ftl::protocol::Node>(p), e, estr);
});
}
......@@ -160,7 +160,7 @@ void Universe::start() {
void Universe::shutdown() {
if (!active_) return;
LOG(INFO) << "Cleanup Network ...";
DLOG(1) << "Cleanup Network ...";
{
SHARED_LOCK(net_mutex_, lk);
......@@ -179,7 +179,6 @@ void Universe::shutdown() {
// FIXME: This shouldn't be needed
while (peer_instances_ > 0 && ftl::pool.size() > 0) {
LOG(INFO) << "waiting for peers to destroy...";
std::this_thread::sleep_for(std::chrono::milliseconds(2));
}
}
......@@ -191,14 +190,15 @@ bool Universe::listen(const ftl::URI &addr) {
{
UNIQUE_LOCK(net_mutex_,lk);
LOG(INFO) << "listening on " << l->uri().to_string();
DLOG(1) << "listening on " << l->uri().to_string();
listeners_.push_back(std::move(l));
}
socket_cv_.notify_one();
return true;
} catch (const std::exception &ex) {
LOG(ERROR) << "Can't listen " << addr.to_string() << ", " << ex.what();
DLOG(INFO) << "Can't listen " << addr.to_string() << ", " << ex.what();
_notifyError(nullptr, ftl::protocol::Error::kListen, ex.what());
return false;
}
}
......@@ -260,7 +260,7 @@ PeerPtr Universe::connect(const ftl::URI &u) {
_insertPeer(p);
}
else {
LOG(ERROR) << "Peer in invalid state";
DLOG(ERROR) << "Peer in invalid state";
}
_installBindings(p);
......@@ -359,7 +359,7 @@ void Universe::_removePeer(PeerPtr &p) {
p->status() == NodeStatus::kReconnecting ||
p->status() == NodeStatus::kDisconnected)) {
LOG(INFO) << "Removing disconnected peer: " << p->id().to_string();
DLOG(1) << "Removing disconnected peer: " << p->id().to_string();
on_disconnect_.triggerAsync(p);
auto ix = peer_ids_.find(p->id());
......@@ -429,8 +429,7 @@ void Universe::_periodic() {
if (u.getHost() == "localhost" || u.getHost() == "127.0.0.1") {
for (const auto &l : listeners_) {
if (l->port() == u.getPort()) {
// TODO: use UUID?
LOG(ERROR) << "Cannot connect to self";
_notifyError(nullptr, ftl::protocol::Error::kSelfConnect, "Cannot connect to self");
garbage_.push_back((*i).peer);
i = reconnects_.erase(i);
removed = true;
......@@ -453,7 +452,6 @@ void Universe::_periodic() {
else {
garbage_.push_back((*i).peer);
i = reconnects_.erase(i);
LOG(WARNING) << "Reconnection to peer failed";
}
}
}
......@@ -505,13 +503,13 @@ void Universe::_run() {
int errNum = WSAGetLastError();
switch (errNum) {
case WSAENOTSOCK : continue; // Socket was closed
default : LOG(WARNING) << "Unhandled poll error: " << errNum;
default : DLOG(WARNING) << "Unhandled poll error: " << errNum;
}
#else
switch (errno) {
case 9 : continue; // Bad file descriptor = socket closed
case 4 : continue; // Interrupted system call ... no problem
default : LOG(WARNING) << "Unhandled poll error: " << strerror(errno) << "(" << errno << ")";
default : DLOG(WARNING) << "Unhandled poll error: " << strerror(errno) << "(" << errno << ")";
}
#endif
continue;
......@@ -529,7 +527,7 @@ void Universe::_run() {
try {
csock = l->accept();
} catch (const std::exception &ex) {
LOG(ERROR) << "Connection failed: " << ex.what();
_notifyError(nullptr, ftl::protocol::Error::kConnectionFailed, ex.what());
}
lk.unlock();
......@@ -600,7 +598,7 @@ ftl::Handle Universe::onDisconnect(const std::function<bool(const PeerPtr&)> &cb
return on_disconnect_.on(cb);
}
ftl::Handle Universe::onError(const std::function<bool(const PeerPtr&, const ftl::net::Error &)> &cb) {
ftl::Handle Universe::onError(const std::function<bool(const PeerPtr&, ftl::protocol::Error, const std::string &)> &cb) {
return on_error_.on(cb);
}
......@@ -633,6 +631,12 @@ void Universe::_notifyDisconnect(Peer *p) {
on_disconnect_.triggerAsync(ptr);
}
void Universe::_notifyError(Peer *p, const ftl::net::Error &e) {
// TODO(Nick)
void Universe::_notifyError(Peer *p, ftl::protocol::Error e, const std::string &errstr) {
DLOG(ERROR) << "Net Error (" << int(e) << "): " << errstr;
const auto ptr = (p) ? _findPeer(p) : nullptr;
// Note: Net errors can have no peer
if (!ptr) return;
on_error_.triggerAsync(ptr, e, errstr);
}
......@@ -9,6 +9,7 @@
#include <msgpack.hpp>
#include <ftl/protocol.hpp>
#include <ftl/protocol/error.hpp>
#include "peer.hpp"
#include "dispatcher.hpp"
#include <ftl/uuid.hpp>
......@@ -163,7 +164,7 @@ public:
ftl::Handle onConnect(const std::function<bool(const ftl::net::PeerPtr&)>&);
ftl::Handle onDisconnect(const std::function<bool(const ftl::net::PeerPtr&)>&);
ftl::Handle onError(const std::function<bool(const ftl::net::PeerPtr&, const ftl::net::Error &)>&);
ftl::Handle onError(const std::function<bool(const ftl::net::PeerPtr&, ftl::protocol::Error, const std::string &)>&);
size_t getSendBufferSize(ftl::URI::scheme_t s);
size_t getRecvBufferSize(ftl::URI::scheme_t s);
......@@ -179,7 +180,7 @@ private:
void _cleanupPeers();
void _notifyConnect(ftl::net::Peer *);
void _notifyDisconnect(ftl::net::Peer *);
void _notifyError(ftl::net::Peer *, const ftl::net::Error &);
void _notifyError(ftl::net::Peer *, ftl::protocol::Error, const std::string &);
void _periodic();
ftl::net::PeerPtr _findPeer(const ftl::net::Peer *p);
void _removePeer(PeerPtr &p);
......@@ -214,7 +215,7 @@ private:
ftl::Handler<const ftl::net::PeerPtr&> on_connect_;
ftl::Handler<const ftl::net::PeerPtr&> on_disconnect_;
ftl::Handler<const ftl::net::PeerPtr&, const ftl::net::Error &> on_error_;
ftl::Handler<const ftl::net::PeerPtr&, ftl::protocol::Error, const std::string &> on_error_;
static std::shared_ptr<Universe> instance_;
......@@ -236,7 +237,7 @@ void Universe::bind(const std::string &name, F func) {
template <typename... ARGS>
void Universe::broadcast(const std::string &name, ARGS... args) {
SHARED_LOCK(net_mutex_,lk);
for (auto &p : peers_) {
for (const auto &p : peers_) {
if (!p || !p->waitConnection()) continue;
p->send(name, args...);
}
......@@ -265,7 +266,7 @@ std::optional<R> Universe::findOne(const std::string &name, ARGS... args) {
{
SHARED_LOCK(net_mutex_,lk);
for (auto &p : peers_) {
for (const auto &p : peers_) {
if (!p || !p->waitConnection()) continue;
p->asyncCall<std::optional<R>>(name, handler, args...);
}
......@@ -302,7 +303,7 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) {
{
SHARED_LOCK(net_mutex_,lk);
for (auto &p : peers_) {
for (const auto &p : peers_) {
if (!p || !p->waitConnection()) continue;
++sdata->sentcount;
p->asyncCall<std::vector<R>>(name, handler, args...);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment