diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index 8b109651125e5db091de14ea03b2312e397829ae..52e082373276edc14ea553fb0dc13ee6f8a37e20 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -113,6 +113,7 @@ static SOCKET tcpConnect(URI &uri) { if (rc < 0) { if (errno == EINPROGRESS) { + // TODO(Nick) Move to main select thread to prevent blocking fd_set myset; struct timeval tv; tv.tv_sec = 1; @@ -389,34 +390,41 @@ void Peer::error(int e) { void Peer::data() { //if (!is_waiting_) return; - is_waiting_ = false; + //is_waiting_ = false; + std::unique_lock<std::recursive_mutex> lk(recv_mtx_); + recv_buf_.reserve_buffer(kMaxMessage); + + if (recv_buf_.buffer_capacity() < (kMaxMessage / 10)) { + LOG(WARNING) << "Net buffer at capacity"; + return; + } + + int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), recv_buf_.buffer_capacity(), 0); + + if (rc <= 0) { + return; + } + + recv_buf_.buffer_consumed(rc); + lk.unlock(); + pool.push([](int id, Peer *p) { p->_data(); - p->is_waiting_ = true; + //p->is_waiting_ = true; }, this); } -/*inline std::ostream& hex_dump(std::ostream& o, std::string const& v) { - std::ios::fmtflags f(o.flags()); - o << std::hex; - for (auto c : v) { - o << "0x" << std::setw(2) << std::setfill('0') << (static_cast<int>(c) & 0xff) << ' '; - } - o.flags(f); - return o; -}*/ - bool Peer::_data() { std::unique_lock<std::recursive_mutex> lk(recv_mtx_); - recv_buf_.reserve_buffer(kMaxMessage); + /*recv_buf_.reserve_buffer(kMaxMessage); int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0); if (rc <= 0) { return false; } - recv_buf_.buffer_consumed(rc); + recv_buf_.buffer_consumed(rc);*/ if (scheme_ == ftl::URI::SCHEME_WS && !ws_read_header_) { wsheader_type ws; @@ -426,11 +434,6 @@ bool Peer::_data() { ws_read_header_ = true; } - /*if (rc > 0) { - hex_dump(std::cout, std::string((char*)recv_buf_.nonparsed_buffer(), recv_buf_.nonparsed_size())); - std::cout << std::endl; - }*/ - msgpack::object_handle msg; while (recv_buf_.next(msg)) { ws_read_header_ = false;