Skip to content
Snippets Groups Projects
Commit 11039b1b authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Resolve #132 Network performance

parent 1b90993a
No related branches found
No related tags found
No related merge requests found
...@@ -14,7 +14,10 @@ int main(int argc, char **argv) { ...@@ -14,7 +14,10 @@ int main(int argc, char **argv) {
auto sources = ftl::createArray<ftl::rgbd::Source>(root, "sources", net); auto sources = ftl::createArray<ftl::rgbd::Source>(root, "sources", net);
ftl::rgbd::Group group; ftl::rgbd::Group group;
for (auto s : sources) group.addSource(s); for (auto s : sources) {
s->setChannel(ftl::rgbd::kChanRight);
group.addSource(s);
}
group.sync([](const ftl::rgbd::FrameSet &fs) { group.sync([](const ftl::rgbd::FrameSet &fs) {
LOG(INFO) << "Complete set: " << fs.timestamp; LOG(INFO) << "Complete set: " << fs.timestamp;
...@@ -23,7 +26,7 @@ int main(int argc, char **argv) { ...@@ -23,7 +26,7 @@ int main(int argc, char **argv) {
while (ftl::running) { while (ftl::running) {
std::this_thread::sleep_for(std::chrono::milliseconds(20)); std::this_thread::sleep_for(std::chrono::milliseconds(20));
for (auto s : sources) s->grab(); for (auto s : sources) s->grab(30);
} }
return 0; return 0;
......
...@@ -366,6 +366,7 @@ void SceneRep::_integrateDepthMaps() { ...@@ -366,6 +366,7 @@ void SceneRep::_integrateDepthMaps() {
//cudaSafeCall(cudaDeviceSynchronize()); //cudaSafeCall(cudaDeviceSynchronize());
for (size_t i=0; i<cameras_.size(); ++i) { for (size_t i=0; i<cameras_.size(); ++i) {
if (!cameras_[i].source->isReady()) continue;
//ftl::cuda::clear_depth(*(cameras_[i].gpu.depth2_tex_), integ_stream_); //ftl::cuda::clear_depth(*(cameras_[i].gpu.depth2_tex_), integ_stream_);
ftl::cuda::clear_points(*(cameras_[i].gpu.points_tex_), integ_stream_); ftl::cuda::clear_points(*(cameras_[i].gpu.points_tex_), integ_stream_);
ftl::cuda::mls_smooth(*(cameras_[i].gpu.points_tex_), m_hashParams, cameras_.size(), i, integ_stream_); ftl::cuda::mls_smooth(*(cameras_[i].gpu.points_tex_), m_hashParams, cameras_.size(), i, integ_stream_);
......
...@@ -7,7 +7,8 @@ ...@@ -7,7 +7,8 @@
#define POOL_SIZE 10 #define POOL_SIZE 10
// #define DEBUG_MUTEX #define DEBUG_MUTEX
#define MUTEX_TIMEOUT 15
#if defined DEBUG_MUTEX #if defined DEBUG_MUTEX
#include <loguru.hpp> #include <loguru.hpp>
...@@ -18,8 +19,8 @@ ...@@ -18,8 +19,8 @@
#define RECURSIVE_MUTEX std::recursive_timed_mutex #define RECURSIVE_MUTEX std::recursive_timed_mutex
#define SHARED_MUTEX std::shared_timed_mutex #define SHARED_MUTEX std::shared_timed_mutex
#define UNIQUE_LOCK(M,L) std::unique_lock<std::remove_reference<decltype(M)>::type> L(M, std::chrono::seconds(5)); if (!L) LOG(FATAL) << "Mutex deadlock"; #define UNIQUE_LOCK(M,L) std::unique_lock<std::remove_reference<decltype(M)>::type> L(M, std::chrono::milliseconds(MUTEX_TIMEOUT)); while (!L) { LOG(ERROR) << "Mutex timeout"; L.try_lock_for(std::chrono::milliseconds(MUTEX_TIMEOUT)); };
#define SHARED_LOCK(M,L) std::shared_lock<std::remove_reference<decltype(M)>::type> L(M, std::chrono::seconds(5)); if (!L) LOG(FATAL) << "Mutex deadlock"; #define SHARED_LOCK(M,L) std::shared_lock<std::remove_reference<decltype(M)>::type> L(M, std::chrono::milliseconds(MUTEX_TIMEOUT)); while (!L) { LOG(ERROR) << "Mutex timeout"; L.try_lock_for(std::chrono::milliseconds(MUTEX_TIMEOUT)); };
#else #else
#define MUTEX std::mutex #define MUTEX std::mutex
......
...@@ -26,10 +26,10 @@ Master::Master(Configurable *root, Universe *net) ...@@ -26,10 +26,10 @@ Master::Master(Configurable *root, Universe *net)
return {json.dump()}; return {json.dump()};
}); });
net->broadcast("log_subscribe", net->id()); //net->broadcast("log_subscribe", net->id());
net->onConnect([this](ftl::net::Peer*) { net->onConnect([this](ftl::net::Peer*) {
net_->broadcast("log_subscribe", net_->id()); //net_->broadcast("log_subscribe", net_->id());
}); });
} }
......
...@@ -300,8 +300,14 @@ R Peer::call(const std::string &name, ARGS... args) { ...@@ -300,8 +300,14 @@ R Peer::call(const std::string &name, ARGS... args) {
cv.notify_one(); cv.notify_one();
}, std::forward<ARGS>(args)...); }, std::forward<ARGS>(args)...);
// While waiting, do some other thread jobs...
std::function<void(int)> j;
while (!hasreturned && (bool)(j=ftl::pool.pop())) {
LOG(INFO) << "Doing job whilst waiting...";
j(-1);
}
{ // Block thread until async callback notifies us { // Block thread until async callback notifies us
//UNIQUE_LOCK(m,lk);
std::unique_lock<std::mutex> lk(m); std::unique_lock<std::mutex> lk(m);
cv.wait_for(lk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;}); cv.wait_for(lk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;});
} }
......
...@@ -43,6 +43,8 @@ using ftl::net::Universe; ...@@ -43,6 +43,8 @@ using ftl::net::Universe;
using ftl::net::callback_t; using ftl::net::callback_t;
using std::vector; using std::vector;
#define TCP_BUFFER_SIZE (1024*1024*10)
/*static std::string hexStr(const std::string &s) /*static std::string hexStr(const std::string &s)
{ {
const char *data = s.data(); const char *data = s.data();
...@@ -82,8 +84,16 @@ static SOCKET tcpConnect(URI &uri) { ...@@ -82,8 +84,16 @@ static SOCKET tcpConnect(URI &uri) {
return INVALID_SOCKET; return INVALID_SOCKET;
} }
//int flags =1; int flags =1;
//if (setsockopt(csocket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; }; if (setsockopt(csocket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; };
int a = TCP_BUFFER_SIZE;
if (setsockopt(csocket, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) {
fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
}
if (setsockopt(csocket, SOL_SOCKET, SO_SNDBUF, (const char *)&a, sizeof(int)) == -1) {
fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
}
addrinfo hints = {}, *addrs; addrinfo hints = {}, *addrs;
hints.ai_family = AF_INET; hints.ai_family = AF_INET;
...@@ -166,8 +176,15 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals ...@@ -166,8 +176,15 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals
is_waiting_ = true; is_waiting_ = true;
scheme_ = ftl::URI::SCHEME_TCP; scheme_ = ftl::URI::SCHEME_TCP;
//int flags =1; int flags =1;
//if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; }; if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; };
int a = TCP_BUFFER_SIZE;
if (setsockopt(s, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) {
fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
}
if (setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&a, sizeof(int)) == -1) {
fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
}
// Send the initiating handshake if valid // Send the initiating handshake if valid
if (status_ == kConnecting) { if (status_ == kConnecting) {
...@@ -394,46 +411,52 @@ void Peer::error(int e) { ...@@ -394,46 +411,52 @@ void Peer::error(int e) {
} }
void Peer::data() { void Peer::data() {
UNIQUE_LOCK(recv_mtx_,lk); {
recv_buf_.reserve_buffer(kMaxMessage); UNIQUE_LOCK(recv_mtx_,lk);
if (recv_buf_.buffer_capacity() < (kMaxMessage / 10)) { int rc=0;
LOG(WARNING) << "Net buffer at capacity"; int c=0;
return;
}
int cap = recv_buf_.buffer_capacity(); //do {
int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), cap, 0); recv_buf_.reserve_buffer(kMaxMessage);
if (rc >= cap) { if (recv_buf_.buffer_capacity() < (kMaxMessage / 10)) {
LOG(WARNING) << "More than buffers worth of data received"; LOG(WARNING) << "Net buffer at capacity";
} return;
if (cap < (kMaxMessage / 10)) LOG(WARNING) << "NO BUFFER"; }
if (rc == 0) { int cap = recv_buf_.buffer_capacity();
close(); rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), cap, 0);
return; //if (c > 0 && rc > 0) LOG(INFO) << "RECV: " << rc;
} else if (rc < 0) {
socketError(); if (rc >= cap-1) {
return; LOG(WARNING) << "More than buffers worth of data received";
} }
if (cap < (kMaxMessage / 10)) LOG(WARNING) << "NO BUFFER";
recv_buf_.buffer_consumed(rc);
if (rc == 0) {
// No thread currently processing messages so start one close();
if (is_waiting_) { return;
ftl::pool.push([](int id, Peer *p) { } else if (rc < 0 && c == 0) {
p->_data(); socketError();
//p->is_waiting_ = true; return;
}, this); }
is_waiting_ = false;
//if (rc == -1) break;
++c;
recv_buf_.buffer_consumed(rc);
//} while (rc > 0);
} }
lk.unlock();
//LOG(INFO) << "Received " << rc << " bytes"; ftl::pool.push([this](int id) {
_data();
});
} }
bool Peer::_data() { bool Peer::_data() {
msgpack::object_handle msg;
UNIQUE_LOCK(recv_mtx_,lk); UNIQUE_LOCK(recv_mtx_,lk);
if (scheme_ == ftl::URI::SCHEME_WS && !ws_read_header_) { if (scheme_ == ftl::URI::SCHEME_WS && !ws_read_header_) {
...@@ -444,10 +467,18 @@ bool Peer::_data() { ...@@ -444,10 +467,18 @@ bool Peer::_data() {
ws_read_header_ = true; ws_read_header_ = true;
} }
msgpack::object_handle msg; if (!recv_buf_.next(msg)) return false;
while (recv_buf_.next(msg)) { msgpack::object obj = msg.get();
ws_read_header_ = false; lk.unlock();
msgpack::object obj = msg.get();
ftl::pool.push([this](int id) {
_data();
});
if (status_ != kConnected) {
// If not connected, must lock to make sure no other thread performs this step
UNIQUE_LOCK(recv_mtx_,lk);
// Verify still not connected after lock
if (status_ != kConnected) { if (status_ != kConnected) {
// First message must be a handshake // First message must be a handshake
try { try {
...@@ -463,32 +494,21 @@ bool Peer::_data() { ...@@ -463,32 +494,21 @@ bool Peer::_data() {
// to read next message before completion. // to read next message before completion.
// The handshake handler must not block. // The handshake handler must not block.
disp_->dispatch(*this, obj); disp_->dispatch(*this, obj);
return true;
} }
} catch(...) { } catch(...) {
_badClose(false); _badClose(false);
LOG(ERROR) << "Bad first message format"; LOG(ERROR) << "Bad first message format";
return false; return false;
} }
} else {
// CHECK Safe to unlock here?
is_waiting_ = true;
lk.unlock();
disp_->dispatch(*this, obj);
// Relock before next loop of while
lk.lock();
is_waiting_ = false;
}
if (scheme_ == ftl::URI::SCHEME_WS && recv_buf_.nonparsed_size() > 0) {
wsheader_type ws;
if (ws_parse(recv_buf_, ws) < 0) {
return false;
}
ws_read_header_ = true;
} }
} }
is_waiting_ = true; // Can start another thread...
return false; disp_->dispatch(*this, obj);
// Lock again before freeing msg handle
UNIQUE_LOCK(recv_mtx_,lk2);
return true;
} }
void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) { void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) {
......
...@@ -20,7 +20,7 @@ struct FrameSet { ...@@ -20,7 +20,7 @@ struct FrameSet {
unsigned int mask; unsigned int mask;
}; };
static const size_t kFrameBufferSize = 10; static const size_t kFrameBufferSize = 20;
class Group { class Group {
public: public:
......
...@@ -269,14 +269,14 @@ void NetSource::_updateURI() { ...@@ -269,14 +269,14 @@ void NetSource::_updateURI() {
bool NetSource::grab(int n, int b) { bool NetSource::grab(int n, int b) {
// Choose highest requested number of frames // Choose highest requested number of frames
maxN_ = std::max(maxN_,(n == -1) ? 10 : n); maxN_ = std::max(maxN_,(n == -1) ? ftl::rgbd::detail::kDefaultFrameCount : n);
// Choose best requested quality // Choose best requested quality
minB_ = std::min(minB_,(b == -1) ? 0 : b); minB_ = std::min(minB_,(b == -1) ? 0 : b);
// Send k frames before end to prevent unwanted pause // Send k frames before end to prevent unwanted pause
// Unless only a single frame is requested // Unless only a single frame is requested
if ((N_ <= 2 && maxN_ > 1) || N_ == 0) { if ((N_ <= maxN_/2 && maxN_ > 1) || N_ == 0) {
const ftl::rgbd::channel_t chan = host_->getChannel(); const ftl::rgbd::channel_t chan = host_->getChannel();
N_ = maxN_; N_ = maxN_;
......
...@@ -11,6 +11,8 @@ namespace ftl { ...@@ -11,6 +11,8 @@ namespace ftl {
namespace rgbd { namespace rgbd {
namespace detail { namespace detail {
static const int kDefaultFrameCount = 30;
/** /**
* RGBD source from either a stereo video file with left + right images, or * RGBD source from either a stereo video file with left + right images, or
* direct from two camera devices. A variety of algorithms are included for * direct from two camera devices. A variety of algorithms are included for
......
...@@ -389,7 +389,8 @@ void Streamer::_schedule() { ...@@ -389,7 +389,8 @@ void Streamer::_schedule() {
// Create jobs for each chunk // Create jobs for each chunk
for (int i=0; i<kChunkCount; ++i) { for (int i=0; i<kChunkCount; ++i) {
// Add chunk job to thread pool // Add chunk job to thread pool
ftl::pool.push([this,src](int id, int chunk) { ftl::pool.push([this,src,i](int id) {
int chunk = i;
try { try {
if (!src->rgb.empty() && (src->src->getChannel() == ftl::rgbd::kChanNone || !src->depth.empty())) { if (!src->rgb.empty() && (src->src->getChannel() == ftl::rgbd::kChanNone || !src->depth.empty())) {
_encodeAndTransmit(src, chunk); _encodeAndTransmit(src, chunk);
...@@ -403,7 +404,7 @@ void Streamer::_schedule() { ...@@ -403,7 +404,7 @@ void Streamer::_schedule() {
std::unique_lock<std::mutex> lk(job_mtx_); std::unique_lock<std::mutex> lk(job_mtx_);
--jobs_; --jobs_;
job_cv_.notify_one(); job_cv_.notify_one();
}, i); });
} }
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment