Skip to content
Snippets Groups Projects
Commit 27912b25 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'bug/132/netblock' into 'master'

Resolve #132 Network performance

Closes #132

See merge request nicolas.pope/ftl!74
parents 1b90993a 11039b1b
No related branches found
No related tags found
1 merge request!74Resolve #132 Network performance
Pipeline #12433 passed
......@@ -14,7 +14,10 @@ int main(int argc, char **argv) {
auto sources = ftl::createArray<ftl::rgbd::Source>(root, "sources", net);
ftl::rgbd::Group group;
for (auto s : sources) group.addSource(s);
for (auto s : sources) {
s->setChannel(ftl::rgbd::kChanRight);
group.addSource(s);
}
group.sync([](const ftl::rgbd::FrameSet &fs) {
LOG(INFO) << "Complete set: " << fs.timestamp;
......@@ -23,7 +26,7 @@ int main(int argc, char **argv) {
while (ftl::running) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
for (auto s : sources) s->grab();
for (auto s : sources) s->grab(30);
}
return 0;
......
......@@ -366,6 +366,7 @@ void SceneRep::_integrateDepthMaps() {
//cudaSafeCall(cudaDeviceSynchronize());
for (size_t i=0; i<cameras_.size(); ++i) {
if (!cameras_[i].source->isReady()) continue;
//ftl::cuda::clear_depth(*(cameras_[i].gpu.depth2_tex_), integ_stream_);
ftl::cuda::clear_points(*(cameras_[i].gpu.points_tex_), integ_stream_);
ftl::cuda::mls_smooth(*(cameras_[i].gpu.points_tex_), m_hashParams, cameras_.size(), i, integ_stream_);
......
......@@ -7,7 +7,8 @@
#define POOL_SIZE 10
// #define DEBUG_MUTEX
#define DEBUG_MUTEX
#define MUTEX_TIMEOUT 15
#if defined DEBUG_MUTEX
#include <loguru.hpp>
......@@ -18,8 +19,8 @@
#define RECURSIVE_MUTEX std::recursive_timed_mutex
#define SHARED_MUTEX std::shared_timed_mutex
#define UNIQUE_LOCK(M,L) std::unique_lock<std::remove_reference<decltype(M)>::type> L(M, std::chrono::seconds(5)); if (!L) LOG(FATAL) << "Mutex deadlock";
#define SHARED_LOCK(M,L) std::shared_lock<std::remove_reference<decltype(M)>::type> L(M, std::chrono::seconds(5)); if (!L) LOG(FATAL) << "Mutex deadlock";
#define UNIQUE_LOCK(M,L) std::unique_lock<std::remove_reference<decltype(M)>::type> L(M, std::chrono::milliseconds(MUTEX_TIMEOUT)); while (!L) { LOG(ERROR) << "Mutex timeout"; L.try_lock_for(std::chrono::milliseconds(MUTEX_TIMEOUT)); };
#define SHARED_LOCK(M,L) std::shared_lock<std::remove_reference<decltype(M)>::type> L(M, std::chrono::milliseconds(MUTEX_TIMEOUT)); while (!L) { LOG(ERROR) << "Mutex timeout"; L.try_lock_for(std::chrono::milliseconds(MUTEX_TIMEOUT)); };
#else
#define MUTEX std::mutex
......
......@@ -26,10 +26,10 @@ Master::Master(Configurable *root, Universe *net)
return {json.dump()};
});
net->broadcast("log_subscribe", net->id());
//net->broadcast("log_subscribe", net->id());
net->onConnect([this](ftl::net::Peer*) {
net_->broadcast("log_subscribe", net_->id());
//net_->broadcast("log_subscribe", net_->id());
});
}
......
......@@ -300,8 +300,14 @@ R Peer::call(const std::string &name, ARGS... args) {
cv.notify_one();
}, std::forward<ARGS>(args)...);
// While waiting, do some other thread jobs...
std::function<void(int)> j;
while (!hasreturned && (bool)(j=ftl::pool.pop())) {
LOG(INFO) << "Doing job whilst waiting...";
j(-1);
}
{ // Block thread until async callback notifies us
//UNIQUE_LOCK(m,lk);
std::unique_lock<std::mutex> lk(m);
cv.wait_for(lk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;});
}
......
......@@ -43,6 +43,8 @@ using ftl::net::Universe;
using ftl::net::callback_t;
using std::vector;
#define TCP_BUFFER_SIZE (1024*1024*10)
/*static std::string hexStr(const std::string &s)
{
const char *data = s.data();
......@@ -82,8 +84,16 @@ static SOCKET tcpConnect(URI &uri) {
return INVALID_SOCKET;
}
//int flags =1;
//if (setsockopt(csocket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; };
int flags =1;
if (setsockopt(csocket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; };
int a = TCP_BUFFER_SIZE;
if (setsockopt(csocket, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) {
fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
}
if (setsockopt(csocket, SOL_SOCKET, SO_SNDBUF, (const char *)&a, sizeof(int)) == -1) {
fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
}
addrinfo hints = {}, *addrs;
hints.ai_family = AF_INET;
......@@ -166,8 +176,15 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals
is_waiting_ = true;
scheme_ = ftl::URI::SCHEME_TCP;
//int flags =1;
//if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; };
int flags =1;
if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; };
int a = TCP_BUFFER_SIZE;
if (setsockopt(s, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) {
fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
}
if (setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&a, sizeof(int)) == -1) {
fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
}
// Send the initiating handshake if valid
if (status_ == kConnecting) {
......@@ -394,46 +411,52 @@ void Peer::error(int e) {
}
void Peer::data() {
UNIQUE_LOCK(recv_mtx_,lk);
recv_buf_.reserve_buffer(kMaxMessage);
{
UNIQUE_LOCK(recv_mtx_,lk);
if (recv_buf_.buffer_capacity() < (kMaxMessage / 10)) {
LOG(WARNING) << "Net buffer at capacity";
return;
}
int rc=0;
int c=0;
int cap = recv_buf_.buffer_capacity();
int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), cap, 0);
//do {
recv_buf_.reserve_buffer(kMaxMessage);
if (rc >= cap) {
LOG(WARNING) << "More than buffers worth of data received";
}
if (cap < (kMaxMessage / 10)) LOG(WARNING) << "NO BUFFER";
if (recv_buf_.buffer_capacity() < (kMaxMessage / 10)) {
LOG(WARNING) << "Net buffer at capacity";
return;
}
if (rc == 0) {
close();
return;
} else if (rc < 0) {
socketError();
return;
}
recv_buf_.buffer_consumed(rc);
// No thread currently processing messages so start one
if (is_waiting_) {
ftl::pool.push([](int id, Peer *p) {
p->_data();
//p->is_waiting_ = true;
}, this);
is_waiting_ = false;
int cap = recv_buf_.buffer_capacity();
rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), cap, 0);
//if (c > 0 && rc > 0) LOG(INFO) << "RECV: " << rc;
if (rc >= cap-1) {
LOG(WARNING) << "More than buffers worth of data received";
}
if (cap < (kMaxMessage / 10)) LOG(WARNING) << "NO BUFFER";
if (rc == 0) {
close();
return;
} else if (rc < 0 && c == 0) {
socketError();
return;
}
//if (rc == -1) break;
++c;
recv_buf_.buffer_consumed(rc);
//} while (rc > 0);
}
lk.unlock();
//LOG(INFO) << "Received " << rc << " bytes";
ftl::pool.push([this](int id) {
_data();
});
}
bool Peer::_data() {
msgpack::object_handle msg;
UNIQUE_LOCK(recv_mtx_,lk);
if (scheme_ == ftl::URI::SCHEME_WS && !ws_read_header_) {
......@@ -444,10 +467,18 @@ bool Peer::_data() {
ws_read_header_ = true;
}
msgpack::object_handle msg;
while (recv_buf_.next(msg)) {
ws_read_header_ = false;
msgpack::object obj = msg.get();
if (!recv_buf_.next(msg)) return false;
msgpack::object obj = msg.get();
lk.unlock();
ftl::pool.push([this](int id) {
_data();
});
if (status_ != kConnected) {
// If not connected, must lock to make sure no other thread performs this step
UNIQUE_LOCK(recv_mtx_,lk);
// Verify still not connected after lock
if (status_ != kConnected) {
// First message must be a handshake
try {
......@@ -463,32 +494,21 @@ bool Peer::_data() {
// to read next message before completion.
// The handshake handler must not block.
disp_->dispatch(*this, obj);
return true;
}
} catch(...) {
_badClose(false);
LOG(ERROR) << "Bad first message format";
return false;
}
} else {
// CHECK Safe to unlock here?
is_waiting_ = true;
lk.unlock();
disp_->dispatch(*this, obj);
// Relock before next loop of while
lk.lock();
is_waiting_ = false;
}
if (scheme_ == ftl::URI::SCHEME_WS && recv_buf_.nonparsed_size() > 0) {
wsheader_type ws;
if (ws_parse(recv_buf_, ws) < 0) {
return false;
}
ws_read_header_ = true;
}
}
is_waiting_ = true; // Can start another thread...
return false;
disp_->dispatch(*this, obj);
// Lock again before freeing msg handle
UNIQUE_LOCK(recv_mtx_,lk2);
return true;
}
void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) {
......
......@@ -20,7 +20,7 @@ struct FrameSet {
unsigned int mask;
};
static const size_t kFrameBufferSize = 10;
static const size_t kFrameBufferSize = 20;
class Group {
public:
......
......@@ -269,14 +269,14 @@ void NetSource::_updateURI() {
bool NetSource::grab(int n, int b) {
// Choose highest requested number of frames
maxN_ = std::max(maxN_,(n == -1) ? 10 : n);
maxN_ = std::max(maxN_,(n == -1) ? ftl::rgbd::detail::kDefaultFrameCount : n);
// Choose best requested quality
minB_ = std::min(minB_,(b == -1) ? 0 : b);
// Send k frames before end to prevent unwanted pause
// Unless only a single frame is requested
if ((N_ <= 2 && maxN_ > 1) || N_ == 0) {
if ((N_ <= maxN_/2 && maxN_ > 1) || N_ == 0) {
const ftl::rgbd::channel_t chan = host_->getChannel();
N_ = maxN_;
......
......@@ -11,6 +11,8 @@ namespace ftl {
namespace rgbd {
namespace detail {
static const int kDefaultFrameCount = 30;
/**
* RGBD source from either a stereo video file with left + right images, or
* direct from two camera devices. A variety of algorithms are included for
......
......@@ -389,7 +389,8 @@ void Streamer::_schedule() {
// Create jobs for each chunk
for (int i=0; i<kChunkCount; ++i) {
// Add chunk job to thread pool
ftl::pool.push([this,src](int id, int chunk) {
ftl::pool.push([this,src,i](int id) {
int chunk = i;
try {
if (!src->rgb.empty() && (src->src->getChannel() == ftl::rgbd::kChanNone || !src->depth.empty())) {
_encodeAndTransmit(src, chunk);
......@@ -403,7 +404,7 @@ void Streamer::_schedule() {
std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_;
job_cv_.notify_one();
}, i);
});
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment