diff --git a/applications/groupview/src/main.cpp b/applications/groupview/src/main.cpp index 564d87d8b5666d07af9f6a54f1e0d778bed73c7b..59a5bb662c1ee2b14f44bd35b549174aedd5e4d5 100644 --- a/applications/groupview/src/main.cpp +++ b/applications/groupview/src/main.cpp @@ -14,7 +14,10 @@ int main(int argc, char **argv) { auto sources = ftl::createArray<ftl::rgbd::Source>(root, "sources", net); ftl::rgbd::Group group; - for (auto s : sources) group.addSource(s); + for (auto s : sources) { + s->setChannel(ftl::rgbd::kChanRight); + group.addSource(s); + } group.sync([](const ftl::rgbd::FrameSet &fs) { LOG(INFO) << "Complete set: " << fs.timestamp; @@ -23,7 +26,7 @@ int main(int argc, char **argv) { while (ftl::running) { std::this_thread::sleep_for(std::chrono::milliseconds(20)); - for (auto s : sources) s->grab(); + for (auto s : sources) s->grab(30); } return 0; diff --git a/applications/reconstruct/src/voxel_scene.cpp b/applications/reconstruct/src/voxel_scene.cpp index e82da537bbf2df6789467f561d24492f04556ca3..71086b52208f2a259b6a6eba29cd89c5ce8b71ae 100644 --- a/applications/reconstruct/src/voxel_scene.cpp +++ b/applications/reconstruct/src/voxel_scene.cpp @@ -366,6 +366,7 @@ void SceneRep::_integrateDepthMaps() { //cudaSafeCall(cudaDeviceSynchronize()); for (size_t i=0; i<cameras_.size(); ++i) { + if (!cameras_[i].source->isReady()) continue; //ftl::cuda::clear_depth(*(cameras_[i].gpu.depth2_tex_), integ_stream_); ftl::cuda::clear_points(*(cameras_[i].gpu.points_tex_), integ_stream_); ftl::cuda::mls_smooth(*(cameras_[i].gpu.points_tex_), m_hashParams, cameras_.size(), i, integ_stream_); diff --git a/components/common/cpp/include/ftl/threads.hpp b/components/common/cpp/include/ftl/threads.hpp index 5d86ce6077be327b24db981412435ba5b29687e1..5de4f5f75b6ff8e7012bd45b5f2aca8a41bc7b63 100644 --- a/components/common/cpp/include/ftl/threads.hpp +++ b/components/common/cpp/include/ftl/threads.hpp @@ -7,7 +7,8 @@ #define POOL_SIZE 10 -// #define DEBUG_MUTEX +#define DEBUG_MUTEX +#define MUTEX_TIMEOUT 15 #if defined DEBUG_MUTEX #include <loguru.hpp> @@ -18,8 +19,8 @@ #define RECURSIVE_MUTEX std::recursive_timed_mutex #define SHARED_MUTEX std::shared_timed_mutex -#define UNIQUE_LOCK(M,L) std::unique_lock<std::remove_reference<decltype(M)>::type> L(M, std::chrono::seconds(5)); if (!L) LOG(FATAL) << "Mutex deadlock"; -#define SHARED_LOCK(M,L) std::shared_lock<std::remove_reference<decltype(M)>::type> L(M, std::chrono::seconds(5)); if (!L) LOG(FATAL) << "Mutex deadlock"; +#define UNIQUE_LOCK(M,L) std::unique_lock<std::remove_reference<decltype(M)>::type> L(M, std::chrono::milliseconds(MUTEX_TIMEOUT)); while (!L) { LOG(ERROR) << "Mutex timeout"; L.try_lock_for(std::chrono::milliseconds(MUTEX_TIMEOUT)); }; +#define SHARED_LOCK(M,L) std::shared_lock<std::remove_reference<decltype(M)>::type> L(M, std::chrono::milliseconds(MUTEX_TIMEOUT)); while (!L) { LOG(ERROR) << "Mutex timeout"; L.try_lock_for(std::chrono::milliseconds(MUTEX_TIMEOUT)); }; #else #define MUTEX std::mutex diff --git a/components/control/cpp/src/master.cpp b/components/control/cpp/src/master.cpp index 80f9d841a3d795bb8259b2512f1e1b2f75c9a939..a1248fac771dcd58a4654244296f4f0c608a5a34 100644 --- a/components/control/cpp/src/master.cpp +++ b/components/control/cpp/src/master.cpp @@ -26,10 +26,10 @@ Master::Master(Configurable *root, Universe *net) return {json.dump()}; }); - net->broadcast("log_subscribe", net->id()); + //net->broadcast("log_subscribe", net->id()); net->onConnect([this](ftl::net::Peer*) { - net_->broadcast("log_subscribe", net_->id()); + //net_->broadcast("log_subscribe", net_->id()); }); } diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp index 14c021861b77821119e1d206a0560b6bafeed8c2..438fc171425b1f1b2cab12317d05eed2f95d8372 100644 --- a/components/net/cpp/include/ftl/net/peer.hpp +++ b/components/net/cpp/include/ftl/net/peer.hpp @@ -300,8 +300,14 @@ R Peer::call(const std::string &name, ARGS... args) { cv.notify_one(); }, std::forward<ARGS>(args)...); + // While waiting, do some other thread jobs... + std::function<void(int)> j; + while (!hasreturned && (bool)(j=ftl::pool.pop())) { + LOG(INFO) << "Doing job whilst waiting..."; + j(-1); + } + { // Block thread until async callback notifies us - //UNIQUE_LOCK(m,lk); std::unique_lock<std::mutex> lk(m); cv.wait_for(lk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;}); } diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index ab627b720fc33c36feaa8bdef461c2913b26d58f..8adbeae8c42ed7b3bbd3215a561ae45aacffebf3 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -43,6 +43,8 @@ using ftl::net::Universe; using ftl::net::callback_t; using std::vector; +#define TCP_BUFFER_SIZE (1024*1024*10) + /*static std::string hexStr(const std::string &s) { const char *data = s.data(); @@ -82,8 +84,16 @@ static SOCKET tcpConnect(URI &uri) { return INVALID_SOCKET; } - //int flags =1; - //if (setsockopt(csocket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; }; + int flags =1; + if (setsockopt(csocket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; }; + + int a = TCP_BUFFER_SIZE; + if (setsockopt(csocket, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) { + fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); + } + if (setsockopt(csocket, SOL_SOCKET, SO_SNDBUF, (const char *)&a, sizeof(int)) == -1) { + fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); + } addrinfo hints = {}, *addrs; hints.ai_family = AF_INET; @@ -166,8 +176,15 @@ Peer::Peer(SOCKET s, Universe *u, Dispatcher *d) : sock_(s), can_reconnect_(fals is_waiting_ = true; scheme_ = ftl::URI::SCHEME_TCP; - //int flags =1; - //if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; }; + int flags =1; + if (setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (const char *)&flags, sizeof(flags))) { LOG(ERROR) << "ERROR: setsocketopt(), TCP_NODELAY"; }; + int a = TCP_BUFFER_SIZE; + if (setsockopt(s, SOL_SOCKET, SO_RCVBUF, (const char *)&a, sizeof(int)) == -1) { + fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); + } + if (setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *)&a, sizeof(int)) == -1) { + fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno)); + } // Send the initiating handshake if valid if (status_ == kConnecting) { @@ -394,46 +411,52 @@ void Peer::error(int e) { } void Peer::data() { - UNIQUE_LOCK(recv_mtx_,lk); - recv_buf_.reserve_buffer(kMaxMessage); + { + UNIQUE_LOCK(recv_mtx_,lk); - if (recv_buf_.buffer_capacity() < (kMaxMessage / 10)) { - LOG(WARNING) << "Net buffer at capacity"; - return; - } + int rc=0; + int c=0; - int cap = recv_buf_.buffer_capacity(); - int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), cap, 0); + //do { + recv_buf_.reserve_buffer(kMaxMessage); - if (rc >= cap) { - LOG(WARNING) << "More than buffers worth of data received"; - } - if (cap < (kMaxMessage / 10)) LOG(WARNING) << "NO BUFFER"; + if (recv_buf_.buffer_capacity() < (kMaxMessage / 10)) { + LOG(WARNING) << "Net buffer at capacity"; + return; + } - if (rc == 0) { - close(); - return; - } else if (rc < 0) { - socketError(); - return; - } - - recv_buf_.buffer_consumed(rc); - - // No thread currently processing messages so start one - if (is_waiting_) { - ftl::pool.push([](int id, Peer *p) { - p->_data(); - //p->is_waiting_ = true; - }, this); - is_waiting_ = false; + int cap = recv_buf_.buffer_capacity(); + rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), cap, 0); + //if (c > 0 && rc > 0) LOG(INFO) << "RECV: " << rc; + + if (rc >= cap-1) { + LOG(WARNING) << "More than buffers worth of data received"; + } + if (cap < (kMaxMessage / 10)) LOG(WARNING) << "NO BUFFER"; + + if (rc == 0) { + close(); + return; + } else if (rc < 0 && c == 0) { + socketError(); + return; + } + + //if (rc == -1) break; + ++c; + + recv_buf_.buffer_consumed(rc); + //} while (rc > 0); } - lk.unlock(); - //LOG(INFO) << "Received " << rc << " bytes"; + ftl::pool.push([this](int id) { + _data(); + }); } bool Peer::_data() { + msgpack::object_handle msg; + UNIQUE_LOCK(recv_mtx_,lk); if (scheme_ == ftl::URI::SCHEME_WS && !ws_read_header_) { @@ -444,10 +467,18 @@ bool Peer::_data() { ws_read_header_ = true; } - msgpack::object_handle msg; - while (recv_buf_.next(msg)) { - ws_read_header_ = false; - msgpack::object obj = msg.get(); + if (!recv_buf_.next(msg)) return false; + msgpack::object obj = msg.get(); + lk.unlock(); + + ftl::pool.push([this](int id) { + _data(); + }); + + if (status_ != kConnected) { + // If not connected, must lock to make sure no other thread performs this step + UNIQUE_LOCK(recv_mtx_,lk); + // Verify still not connected after lock if (status_ != kConnected) { // First message must be a handshake try { @@ -463,32 +494,21 @@ bool Peer::_data() { // to read next message before completion. // The handshake handler must not block. disp_->dispatch(*this, obj); + return true; } } catch(...) { _badClose(false); LOG(ERROR) << "Bad first message format"; return false; } - } else { - // CHECK Safe to unlock here? - is_waiting_ = true; - lk.unlock(); - disp_->dispatch(*this, obj); - // Relock before next loop of while - lk.lock(); - is_waiting_ = false; - } - - if (scheme_ == ftl::URI::SCHEME_WS && recv_buf_.nonparsed_size() > 0) { - wsheader_type ws; - if (ws_parse(recv_buf_, ws) < 0) { - return false; - } - ws_read_header_ = true; } } - is_waiting_ = true; // Can start another thread... - return false; + + disp_->dispatch(*this, obj); + + // Lock again before freeing msg handle + UNIQUE_LOCK(recv_mtx_,lk2); + return true; } void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) { diff --git a/components/rgbd-sources/include/ftl/rgbd/group.hpp b/components/rgbd-sources/include/ftl/rgbd/group.hpp index c2454a0276a4006dfc95d609821cf068be133f76..1fc92a772e4edb7ce21590429aa08c7f602ed2bd 100644 --- a/components/rgbd-sources/include/ftl/rgbd/group.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/group.hpp @@ -20,7 +20,7 @@ struct FrameSet { unsigned int mask; }; -static const size_t kFrameBufferSize = 10; +static const size_t kFrameBufferSize = 20; class Group { public: diff --git a/components/rgbd-sources/src/net.cpp b/components/rgbd-sources/src/net.cpp index 388b233cd3e90be2a6be3847128f892ef5cb0b65..5e4acd30f12a55c2c41bd0ea7952e978a5551bd3 100644 --- a/components/rgbd-sources/src/net.cpp +++ b/components/rgbd-sources/src/net.cpp @@ -282,14 +282,14 @@ void NetSource::_updateURI() { bool NetSource::grab(int n, int b) { // Choose highest requested number of frames - maxN_ = std::max(maxN_,(n == -1) ? 10 : n); + maxN_ = std::max(maxN_,(n == -1) ? ftl::rgbd::detail::kDefaultFrameCount : n); // Choose best requested quality minB_ = std::min(minB_,(b == -1) ? 0 : b); // Send k frames before end to prevent unwanted pause // Unless only a single frame is requested - if ((N_ <= 2 && maxN_ > 1) || N_ == 0) { + if ((N_ <= maxN_/2 && maxN_ > 1) || N_ == 0) { const ftl::rgbd::channel_t chan = host_->getChannel(); N_ = maxN_; diff --git a/components/rgbd-sources/src/net.hpp b/components/rgbd-sources/src/net.hpp index 12186e82317c130c557f36a7ab3a9cc7e8e83dfe..171630040268e8a3c42446c982f67b3e03d4395d 100644 --- a/components/rgbd-sources/src/net.hpp +++ b/components/rgbd-sources/src/net.hpp @@ -11,6 +11,8 @@ namespace ftl { namespace rgbd { namespace detail { +static const int kDefaultFrameCount = 30; + /** * RGBD source from either a stereo video file with left + right images, or * direct from two camera devices. A variety of algorithms are included for diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index ec1a9610a0511acdc3669fdcc6e4eecff00509cf..ceaface6e71af17561f57fe9e13b62f6841eba7c 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -390,7 +390,8 @@ void Streamer::_schedule() { // Create jobs for each chunk for (int i=0; i<kChunkCount; ++i) { // Add chunk job to thread pool - ftl::pool.push([this,src](int id, int chunk) { + ftl::pool.push([this,src,i](int id) { + int chunk = i; try { if (!src->rgb.empty() && (src->src->getChannel() == ftl::rgbd::kChanNone || !src->depth.empty())) { _encodeAndTransmit(src, chunk); @@ -404,7 +405,7 @@ void Streamer::_schedule() { std::unique_lock<std::mutex> lk(job_mtx_); --jobs_; job_cv_.notify_one(); - }, i); + }); } } }