diff --git a/net/cpp/include/ftl/net/peer.hpp b/net/cpp/include/ftl/net/peer.hpp index cd0fe67d027a2803d450c842b49e23de8069dad0..fb789a46b65b9f892f093b9131f4f066b903d55e 100644 --- a/net/cpp/include/ftl/net/peer.hpp +++ b/net/cpp/include/ftl/net/peer.hpp @@ -289,6 +289,7 @@ int Peer::asyncCall( LOG(INFO) << "RPC " << name << "() -> " << uri_; + std::unique_lock<std::mutex> lk(send_mtx_); msgpack::pack(send_buf_, call_obj); // Register the CB diff --git a/net/cpp/src/dispatcher.cpp b/net/cpp/src/dispatcher.cpp index 34861a04fcd08044f8f2b579f739911bbf723dbd..269db7a6d38de8be0ac8fe25d82b222db63f0c04 100644 --- a/net/cpp/src/dispatcher.cpp +++ b/net/cpp/src/dispatcher.cpp @@ -129,7 +129,7 @@ void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const auto &&name = std::get<1>(the_call); auto &&args = std::get<2>(the_call); - //LOG(INFO) << "NOTIFICATION " << name << "() <- " << s.getURI(); + // LOG(INFO) << "NOTIFICATION " << name << "() <- " << s.getURI(); auto binding = _locateHandler(name); diff --git a/net/cpp/src/peer.cpp b/net/cpp/src/peer.cpp index a782bc67c1c249767e06c1797f2e952f1f3aca18..5c29ace9a708fcee6737b38976afc85b444ffd43 100644 --- a/net/cpp/src/peer.cpp +++ b/net/cpp/src/peer.cpp @@ -122,9 +122,9 @@ static int tcpConnect(URI &uri) { } // Make blocking again - /*rg = fcntl(csocket, F_GETFL, NULL)); + /*long arg = fcntl(csocket, F_GETFL, NULL); arg &= (~O_NONBLOCK); - fcntl(csocket, F_SETFL, arg) < 0)*/ + fcntl(csocket, F_SETFL, arg);*/ return csocket; } @@ -153,8 +153,6 @@ Peer::Peer(int s, Dispatcher *d) : sock_(s) { _trigger(open_handlers_); } }); - - //ftl::UUID uuid; send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer); } @@ -171,14 +169,6 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { scheme_ = uri.getProtocol(); if (uri.getProtocol() == URI::SCHEME_TCP) { sock_ = tcpConnect(uri); - -#ifdef WIN32 - u_long on = 1; - ioctlsocket(sock_, FIONBIO, &on); -#else - fcntl(sock_, F_SETFL, O_NONBLOCK); -#endif - status_ = kConnecting; } else if (uri.getProtocol() == URI::SCHEME_WS) { LOG(INFO) << "Websocket connect " << uri.getPath(); @@ -191,13 +181,6 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { } else { LOG(ERROR) << "Connection refused to " << uri.getHost() << ":" << uri.getPort(); } - -#ifdef WIN32 - u_long on = 1; - ioctlsocket(sock_, FIONBIO, &on); -#else - fcntl(sock_, F_SETFL, O_NONBLOCK); -#endif status_ = kConnecting; } else { @@ -315,7 +298,7 @@ void Peer::data() { } bool Peer::_data() { - //std::unique_lock<std::mutex> lk(recv_mtx_); + // std::unique_lock<std::mutex> lk(recv_mtx_); recv_buf_.reserve_buffer(kMaxMessage); int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0); @@ -483,6 +466,7 @@ void Peer::cancelCall(int id) { void Peer::_sendResponse(uint32_t id, const msgpack::object &res) { Dispatcher::response_t res_obj = std::make_tuple(1,id,std::string(""),res); + std::unique_lock<std::mutex> lk(send_mtx_); msgpack::pack(send_buf_, res_obj); _send(); } @@ -535,6 +519,13 @@ int Peer::_send() { int c = ftl::net::internal::writev(sock_, send_buf_.vector(), send_buf_.vector_size()); #endif send_buf_.clear(); + + // We are blocking, so -1 should mean actual error + if (c == -1) { + socketError(); + close(); + } + return c; } diff --git a/reconstruct/src/main.cpp b/reconstruct/src/main.cpp index e4385633ab87aff3380476ba3449fb4a50f52c0d..1df292d6c71f3b34646530fa1bb418267e31e937 100644 --- a/reconstruct/src/main.cpp +++ b/reconstruct/src/main.cpp @@ -131,9 +131,9 @@ static void run(const string &file) { cv::imdecode(jpg, cv::IMREAD_COLOR, &rgb); //LOG(INFO) << "Received JPG : " << rgb.cols; - depth = Mat(rgb.size(), CV_32FC1); + //depth = Mat(rgb.size(), CV_16UC1); - z_stream infstream; + /*z_stream infstream; infstream.zalloc = Z_NULL; infstream.zfree = Z_NULL; infstream.opaque = Z_NULL; @@ -146,7 +146,9 @@ static void run(const string &file) { // the actual DE-compression work. inflateInit(&infstream); inflate(&infstream, Z_NO_FLUSH); - inflateEnd(&infstream); + inflateEnd(&infstream);*/ + cv::imdecode(d, cv::IMREAD_GRAYSCALE, &depth); + //depth.convertTo(depth, CV_32FC1, 1.0f/(256.0f*16.0f)); }); while (disp.active()) { @@ -156,7 +158,10 @@ static void run(const string &file) { if (depth.cols > 0) { // If no calibration data then get it from the remote machine if (Q.rows == 0) { + // Must unlock before calling findOne to prevent net block!! + lk.unlock(); auto buf = net.findOne<vector<unsigned char>>((string)config["source"]+"/calibration"); + lk.lock(); if (buf) { Q = Mat(cv::Size(4,4), CV_32F); memcpy(Q.data, (*buf).data(), (*buf).size()); diff --git a/vision/src/main.cpp b/vision/src/main.cpp index 86909a38b5bc80596c5428dda74de077a042b6a6..2d86556161176bcf6a97cfc4c4031c2e33960434 100644 --- a/vision/src/main.cpp +++ b/vision/src/main.cpp @@ -47,6 +47,8 @@ using std::string; using std::vector; using std::map; using std::condition_variable; +using std::this_thread::sleep_for; +using std::chrono::milliseconds; using std::mutex; using std::unique_lock; using cv::Mat; @@ -152,7 +154,7 @@ static void run(const string &file) { calibrate.getQ().convertTo(Q_32F, CV_32F); // Allow remote users to access camera calibration matrix - net.bind(string("ftl://utu.fi/")+(string)config["stream"]["name"]+string("/rgb-d/calibration"), [&calibrate,Q_32F]() -> vector<unsigned char> { + net.bind(string("ftl://utu.fi/")+(string)config["stream"]["name"]+string("/rgb-d/calibration"), [Q_32F]() -> vector<unsigned char> { vector<unsigned char> buf; buf.resize(Q_32F.step*Q_32F.rows); LOG(INFO) << "Calib buf size = " << buf.size(); @@ -222,6 +224,8 @@ static void run(const string &file) { unique_lock<mutex> lk(m); cv.wait(lk, [&jobs]{return jobs == 2;}); + + //sleep_for(milliseconds(40)); l.copyTo(pl); disp.copyTo(pdisp); diff --git a/vision/src/streamer.cpp b/vision/src/streamer.cpp index 6a0b04ddc664182aac5037b5eb3f8738935aeac6..43cbb220e3dd2d0f1ec15b533919c9ee7a9dc807 100644 --- a/vision/src/streamer.cpp +++ b/vision/src/streamer.cpp @@ -25,7 +25,7 @@ void Streamer::send(const Mat &rgb, const Mat &depth) { cv::imencode(".jpg", rgb, rgb_buf); vector<unsigned char> d_buf; - d_buf.resize(depth.step*depth.rows); + /*d_buf.resize(depth.step*depth.rows); z_stream defstream; defstream.zalloc = Z_NULL; defstream.zfree = Z_NULL; @@ -39,8 +39,11 @@ void Streamer::send(const Mat &rgb, const Mat &depth) { deflate(&defstream, Z_FINISH); deflateEnd(&defstream); - d_buf.resize(defstream.total_out); - //LOG(INFO) << "Depth Size = " << ((float)d_buf.size() / (1024.0f*1024.0f)); + d_buf.resize(defstream.total_out);*/ + //Mat d2; + //depth.convertTo(d2, CV_16UC1, 256.0f*16.0f); + cv::imencode(".png", depth, d_buf); + LOG(INFO) << "Depth Size = " << ((float)d_buf.size() / (1024.0f*1024.0f)); net_.publish(uri_, rgb_buf, d_buf); }