diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index 24384b39af91a9c655a27217b99741276d08a627..d2a3ce3b8957bb000f9770659e9f6df4d4ef1ca5 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -188,6 +188,9 @@ class Universe : public ftl::Configurable { ftl::net::callback_t onError(const std::function<void(ftl::net::Peer*, const ftl::net::Error &)>&); void removeCallback(ftl::net::callback_t cbid); + + size_t getSendBufferSize() const { return send_size_; } + size_t getRecvBufferSize() const { return recv_size_; } private: void _run(); @@ -220,6 +223,13 @@ class Universe : public ftl::Configurable { std::list<ReconnectInfo> reconnects_; size_t phase_; std::list<ftl::net::Peer*> garbage_; + + size_t send_size_; + size_t recv_size_; + double periodic_time_; + size_t reconnect_attempts_; + + // NOTE: Must always be last member std::thread thread_; struct ConnHandler { diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index 517273c7d41f6bd9261651944dd6d466153c68c4..6a864ac93fb9dd590f325c85d19b184977bce362 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -48,9 +48,6 @@ using ftl::net::Universe; using ftl::net::callback_t; using std::vector; -#define TCP_SEND_BUFFER_SIZE (1024*1024*1) -#define TCP_RECEIVE_BUFFER_SIZE (1024*1024*1) - /*static std::string hexStr(const std::string &s) { const char *data = s.data(); @@ -70,7 +67,7 @@ ftl::UUID ftl::net::this_peer; //static ctpl::thread_pool pool(5); // TODO:(nick) Move to tcp_internal.cpp -static SOCKET tcpConnect(URI &uri) { +static SOCKET tcpConnect(URI &uri, int ssize, int rsize) { int rc; //sockaddr_in destAddr; @@ -93,11 +90,11 @@ static SOCKET tcpConnect(URI &uri) { int flags =1; if (setsockopt(csocket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; }; - int a = TCP_RECEIVE_BUFFER_SIZE; + int a = rsize; if (setsockopt(csocket, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) { fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); } - a = TCP_SEND_BUFFER_SIZE; + a = ssize; if (setsockopt(csocket, SOL_SOCKET, SO_SNDBUF, (const char *)&a, sizeof(int)) == -1) { fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); } @@ -185,11 +182,11 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals int flags =1; if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; }; - int a = TCP_RECEIVE_BUFFER_SIZE; + int a = u->getRecvBufferSize(); if (setsockopt(s, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) { fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); } - a = TCP_SEND_BUFFER_SIZE; + a = u->getSendBufferSize(); if (setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&a, sizeof(int)) == -1) { fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); } @@ -242,12 +239,12 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), scheme_ = uri.getProtocol(); if (uri.getProtocol() == URI::SCHEME_TCP) { - sock_ = tcpConnect(uri); + sock_ = tcpConnect(uri, u->getSendBufferSize(), u->getRecvBufferSize()); if (sock_ != INVALID_SOCKET) status_ = kConnecting; else status_ = kReconnecting; } else if (uri.getProtocol() == URI::SCHEME_WS) { LOG(INFO) << "Websocket connect " << uri.getPath(); - sock_ = tcpConnect(uri); + sock_ = tcpConnect(uri, u->getSendBufferSize(), u->getRecvBufferSize()); if (sock_ != INVALID_SOCKET) { if (!ws_connect(sock_, uri)) { LOG(ERROR) << "Websocket connection failed"; @@ -310,7 +307,7 @@ bool Peer::reconnect() { LOG(INFO) << "Reconnecting to " << uri_ << " ..."; if (scheme_ == URI::SCHEME_TCP) { - sock_ = tcpConnect(uri); + sock_ = tcpConnect(uri, universe_->getSendBufferSize(), universe_->getRecvBufferSize()); if (sock_ != INVALID_SOCKET) { status_ = kConnecting; is_waiting_ = true; @@ -319,7 +316,7 @@ bool Peer::reconnect() { return false; } } else if (scheme_ == URI::SCHEME_WS) { - sock_ = tcpConnect(uri); + sock_ = tcpConnect(uri, universe_->getSendBufferSize(), universe_->getRecvBufferSize()); if (sock_ != INVALID_SOCKET) { if (!ws_connect(sock_, uri)) { return false; diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index ee6f8911ba186b6ad43670a993f89fb16cfbb1a4..03c611868c5af82908efd41ee7e446bae6585473 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -22,14 +22,34 @@ using std::optional; using ftl::config::json_t; using ftl::net::callback_t; +#define TCP_SEND_BUFFER_SIZE (512*1024) +#define TCP_RECEIVE_BUFFER_SIZE (1024*1024*1) + callback_t ftl::net::Universe::cbid__ = 0; -Universe::Universe() : Configurable(), active_(true), this_peer(ftl::net::this_peer), phase_(0), thread_(Universe::__start, this) { +Universe::Universe() : + Configurable(), + active_(true), + this_peer(ftl::net::this_peer), + phase_(0), + send_size_(TCP_SEND_BUFFER_SIZE), + recv_size_(TCP_RECEIVE_BUFFER_SIZE), + periodic_time_(1.0), + reconnect_attempts_(50), + thread_(Universe::__start, this) { _installBindings(); } Universe::Universe(nlohmann::json &config) : - Configurable(config), active_(true), this_peer(ftl::net::this_peer), phase_(0), thread_(Universe::__start, this) { + Configurable(config), + active_(true), + this_peer(ftl::net::this_peer), + phase_(0), + send_size_(value("tcp_send_buffer",TCP_SEND_BUFFER_SIZE)), + recv_size_(value("tcp_recv_buffer",TCP_RECEIVE_BUFFER_SIZE)), + periodic_time_(value("periodics", 1.0)), + reconnect_attempts_(value("reconnect_attempts",50)), + thread_(Universe::__start, this) { _installBindings(); } @@ -186,7 +206,7 @@ void Universe::_cleanupPeers() { i = peers_.erase(i); if (p->status() == ftl::net::Peer::kReconnecting) { - reconnects_.push_back({50, 1.0f, p}); + reconnects_.push_back({reconnect_attempts_, 1.0f, p}); } else { //delete p; garbage_.push_back(p); @@ -250,14 +270,13 @@ void Universe::_run() { // Do periodics auto now = std::chrono::high_resolution_clock::now(); std::chrono::duration<double> elapsed = now - start; - if (elapsed.count() >= 1.0) { + if (elapsed.count() >= periodic_time_) { start = now; _periodic(); } // It is an error to use "select" with no sockets ... so just sleep if (n == 0) { - LOG(ERROR) << "NO SOCKETS"; std::this_thread::sleep_for(std::chrono::milliseconds(300)); continue; } diff --git a/components/net/cpp/test/peer_unit.cpp b/components/net/cpp/test/peer_unit.cpp index 09eb4bef9e70171b84e57f1e954e833aa9768566..c4ace71797063eef78bafee6e3784f0f5e4fa124 100644 --- a/components/net/cpp/test/peer_unit.cpp +++ b/components/net/cpp/test/peer_unit.cpp @@ -50,6 +50,9 @@ class Universe { callback_t onConnect(const std::function<void(Peer*)> &f) { return 0; } callback_t onDisconnect(const std::function<void(Peer*)> &f) { return 0; } + + size_t getSendBufferSize() const { return 10*1024; } + size_t getRecvBufferSize() const { return 10*1024; } }; } }