diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index 6fc9397d7cb544fb1fcbcf89278b5660a5d62c97..04beff81f5f5891bc293f31100918e316f3df015 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -416,14 +416,18 @@ void Peer::data() { } recv_buf_.buffer_consumed(rc); + + // No thread currently processing messages so start one + if (is_waiting_) { + pool.push([](int id, Peer *p) { + p->_data(); + //p->is_waiting_ = true; + }, this); + is_waiting_ = false; + } lk.unlock(); LOG(INFO) << "Received " << rc << " bytes"; - - pool.push([](int id, Peer *p) { - p->_data(); - //p->is_waiting_ = true; - }, this); } bool Peer::_data() { @@ -440,7 +444,7 @@ bool Peer::_data() { msgpack::object_handle msg; while (recv_buf_.next(msg)) { // CHECK Safe to unlock here? - //lk.unlock(); + lk.unlock(); ws_read_header_ = false; msgpack::object obj = msg.get(); if (status_ != kConnected) { @@ -463,7 +467,7 @@ bool Peer::_data() { disp_->dispatch(*this, obj); // Relock before next loop of while - //lk.lock(); + lk.lock(); if (scheme_ == ftl::URI::SCHEME_WS && recv_buf_.nonparsed_size() > 0) { wsheader_type ws; @@ -473,6 +477,7 @@ bool Peer::_data() { ws_read_header_ = true; } } + is_waiting_ = true; // Can start another thread... return false; }