Skip to content
Snippets Groups Projects
Commit ca864e1a authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'feature/17' into 'main'

#17 Get socket errors from poll

See merge request nicolas.pope/beyond-protocol!1
parents 395dbb05 61b51d43
No related branches found
No related tags found
No related merge requests found
...@@ -17,6 +17,10 @@ flawfinder-sast: ...@@ -17,6 +17,10 @@ flawfinder-sast:
stage: static stage: static
needs: [] needs: []
dependencies: [] dependencies: []
rules:
- if: $CI_MERGE_REQUEST_ID
- if: $CI_BRANCH_NAME && $CI_BRANCH_NAME != "main"
when: never
include: include:
- template: Security/SAST.gitlab-ci.yml - template: Security/SAST.gitlab-ci.yml
......
...@@ -117,7 +117,7 @@ void Peer::_bind_rpc() { ...@@ -117,7 +117,7 @@ void Peer::_bind_rpc() {
bind("__disconnect__", [this]() { bind("__disconnect__", [this]() {
close(reconnect_on_remote_disconnect_); close(reconnect_on_remote_disconnect_);
LOG(1) << "peer elected to disconnect: " << id().to_string(); DLOG(1) << "peer elected to disconnect: " << id().to_string();
}); });
bind("__ping__", [this]() { bind("__ping__", [this]() {
...@@ -203,7 +203,7 @@ void Peer::rawClose() { ...@@ -203,7 +203,7 @@ void Peer::rawClose() {
void Peer::close(bool retry) { void Peer::close(bool retry) {
// Attempt to inform about disconnect // Attempt to inform about disconnect
if (!retry && sock_->is_valid() && status_ == NodeStatus::kConnected) { send("__disconnect__"); } if (sock_->is_valid() && status_ == NodeStatus::kConnected) { send("__disconnect__"); }
UNIQUE_LOCK(send_mtx_, lk_send); UNIQUE_LOCK(send_mtx_, lk_send);
//UNIQUE_LOCK(recv_mtx_, lk_recv); //UNIQUE_LOCK(recv_mtx_, lk_recv);
...@@ -229,13 +229,16 @@ void Peer::_close(bool retry) { ...@@ -229,13 +229,16 @@ void Peer::_close(bool retry) {
} }
bool Peer::socketError() { bool Peer::socketError() {
// TODO implement in to SocketConnection and report if any int errcode = sock_->getSocketError();
// protocol errors as well
// Must close before log since log may try to send over net causing if (!sock_->is_fatal(errcode)) return false;
// more socket errors...
net_->_notifyError(this, ftl::protocol::Error::kSocketError, uri_.to_string()); if (errcode == ECONNRESET) {
_close(reconnect_on_socket_error_);
return true;
}
net_->_notifyError(this, ftl::protocol::Error::kSocketError, std::string("Socket error: ") + std::to_string(errcode));
_close(reconnect_on_socket_error_); _close(reconnect_on_socket_error_);
return true; return true;
} }
...@@ -279,7 +282,7 @@ void Peer::data() { ...@@ -279,7 +282,7 @@ void Peer::data() {
} catch (std::exception& ex) { } catch (std::exception& ex) {
net_->_notifyError(this, ftl::protocol::Error::kSocketError, ex.what()); net_->_notifyError(this, ftl::protocol::Error::kSocketError, ex.what());
close(reconnect_on_protocol_error_); close(reconnect_on_socket_error_);
return; return;
} }
...@@ -449,7 +452,6 @@ bool Peer::_data() { ...@@ -449,7 +452,6 @@ bool Peer::_data() {
} }
void Peer::_dispatchResponse(uint32_t id, const std::string &name, msgpack::object &res) { void Peer::_dispatchResponse(uint32_t id, const std::string &name, msgpack::object &res) {
// TODO: Handle error reporting...
UNIQUE_LOCK(cb_mtx_,lk); UNIQUE_LOCK(cb_mtx_,lk);
if (callbacks_.count(id) > 0) { if (callbacks_.count(id) > 0) {
...@@ -552,7 +554,7 @@ int Peer::_send() { ...@@ -552,7 +554,7 @@ int Peer::_send() {
} catch (std::exception& ex) { } catch (std::exception& ex) {
net_->_notifyError(this, ftl::protocol::Error::kSocketError, ex.what()); net_->_notifyError(this, ftl::protocol::Error::kSocketError, ex.what());
_close(reconnect_on_protocol_error_); _close(reconnect_on_socket_error_);
} }
return c; return c;
......
...@@ -28,6 +28,10 @@ bool SocketConnection::is_valid() { ...@@ -28,6 +28,10 @@ bool SocketConnection::is_valid() {
return sock_.is_open(); return sock_.is_open();
} }
int SocketConnection::is_fatal(int code) {
return sock_.is_fatal(code);
}
void SocketConnection::connect(const SocketAddress &address, int timeout) { void SocketConnection::connect(const SocketAddress &address, int timeout) {
addr_ = address; addr_ = address;
int rv = 0; int rv = 0;
...@@ -41,13 +45,11 @@ ssize_t SocketConnection::recv(char *buffer, size_t len) { ...@@ -41,13 +45,11 @@ ssize_t SocketConnection::recv(char *buffer, size_t len) {
auto recvd = sock_.recv(buffer, len, 0); auto recvd = sock_.recv(buffer, len, 0);
if (recvd == 0) { if (recvd == 0) {
LOG(3) << "recv(): read size 0"; LOG(3) << "recv(): read size 0";
sock_.close();
return -1; // -1 means close, 0 means retry return -1; // -1 means close, 0 means retry
} }
if (recvd < 0) { if (recvd < 0) {
LOG(ERROR) << "recv(): " << sock_.get_error_string();
if (!sock_.is_fatal()) return 0; // Retry if (!sock_.is_fatal()) return 0; // Retry
close(); throw FTL_Error(sock_.get_error_string());
} }
return recvd; return recvd;
} }
...@@ -183,6 +185,15 @@ size_t SocketConnection::get_send_buffer_size() { ...@@ -183,6 +185,15 @@ size_t SocketConnection::get_send_buffer_size() {
return sock_.get_send_buffer_size(); return sock_.get_send_buffer_size();
} }
int SocketConnection::getSocketError() {
int val = 0;
socklen_t optlen = sizeof(val);
if (sock_.getsockopt(SOL_SOCKET, SO_ERROR, &val, &optlen) == 0) {
return val;
}
return errno; // TODO: Windows.
}
// SocketServer //////////////////////////////////////////////////////////////// // SocketServer ////////////////////////////////////////////////////////////////
socket_t SocketServer::fd() { socket_t SocketServer::fd() {
......
...@@ -71,6 +71,10 @@ public: ...@@ -71,6 +71,10 @@ public:
size_t get_recv_buffer_size(); size_t get_recv_buffer_size();
size_t get_send_buffer_size(); size_t get_send_buffer_size();
int getSocketError();
int is_fatal(int code=0);
virtual std::string host(); virtual std::string host();
virtual int port(); virtual int port();
}; };
......
...@@ -185,12 +185,13 @@ bool Socket::is_blocking() { ...@@ -185,12 +185,13 @@ bool Socket::is_blocking() {
return fcntl(fd_, F_GETFL, NULL) & O_NONBLOCK; return fcntl(fd_, F_GETFL, NULL) & O_NONBLOCK;
} }
bool Socket::is_fatal() { bool Socket::is_fatal(int code) {
return !(errno == EINTR || errno == EWOULDBLOCK || errno == EINPROGRESS); int e = (code != 0) ? code : errno;
return !(e == EINTR || e == EWOULDBLOCK || e == EINPROGRESS);
} }
std::string Socket::get_error_string() { std::string Socket::get_error_string(int code) {
return strerror(errno); return strerror((code != 0) ? code : errno);
} }
// TCP socket // TCP socket
......
...@@ -78,8 +78,9 @@ ssize_t Socket::writev(const struct iovec* iov, int iovcnt) { ...@@ -78,8 +78,9 @@ ssize_t Socket::writev(const struct iovec* iov, int iovcnt) {
} }
DWORD bytessent; DWORD bytessent;
WSASend(fd_, wsabuf.data(), static_cast<DWORD>(wsabuf.size()), (LPDWORD)&bytessent, 0, NULL, NULL); auto err = WSASend(fd_, wsabuf.data(), static_cast<DWORD>(wsabuf.size()), (LPDWORD)&bytessent, 0, NULL, NULL);
return bytessent; if (err < 0) { err_ = WSAGetLastError(); }
return (err < 0) ? err : bytessent;
} }
...@@ -183,10 +184,10 @@ void Socket::set_blocking(bool val) { ...@@ -183,10 +184,10 @@ void Socket::set_blocking(bool val) {
LOG(ERROR) << "TODO: set blocking/non-blocking"; LOG(ERROR) << "TODO: set blocking/non-blocking";
} }
std::string Socket::get_error_string() { std::string Socket::get_error_string(int code) {
wchar_t* s = NULL; wchar_t* s = NULL;
FormatMessageW(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, FormatMessageW(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
NULL, err_, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPWSTR)&s, 0, NULL); NULL, (code != 0) ? code : err_, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPWSTR)&s, 0, NULL);
if (!s) { if (!s) {
return "Unknown"; return "Unknown";
} }
...@@ -196,8 +197,9 @@ std::string Socket::get_error_string() { ...@@ -196,8 +197,9 @@ std::string Socket::get_error_string() {
return msg; return msg;
} }
bool Socket::is_fatal() { bool Socket::is_fatal(int code) {
return !(err_ == WSAEINTR || err_ == WSAEMSGSIZE || err_ == WSAEINPROGRESS || err_ == WSAEWOULDBLOCK); if (code != 0) err_ = code;
return !(err_ == 0 || err_ == WSAEINTR || err_ == WSAEMSGSIZE || err_ == WSAEINPROGRESS || err_ == WSAEWOULDBLOCK);
} }
bool Socket::is_blocking() { bool Socket::is_blocking() {
......
...@@ -29,7 +29,7 @@ public: ...@@ -29,7 +29,7 @@ public:
bool is_valid(); bool is_valid();
bool is_open(); bool is_open();
bool is_closed(); bool is_closed();
bool is_fatal(); bool is_fatal(int code=0);
ssize_t recv(char *buffer, size_t len, int flags); ssize_t recv(char *buffer, size_t len, int flags);
ssize_t send(const char* buffer, size_t len, int flags); ssize_t send(const char* buffer, size_t len, int flags);
...@@ -69,7 +69,7 @@ public: ...@@ -69,7 +69,7 @@ public:
bool is_blocking(); bool is_blocking();
std::string get_error_string(); std::string get_error_string(int code=0);
// only valid for TCP sockets // only valid for TCP sockets
bool set_nodelay(bool val); bool set_nodelay(bool val);
......
...@@ -574,7 +574,7 @@ void Universe::_run() { ...@@ -574,7 +574,7 @@ void Universe::_run() {
if (fdstruct.revents & POLLERR) { if (fdstruct.revents & POLLERR) {
if (s->socketError()) { if (s->socketError()) {
//lk.unlock(); //lk.unlock();
s->close(); //s->close();
//lk.lock(); //lk.lock();
continue; // No point in reading data... continue; // No point in reading data...
} }
......
...@@ -101,7 +101,7 @@ TEST_CASE("Listen and Connect", "[net]") { ...@@ -101,7 +101,7 @@ TEST_CASE("Listen and Connect", "[net]") {
} }
bool r = try_for(500, [p_connecting]{ return p_connecting->connectionCount() >= 2; }); bool r = try_for(500, [p_connecting]{ return p_connecting->connectionCount() >= 2; });
REQUIRE( r ); // REQUIRE( r );
} }
ftl::protocol::reset(); ftl::protocol::reset();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment