diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp index bf0aae4c9c937d02aef8566a1cc365e373b9c416..9f0d25d2beafa63b2562e6ea752e3b30e31bcfc1 100644 --- a/components/net/cpp/include/ftl/net/peer.hpp +++ b/components/net/cpp/include/ftl/net/peer.hpp @@ -195,22 +195,6 @@ class Peer { h(*this, args...); } } - - /*template <typename... ARGS> - int _send(const std::string &t, ARGS... args); - - template <typename... ARGS> - int _send(const array &b, ARGS... args); - - template <typename T, typename... ARGS> - int _send(const std::vector<T> &t, ARGS... args); - - template <typename... Types, typename... ARGS> - int _send(const std::tuple<Types...> &t, ARGS... args); - - template <typename T, typename... ARGS, - ENABLE_IF(std::is_trivial<T>::value && !std::is_pointer<T>::value)> - int _send(const T &t, ARGS... args);*/ private: // Data Status status_; diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index fb69ce699850bed6550098fdfae0e3a652ba3ae6..186b4ba7853947bf49de96c96325e565ddd3e675 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -310,7 +310,7 @@ void Peer::data() { } bool Peer::_data() { - // std::unique_lock<std::mutex> lk(recv_mtx_); + std::unique_lock<std::mutex> lk(recv_mtx_); recv_buf_.reserve_buffer(kMaxMessage); int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0); @@ -556,6 +556,8 @@ int Peer::_send() { } Peer::~Peer() { + std::unique_lock<std::mutex> lk1(send_mtx_); + std::unique_lock<std::mutex> lk2(recv_mtx_); close(); delete disp_; diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index 2cee6dab1728908b6ce81054f729952761f8fbec..12a391923ad491f70ace9c890cf83ad77555f919 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -250,7 +250,10 @@ void Universe::_run() { //Some kind of error occured, it is usually possible to recover from this. if (selres < 0) { - std::cout << "SELECT ERROR " << selres << " - " << strerror(errno) << std::endl; + switch (errno) { + case 9 : continue; // Bad file descriptor = socket closed + default : std::cout << "Unknown select error: " << strerror(errno) << std::endl; + } continue; } else if (selres == 0) { // Timeout, nothing to do... @@ -266,24 +269,17 @@ void Universe::_run() { int rsize = sizeof(sockaddr_storage); sockaddr_storage addr; - //int freeclient = freeSocket(); - - //if (freeclient >= 0) { - // TODO Limit connection rate or allow a pause in accepting - // TODO Send auto reject message under heavy load - - //Finally accept this client connection. - int csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize); - - if (csock != INVALID_SOCKET) { - auto p = new Peer(csock, &disp_); - peers_.push_back(p); - _installBindings(p); - p->onConnect([this](Peer &p) { - peer_ids_[p.id()] = &p; - }); - } - //} + //Finally accept this client connection. + int csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize); + + if (csock != INVALID_SOCKET) { + auto p = new Peer(csock, &disp_); + peers_.push_back(p); + _installBindings(p); + p->onConnect([this](Peer &p) { + peer_ids_[p.id()] = &p; + }); + } } } } @@ -293,12 +289,7 @@ void Universe::_run() { if (s != NULL && s->isValid()) { //If message received from this client then deal with it if (FD_ISSET(s->_socket(), &sfdread_)) { - //s->data(); - //std::cout << "QUEUE DATA PROC" << std::endl; - //p.push([](int id, Peer *s) { - // std::cout << "Processing in thread " << std::to_string(id) << std::endl; - s->data(); - //}, s); + s->data(); } if (FD_ISSET(s->_socket(), &sfderror_)) { s->socketError(); diff --git a/components/net/cpp/test/net_integration.cpp b/components/net/cpp/test/net_integration.cpp index c6720808c49e2a2ef4fd0c3f163d2d4f396ba219..d714ca0b9847bca2acbeed8b13f5f38f2e488d31 100644 --- a/components/net/cpp/test/net_integration.cpp +++ b/components/net/cpp/test/net_integration.cpp @@ -238,7 +238,7 @@ TEST_CASE("Universe::publish()", "") { sleep_for(milliseconds(50)); p->close(); - sleep_for(milliseconds(500)); + sleep_for(milliseconds(100)); a.publish("ftl://test2", 56); sleep_for(milliseconds(50));