Skip to content
Snippets Groups Projects
Commit 5410f99f authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Reduce buffer size and remove logs

parent 0bbe1530
No related branches found
No related tags found
No related merge requests found
...@@ -174,7 +174,7 @@ bool Peer::reconnect() { ...@@ -174,7 +174,7 @@ bool Peer::reconnect() {
URI uri(uri_); URI uri(uri_);
LOG(INFO) << "Reconnecting to " << uri_.to_string() << " ..."; LOG(1) << "Reconnecting to " << uri_.to_string() << " ...";
try { try {
_connect(); _connect();
...@@ -254,31 +254,17 @@ NodeType Peer::getType() const { ...@@ -254,31 +254,17 @@ NodeType Peer::getType() const {
} }
void Peer::data() { void Peer::data() {
//UNIQUE_LOCK(recv_mtx_,lk);
if (!sock_->is_valid()) { return; } if (!sock_->is_valid()) { return; }
int rc = 0; int rc = 0;
{ // Only need to lock and reserve buffer if there isn't enough
if (recv_buf_.buffer_capacity() < kMaxMessage) {
UNIQUE_LOCK(recv_mtx_,lk); UNIQUE_LOCK(recv_mtx_,lk);
recv_buf_.reserve_buffer(kMaxMessage); recv_buf_.reserve_buffer(kMaxMessage);
} }
if (recv_buf_.buffer_capacity() < (kMaxMessage / 10)) {
net_->_notifyError(this, ftl::protocol::Error::kBufferSize, "Buffer is at capacity");
return;
}
int cap = static_cast<int>(recv_buf_.buffer_capacity()); int cap = static_cast<int>(recv_buf_.buffer_capacity());
// Buffer acquired, recv can be called outside the lock.
// TODO: Check if this is actually correct. If two threads call recv()
// outside the lock and the second thread to call recv() re-acquires
// the lock first, buffer_consumed() will be called first with second
// thread's number of bytes (rc).
//auto ctr = dbg_recv_begin_ctr_++;
//lk.unlock();
try { try {
rc = sock_->recv(recv_buf_.buffer(), recv_buf_.buffer_capacity()); rc = sock_->recv(recv_buf_.buffer(), recv_buf_.buffer_capacity());
...@@ -308,16 +294,9 @@ void Peer::data() { ...@@ -308,16 +294,9 @@ void Peer::data() {
return; return;
} }
// Re-acquire lock before processing buffer further // May possibly need locking
//lk.lock();
// buffer_consumed() will not be updated with correct value, race condition
// described above has occurred
//CHECK(ctr == dbg_recv_end_ctr_++) << "race in Peer::data()";
recv_buf_.buffer_consumed(rc); recv_buf_.buffer_consumed(rc);
//UNIQUE_LOCK(recv_mtx_, lk);
recv_checked_.clear(); recv_checked_.clear();
if (!already_processing_.test_and_set()) { if (!already_processing_.test_and_set()) {
//lk.unlock(); //lk.unlock();
...@@ -543,7 +522,6 @@ bool Peer::waitConnection(int s) { ...@@ -543,7 +522,6 @@ bool Peer::waitConnection(int s) {
}); });
cv.wait_for(lk, seconds(s), [this]() { return status_ == NodeStatus::kConnected;}); cv.wait_for(lk, seconds(s), [this]() { return status_ == NodeStatus::kConnected;});
LOG(ERROR) << "CONN STAT = " << int(status_);
return status_ == NodeStatus::kConnected; return status_ == NodeStatus::kConnected;
} }
......
...@@ -195,7 +195,7 @@ class Peer { ...@@ -195,7 +195,7 @@ class Peer {
int connectionCount() const { return connection_count_; } int connectionCount() const { return connection_count_; }
public: public:
static const int kMaxMessage = 10*1024*1024; // 10Mb currently static const int kMaxMessage = 2*1024*1024; // 10Mb currently
protected: protected:
void data(); // Process one message from socket void data(); // Process one message from socket
......
...@@ -374,8 +374,6 @@ void Universe::_removePeer(PeerPtr &p) { ...@@ -374,8 +374,6 @@ void Universe::_removePeer(PeerPtr &p) {
} }
} }
LOG(INFO) << "Remove peer: " << int(p->status());
if (p->status() == NodeStatus::kReconnecting) { if (p->status() == NodeStatus::kReconnecting) {
reconnects_.push_back({reconnect_attempts_, 1.0f, p}); reconnects_.push_back({reconnect_attempts_, 1.0f, p});
} else { } else {
...@@ -427,7 +425,6 @@ std::list<PeerPtr> Universe::getPeers() const { ...@@ -427,7 +425,6 @@ std::list<PeerPtr> Universe::getPeers() const {
} }
void Universe::_periodic() { void Universe::_periodic() {
LOG(INFO) << "PERIODIC " << reconnects_.size();
auto i = reconnects_.begin(); auto i = reconnects_.begin();
while (i != reconnects_.end()) { while (i != reconnects_.end()) {
...@@ -458,9 +455,7 @@ void Universe::_periodic() { ...@@ -458,9 +455,7 @@ void Universe::_periodic() {
peer->status_ = NodeStatus::kConnecting; peer->status_ = NodeStatus::kConnecting;
i = reconnects_.erase(i); i = reconnects_.erase(i);
//ftl::pool.push([peer](int id) { //ftl::pool.push([peer](int id) {
if (!peer->reconnect()) { peer->reconnect();
LOG(INFO) << "Reconnect failed";
}
//}); //});
/*if ((*i).peer->reconnect()) { /*if ((*i).peer->reconnect()) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment