Skip to content
Snippets Groups Projects
Commit 919adaea authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Fix for blocking connect and cb bug

parent bf6dfb95
No related branches found
No related tags found
No related merge requests found
Pipeline #11287 passed
......@@ -242,11 +242,11 @@ class Peer {
// Receive buffers
bool is_waiting_;
msgpack::unpacker recv_buf_;
std::mutex recv_mtx_;
std::recursive_mutex recv_mtx_;
// Send buffers
msgpack::vrefbuffer send_buf_;
std::mutex send_mtx_;
std::recursive_mutex send_mtx_;
std::string uri_; // Original connection URI, or assumed URI
ftl::UUID peerid_; // Received in handshake or allocated
......@@ -264,7 +264,7 @@ class Peer {
template <typename... ARGS>
int Peer::send(const std::string &s, ARGS... args) {
std::unique_lock<std::mutex> lk(send_mtx_);
std::unique_lock<std::recursive_mutex> lk(send_mtx_);
// Leave a blank entry for websocket header
if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0);
auto args_obj = std::make_tuple(args...);
......@@ -315,17 +315,21 @@ int Peer::asyncCall(
std::function<void(const T&)> cb,
ARGS... args) {
auto args_obj = std::make_tuple(args...);
auto rpcid = rpcid__++;
auto call_obj = std::make_tuple(0,rpcid,name,args_obj);
auto rpcid = 0;
LOG(INFO) << "RPC " << name << "() -> " << uri_;
DLOG(1) << "RPC " << name << "() -> " << uri_;
{
std::unique_lock<std::recursive_mutex> lk(recv_mtx_);
// Register the CB
rpcid = rpcid__++;
callbacks_[rpcid] = std::make_unique<caller<T>>(cb);
}
auto call_obj = std::make_tuple(0,rpcid,name,args_obj);
std::unique_lock<std::mutex> lk(send_mtx_);
std::unique_lock<std::recursive_mutex> lk(send_mtx_);
msgpack::pack(send_buf_, call_obj);
// Register the CB
callbacks_[rpcid] = std::make_unique<caller<T>>(cb);
_send();
return rpcid;
}
......
......@@ -41,6 +41,7 @@ using std::chrono::seconds;
using ftl::net::Universe;
using ftl::net::callback_t;
using std::mutex;
using std::recursive_mutex;
using std::unique_lock;
/*static std::string hexStr(const std::string &s)
......@@ -100,17 +101,36 @@ static SOCKET tcpConnect(URI &uri) {
}
// Make nonblocking
/*long arg = fcntl(csocket, F_GETFL, NULL));
#ifndef WIN32
long arg = fcntl(csocket, F_GETFL, NULL);
arg |= O_NONBLOCK;
fcntl(csocket, F_SETFL, arg) < 0)*/
fcntl(csocket, F_SETFL, arg);
#endif
// TODO(Nick) - Check all returned addresses.
auto addr = addrs;
rc = ::connect(csocket, addr->ai_addr, (socklen_t)addr->ai_addrlen);
if (rc < 0) {
if (errno == EINPROGRESS) {
fd_set myset;
struct timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
FD_ZERO(&myset);
FD_SET(csocket, &myset);
rc = select(csocket+1, NULL, &myset, NULL, &tv);
if (rc <= 0) { //} && errno != EINTR) {
#ifndef WIN32
close(csocket);
#else
closesocket(csocket);
#endif
LOG(ERROR) << "Could not connect to " << uri.getBaseURI();
return INVALID_SOCKET;
}
} else {
#ifndef WIN32
close(csocket);
......@@ -125,9 +145,11 @@ static SOCKET tcpConnect(URI &uri) {
}
// Make blocking again
/*long arg = fcntl(csocket, F_GETFL, NULL);
#ifndef WIN32
arg = fcntl(csocket, F_GETFL, NULL);
arg &= (~O_NONBLOCK);
fcntl(csocket, F_SETFL, arg);*/
fcntl(csocket, F_SETFL, arg);
#endif
return csocket;
}
......@@ -183,7 +205,7 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true),
disp_ = new Dispatcher(d);
// Must to to prevent receiving message before handlers are installed
unique_lock<mutex> lk(recv_mtx_);
unique_lock<recursive_mutex> lk(recv_mtx_);
scheme_ = uri.getProtocol();
if (uri.getProtocol() == URI::SCHEME_TCP) {
......@@ -353,7 +375,7 @@ void Peer::data() {
}
bool Peer::_data() {
std::unique_lock<std::mutex> lk(recv_mtx_);
std::unique_lock<std::recursive_mutex> lk(recv_mtx_);
recv_buf_.reserve_buffer(kMaxMessage);
int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0);
......@@ -393,7 +415,7 @@ void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) {
// TODO Handle error reporting...
if (callbacks_.count(id) > 0) {
LOG(INFO) << "Received return RPC value";
DLOG(1) << "Received return RPC value";
// Call the callback with unpacked return value
(*callbacks_[id])(res);
......@@ -411,7 +433,7 @@ void Peer::cancelCall(int id) {
void Peer::_sendResponse(uint32_t id, const msgpack::object &res) {
Dispatcher::response_t res_obj = std::make_tuple(1,id,std::string(""),res);
std::unique_lock<std::mutex> lk(send_mtx_);
std::unique_lock<std::recursive_mutex> lk(send_mtx_);
msgpack::pack(send_buf_, res_obj);
_send();
}
......@@ -495,8 +517,8 @@ int Peer::_send() {
}
Peer::~Peer() {
std::unique_lock<std::mutex> lk1(send_mtx_);
std::unique_lock<std::mutex> lk2(recv_mtx_);
std::unique_lock<std::recursive_mutex> lk1(send_mtx_);
std::unique_lock<std::recursive_mutex> lk2(recv_mtx_);
_badClose(false);
LOG(INFO) << "Deleting peer object";
......
......@@ -14,7 +14,7 @@ using std::mutex;
using std::unique_lock;
StereoVideoSource::StereoVideoSource(ftl::rgbd::Source *host)
: ftl::rgbd::detail::Source(host) {
: ftl::rgbd::detail::Source(host), ready_(false) {
init("");
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment