diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp index b167cb05c4840104058555aea90451b36b20aa85..fd89e08fe4c01011e3ec442a5b5bb103ff393899 100644 --- a/components/net/cpp/include/ftl/net/peer.hpp +++ b/components/net/cpp/include/ftl/net/peer.hpp @@ -242,11 +242,11 @@ class Peer { // Receive buffers bool is_waiting_; msgpack::unpacker recv_buf_; - std::mutex recv_mtx_; + std::recursive_mutex recv_mtx_; // Send buffers msgpack::vrefbuffer send_buf_; - std::mutex send_mtx_; + std::recursive_mutex send_mtx_; std::string uri_; // Original connection URI, or assumed URI ftl::UUID peerid_; // Received in handshake or allocated @@ -264,7 +264,7 @@ class Peer { template <typename... ARGS> int Peer::send(const std::string &s, ARGS... args) { - std::unique_lock<std::mutex> lk(send_mtx_); + std::unique_lock<std::recursive_mutex> lk(send_mtx_); // Leave a blank entry for websocket header if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0); auto args_obj = std::make_tuple(args...); @@ -315,17 +315,21 @@ int Peer::asyncCall( std::function<void(const T&)> cb, ARGS... args) { auto args_obj = std::make_tuple(args...); - auto rpcid = rpcid__++; - auto call_obj = std::make_tuple(0,rpcid,name,args_obj); + auto rpcid = 0; - LOG(INFO) << "RPC " << name << "() -> " << uri_; + DLOG(1) << "RPC " << name << "() -> " << uri_; + + { + std::unique_lock<std::recursive_mutex> lk(recv_mtx_); + // Register the CB + rpcid = rpcid__++; + callbacks_[rpcid] = std::make_unique<caller<T>>(cb); + } + + auto call_obj = std::make_tuple(0,rpcid,name,args_obj); - std::unique_lock<std::mutex> lk(send_mtx_); + std::unique_lock<std::recursive_mutex> lk(send_mtx_); msgpack::pack(send_buf_, call_obj); - - // Register the CB - callbacks_[rpcid] = std::make_unique<caller<T>>(cb); - _send(); return rpcid; } diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index af5051129032100917b46528eb5740b7e171a255..baca8fda74e19126cf2905f86e0e3a580820aa5b 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -41,6 +41,7 @@ using std::chrono::seconds; using ftl::net::Universe; using ftl::net::callback_t; using std::mutex; +using std::recursive_mutex; using std::unique_lock; /*static std::string hexStr(const std::string &s) @@ -100,17 +101,36 @@ static SOCKET tcpConnect(URI &uri) { } // Make nonblocking - /*long arg = fcntl(csocket, F_GETFL, NULL)); +#ifndef WIN32 + long arg = fcntl(csocket, F_GETFL, NULL); arg |= O_NONBLOCK; - fcntl(csocket, F_SETFL, arg) < 0)*/ - + fcntl(csocket, F_SETFL, arg); +#endif + // TODO(Nick) - Check all returned addresses. auto addr = addrs; rc = ::connect(csocket, addr->ai_addr, (socklen_t)addr->ai_addrlen); if (rc < 0) { if (errno == EINPROGRESS) { - + fd_set myset; + struct timeval tv; + tv.tv_sec = 1; + tv.tv_usec = 0; + FD_ZERO(&myset); + FD_SET(csocket, &myset); + rc = select(csocket+1, NULL, &myset, NULL, &tv); + if (rc <= 0) { //} && errno != EINTR) { + #ifndef WIN32 + close(csocket); + #else + closesocket(csocket); + #endif + + LOG(ERROR) << "Could not connect to " << uri.getBaseURI(); + + return INVALID_SOCKET; + } } else { #ifndef WIN32 close(csocket); @@ -125,9 +145,11 @@ static SOCKET tcpConnect(URI &uri) { } // Make blocking again - /*long arg = fcntl(csocket, F_GETFL, NULL); +#ifndef WIN32 + arg = fcntl(csocket, F_GETFL, NULL); arg &= (~O_NONBLOCK); - fcntl(csocket, F_SETFL, arg);*/ + fcntl(csocket, F_SETFL, arg); +#endif return csocket; } @@ -183,7 +205,7 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), disp_ = new Dispatcher(d); // Must to to prevent receiving message before handlers are installed - unique_lock<mutex> lk(recv_mtx_); + unique_lock<recursive_mutex> lk(recv_mtx_); scheme_ = uri.getProtocol(); if (uri.getProtocol() == URI::SCHEME_TCP) { @@ -353,7 +375,7 @@ void Peer::data() { } bool Peer::_data() { - std::unique_lock<std::mutex> lk(recv_mtx_); + std::unique_lock<std::recursive_mutex> lk(recv_mtx_); recv_buf_.reserve_buffer(kMaxMessage); int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0); @@ -393,7 +415,7 @@ void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) { // TODO Handle error reporting... if (callbacks_.count(id) > 0) { - LOG(INFO) << "Received return RPC value"; + DLOG(1) << "Received return RPC value"; // Call the callback with unpacked return value (*callbacks_[id])(res); @@ -411,7 +433,7 @@ void Peer::cancelCall(int id) { void Peer::_sendResponse(uint32_t id, const msgpack::object &res) { Dispatcher::response_t res_obj = std::make_tuple(1,id,std::string(""),res); - std::unique_lock<std::mutex> lk(send_mtx_); + std::unique_lock<std::recursive_mutex> lk(send_mtx_); msgpack::pack(send_buf_, res_obj); _send(); } @@ -495,8 +517,8 @@ int Peer::_send() { } Peer::~Peer() { - std::unique_lock<std::mutex> lk1(send_mtx_); - std::unique_lock<std::mutex> lk2(recv_mtx_); + std::unique_lock<std::recursive_mutex> lk1(send_mtx_); + std::unique_lock<std::recursive_mutex> lk2(recv_mtx_); _badClose(false); LOG(INFO) << "Deleting peer object"; diff --git a/components/rgbd-sources/src/stereovideo.cpp b/components/rgbd-sources/src/stereovideo.cpp index 8c3e49608c92b45c5dcf648f8c7890619e71f4c9..5180dddd7c44710198b15b2a81ab517c1d768570 100644 --- a/components/rgbd-sources/src/stereovideo.cpp +++ b/components/rgbd-sources/src/stereovideo.cpp @@ -14,7 +14,7 @@ using std::mutex; using std::unique_lock; StereoVideoSource::StereoVideoSource(ftl::rgbd::Source *host) - : ftl::rgbd::detail::Source(host) { + : ftl::rgbd::detail::Source(host), ready_(false) { init(""); }