Skip to content
Snippets Groups Projects
Commit ed7cf66a authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Allow setting of tcp buffers in config

parent 8a0b7a60
No related branches found
No related tags found
1 merge request!95Implements #156 expose config options
...@@ -189,6 +189,9 @@ class Universe : public ftl::Configurable { ...@@ -189,6 +189,9 @@ class Universe : public ftl::Configurable {
void removeCallback(ftl::net::callback_t cbid); void removeCallback(ftl::net::callback_t cbid);
size_t getSendBufferSize() const { return send_size_; }
size_t getRecvBufferSize() const { return recv_size_; }
private: private:
void _run(); void _run();
int _setDescriptors(); int _setDescriptors();
...@@ -220,6 +223,13 @@ class Universe : public ftl::Configurable { ...@@ -220,6 +223,13 @@ class Universe : public ftl::Configurable {
std::list<ReconnectInfo> reconnects_; std::list<ReconnectInfo> reconnects_;
size_t phase_; size_t phase_;
std::list<ftl::net::Peer*> garbage_; std::list<ftl::net::Peer*> garbage_;
size_t send_size_;
size_t recv_size_;
double periodic_time_;
size_t reconnect_attempts_;
// NOTE: Must always be last member
std::thread thread_; std::thread thread_;
struct ConnHandler { struct ConnHandler {
......
...@@ -48,9 +48,6 @@ using ftl::net::Universe; ...@@ -48,9 +48,6 @@ using ftl::net::Universe;
using ftl::net::callback_t; using ftl::net::callback_t;
using std::vector; using std::vector;
#define TCP_SEND_BUFFER_SIZE (1024*1024*1)
#define TCP_RECEIVE_BUFFER_SIZE (1024*1024*1)
/*static std::string hexStr(const std::string &s) /*static std::string hexStr(const std::string &s)
{ {
const char *data = s.data(); const char *data = s.data();
...@@ -70,7 +67,7 @@ ftl::UUID ftl::net::this_peer; ...@@ -70,7 +67,7 @@ ftl::UUID ftl::net::this_peer;
//static ctpl::thread_pool pool(5); //static ctpl::thread_pool pool(5);
// TODO:(nick) Move to tcp_internal.cpp // TODO:(nick) Move to tcp_internal.cpp
static SOCKET tcpConnect(URI &uri) { static SOCKET tcpConnect(URI &uri, int ssize, int rsize) {
int rc; int rc;
//sockaddr_in destAddr; //sockaddr_in destAddr;
...@@ -93,11 +90,11 @@ static SOCKET tcpConnect(URI &uri) { ...@@ -93,11 +90,11 @@ static SOCKET tcpConnect(URI &uri) {
int flags =1; int flags =1;
if (setsockopt(csocket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; }; if (setsockopt(csocket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; };
int a = TCP_RECEIVE_BUFFER_SIZE; int a = rsize;
if (setsockopt(csocket, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) { if (setsockopt(csocket, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) {
fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
} }
a = TCP_SEND_BUFFER_SIZE; a = ssize;
if (setsockopt(csocket, SOL_SOCKET, SO_SNDBUF, (const char *)&a, sizeof(int)) == -1) { if (setsockopt(csocket, SOL_SOCKET, SO_SNDBUF, (const char *)&a, sizeof(int)) == -1) {
fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
} }
...@@ -185,11 +182,11 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals ...@@ -185,11 +182,11 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals
int flags =1; int flags =1;
if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; }; if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; };
int a = TCP_RECEIVE_BUFFER_SIZE; int a = u->getRecvBufferSize();
if (setsockopt(s, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) { if (setsockopt(s, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) {
fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
} }
a = TCP_SEND_BUFFER_SIZE; a = u->getSendBufferSize();
if (setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&a, sizeof(int)) == -1) { if (setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&a, sizeof(int)) == -1) {
fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
} }
...@@ -242,12 +239,12 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), ...@@ -242,12 +239,12 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true),
scheme_ = uri.getProtocol(); scheme_ = uri.getProtocol();
if (uri.getProtocol() == URI::SCHEME_TCP) { if (uri.getProtocol() == URI::SCHEME_TCP) {
sock_ = tcpConnect(uri); sock_ = tcpConnect(uri, u->getSendBufferSize(), u->getRecvBufferSize());
if (sock_ != INVALID_SOCKET) status_ = kConnecting; if (sock_ != INVALID_SOCKET) status_ = kConnecting;
else status_ = kReconnecting; else status_ = kReconnecting;
} else if (uri.getProtocol() == URI::SCHEME_WS) { } else if (uri.getProtocol() == URI::SCHEME_WS) {
LOG(INFO) << "Websocket connect " << uri.getPath(); LOG(INFO) << "Websocket connect " << uri.getPath();
sock_ = tcpConnect(uri); sock_ = tcpConnect(uri, u->getSendBufferSize(), u->getRecvBufferSize());
if (sock_ != INVALID_SOCKET) { if (sock_ != INVALID_SOCKET) {
if (!ws_connect(sock_, uri)) { if (!ws_connect(sock_, uri)) {
LOG(ERROR) << "Websocket connection failed"; LOG(ERROR) << "Websocket connection failed";
...@@ -310,7 +307,7 @@ bool Peer::reconnect() { ...@@ -310,7 +307,7 @@ bool Peer::reconnect() {
LOG(INFO) << "Reconnecting to " << uri_ << " ..."; LOG(INFO) << "Reconnecting to " << uri_ << " ...";
if (scheme_ == URI::SCHEME_TCP) { if (scheme_ == URI::SCHEME_TCP) {
sock_ = tcpConnect(uri); sock_ = tcpConnect(uri, universe_->getSendBufferSize(), universe_->getRecvBufferSize());
if (sock_ != INVALID_SOCKET) { if (sock_ != INVALID_SOCKET) {
status_ = kConnecting; status_ = kConnecting;
is_waiting_ = true; is_waiting_ = true;
...@@ -319,7 +316,7 @@ bool Peer::reconnect() { ...@@ -319,7 +316,7 @@ bool Peer::reconnect() {
return false; return false;
} }
} else if (scheme_ == URI::SCHEME_WS) { } else if (scheme_ == URI::SCHEME_WS) {
sock_ = tcpConnect(uri); sock_ = tcpConnect(uri, universe_->getSendBufferSize(), universe_->getRecvBufferSize());
if (sock_ != INVALID_SOCKET) { if (sock_ != INVALID_SOCKET) {
if (!ws_connect(sock_, uri)) { if (!ws_connect(sock_, uri)) {
return false; return false;
......
...@@ -22,14 +22,34 @@ using std::optional; ...@@ -22,14 +22,34 @@ using std::optional;
using ftl::config::json_t; using ftl::config::json_t;
using ftl::net::callback_t; using ftl::net::callback_t;
#define TCP_SEND_BUFFER_SIZE (512*1024)
#define TCP_RECEIVE_BUFFER_SIZE (1024*1024*1)
callback_t ftl::net::Universe::cbid__ = 0; callback_t ftl::net::Universe::cbid__ = 0;
Universe::Universe() : Configurable(), active_(true), this_peer(ftl::net::this_peer), phase_(0), thread_(Universe::__start, this) { Universe::Universe() :
Configurable(),
active_(true),
this_peer(ftl::net::this_peer),
phase_(0),
send_size_(TCP_SEND_BUFFER_SIZE),
recv_size_(TCP_RECEIVE_BUFFER_SIZE),
periodic_time_(1.0),
reconnect_attempts_(50),
thread_(Universe::__start, this) {
_installBindings(); _installBindings();
} }
Universe::Universe(nlohmann::json &config) : Universe::Universe(nlohmann::json &config) :
Configurable(config), active_(true), this_peer(ftl::net::this_peer), phase_(0), thread_(Universe::__start, this) { Configurable(config),
active_(true),
this_peer(ftl::net::this_peer),
phase_(0),
send_size_(value("tcp_send_buffer",TCP_SEND_BUFFER_SIZE)),
recv_size_(value("tcp_recv_buffer",TCP_RECEIVE_BUFFER_SIZE)),
periodic_time_(value("periodics", 1.0)),
reconnect_attempts_(value("reconnect_attempts",50)),
thread_(Universe::__start, this) {
_installBindings(); _installBindings();
} }
...@@ -186,7 +206,7 @@ void Universe::_cleanupPeers() { ...@@ -186,7 +206,7 @@ void Universe::_cleanupPeers() {
i = peers_.erase(i); i = peers_.erase(i);
if (p->status() == ftl::net::Peer::kReconnecting) { if (p->status() == ftl::net::Peer::kReconnecting) {
reconnects_.push_back({50, 1.0f, p}); reconnects_.push_back({reconnect_attempts_, 1.0f, p});
} else { } else {
//delete p; //delete p;
garbage_.push_back(p); garbage_.push_back(p);
...@@ -250,14 +270,13 @@ void Universe::_run() { ...@@ -250,14 +270,13 @@ void Universe::_run() {
// Do periodics // Do periodics
auto now = std::chrono::high_resolution_clock::now(); auto now = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = now - start; std::chrono::duration<double> elapsed = now - start;
if (elapsed.count() >= 1.0) { if (elapsed.count() >= periodic_time_) {
start = now; start = now;
_periodic(); _periodic();
} }
// It is an error to use "select" with no sockets ... so just sleep // It is an error to use "select" with no sockets ... so just sleep
if (n == 0) { if (n == 0) {
LOG(ERROR) << "NO SOCKETS";
std::this_thread::sleep_for(std::chrono::milliseconds(300)); std::this_thread::sleep_for(std::chrono::milliseconds(300));
continue; continue;
} }
......
...@@ -50,6 +50,9 @@ class Universe { ...@@ -50,6 +50,9 @@ class Universe {
callback_t onConnect(const std::function<void(Peer*)> &f) { return 0; } callback_t onConnect(const std::function<void(Peer*)> &f) { return 0; }
callback_t onDisconnect(const std::function<void(Peer*)> &f) { return 0; } callback_t onDisconnect(const std::function<void(Peer*)> &f) { return 0; }
size_t getSendBufferSize() const { return 10*1024; }
size_t getRecvBufferSize() const { return 10*1024; }
}; };
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment