Skip to content
Snippets Groups Projects
Commit be9fd540 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Fix for unwanted mutex lock and switch to PNG for depth compression (for now)

parent d228e1f2
No related branches found
No related tags found
No related merge requests found
Pipeline #9844 passed
...@@ -289,6 +289,7 @@ int Peer::asyncCall( ...@@ -289,6 +289,7 @@ int Peer::asyncCall(
LOG(INFO) << "RPC " << name << "() -> " << uri_; LOG(INFO) << "RPC " << name << "() -> " << uri_;
std::unique_lock<std::mutex> lk(send_mtx_);
msgpack::pack(send_buf_, call_obj); msgpack::pack(send_buf_, call_obj);
// Register the CB // Register the CB
......
...@@ -129,7 +129,7 @@ void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const ...@@ -129,7 +129,7 @@ void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const
auto &&name = std::get<1>(the_call); auto &&name = std::get<1>(the_call);
auto &&args = std::get<2>(the_call); auto &&args = std::get<2>(the_call);
//LOG(INFO) << "NOTIFICATION " << name << "() <- " << s.getURI(); // LOG(INFO) << "NOTIFICATION " << name << "() <- " << s.getURI();
auto binding = _locateHandler(name); auto binding = _locateHandler(name);
......
...@@ -122,9 +122,9 @@ static int tcpConnect(URI &uri) { ...@@ -122,9 +122,9 @@ static int tcpConnect(URI &uri) {
} }
// Make blocking again // Make blocking again
/*rg = fcntl(csocket, F_GETFL, NULL)); /*long arg = fcntl(csocket, F_GETFL, NULL);
arg &= (~O_NONBLOCK); arg &= (~O_NONBLOCK);
fcntl(csocket, F_SETFL, arg) < 0)*/ fcntl(csocket, F_SETFL, arg);*/
return csocket; return csocket;
} }
...@@ -153,8 +153,6 @@ Peer::Peer(int s, Dispatcher *d) : sock_(s) { ...@@ -153,8 +153,6 @@ Peer::Peer(int s, Dispatcher *d) : sock_(s) {
_trigger(open_handlers_); _trigger(open_handlers_);
} }
}); });
//ftl::UUID uuid;
send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer); send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer);
} }
...@@ -171,14 +169,6 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { ...@@ -171,14 +169,6 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) {
scheme_ = uri.getProtocol(); scheme_ = uri.getProtocol();
if (uri.getProtocol() == URI::SCHEME_TCP) { if (uri.getProtocol() == URI::SCHEME_TCP) {
sock_ = tcpConnect(uri); sock_ = tcpConnect(uri);
#ifdef WIN32
u_long on = 1;
ioctlsocket(sock_, FIONBIO, &on);
#else
fcntl(sock_, F_SETFL, O_NONBLOCK);
#endif
status_ = kConnecting; status_ = kConnecting;
} else if (uri.getProtocol() == URI::SCHEME_WS) { } else if (uri.getProtocol() == URI::SCHEME_WS) {
LOG(INFO) << "Websocket connect " << uri.getPath(); LOG(INFO) << "Websocket connect " << uri.getPath();
...@@ -191,13 +181,6 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { ...@@ -191,13 +181,6 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) {
} else { } else {
LOG(ERROR) << "Connection refused to " << uri.getHost() << ":" << uri.getPort(); LOG(ERROR) << "Connection refused to " << uri.getHost() << ":" << uri.getPort();
} }
#ifdef WIN32
u_long on = 1;
ioctlsocket(sock_, FIONBIO, &on);
#else
fcntl(sock_, F_SETFL, O_NONBLOCK);
#endif
status_ = kConnecting; status_ = kConnecting;
} else { } else {
...@@ -315,7 +298,7 @@ void Peer::data() { ...@@ -315,7 +298,7 @@ void Peer::data() {
} }
bool Peer::_data() { bool Peer::_data() {
//std::unique_lock<std::mutex> lk(recv_mtx_); // std::unique_lock<std::mutex> lk(recv_mtx_);
recv_buf_.reserve_buffer(kMaxMessage); recv_buf_.reserve_buffer(kMaxMessage);
int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0); int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0);
...@@ -483,6 +466,7 @@ void Peer::cancelCall(int id) { ...@@ -483,6 +466,7 @@ void Peer::cancelCall(int id) {
void Peer::_sendResponse(uint32_t id, const msgpack::object &res) { void Peer::_sendResponse(uint32_t id, const msgpack::object &res) {
Dispatcher::response_t res_obj = std::make_tuple(1,id,std::string(""),res); Dispatcher::response_t res_obj = std::make_tuple(1,id,std::string(""),res);
std::unique_lock<std::mutex> lk(send_mtx_);
msgpack::pack(send_buf_, res_obj); msgpack::pack(send_buf_, res_obj);
_send(); _send();
} }
...@@ -535,6 +519,13 @@ int Peer::_send() { ...@@ -535,6 +519,13 @@ int Peer::_send() {
int c = ftl::net::internal::writev(sock_, send_buf_.vector(), send_buf_.vector_size()); int c = ftl::net::internal::writev(sock_, send_buf_.vector(), send_buf_.vector_size());
#endif #endif
send_buf_.clear(); send_buf_.clear();
// We are blocking, so -1 should mean actual error
if (c == -1) {
socketError();
close();
}
return c; return c;
} }
......
...@@ -131,9 +131,9 @@ static void run(const string &file) { ...@@ -131,9 +131,9 @@ static void run(const string &file) {
cv::imdecode(jpg, cv::IMREAD_COLOR, &rgb); cv::imdecode(jpg, cv::IMREAD_COLOR, &rgb);
//LOG(INFO) << "Received JPG : " << rgb.cols; //LOG(INFO) << "Received JPG : " << rgb.cols;
depth = Mat(rgb.size(), CV_32FC1); //depth = Mat(rgb.size(), CV_16UC1);
z_stream infstream; /*z_stream infstream;
infstream.zalloc = Z_NULL; infstream.zalloc = Z_NULL;
infstream.zfree = Z_NULL; infstream.zfree = Z_NULL;
infstream.opaque = Z_NULL; infstream.opaque = Z_NULL;
...@@ -146,7 +146,9 @@ static void run(const string &file) { ...@@ -146,7 +146,9 @@ static void run(const string &file) {
// the actual DE-compression work. // the actual DE-compression work.
inflateInit(&infstream); inflateInit(&infstream);
inflate(&infstream, Z_NO_FLUSH); inflate(&infstream, Z_NO_FLUSH);
inflateEnd(&infstream); inflateEnd(&infstream);*/
cv::imdecode(d, cv::IMREAD_GRAYSCALE, &depth);
//depth.convertTo(depth, CV_32FC1, 1.0f/(256.0f*16.0f));
}); });
while (disp.active()) { while (disp.active()) {
...@@ -156,7 +158,10 @@ static void run(const string &file) { ...@@ -156,7 +158,10 @@ static void run(const string &file) {
if (depth.cols > 0) { if (depth.cols > 0) {
// If no calibration data then get it from the remote machine // If no calibration data then get it from the remote machine
if (Q.rows == 0) { if (Q.rows == 0) {
// Must unlock before calling findOne to prevent net block!!
lk.unlock();
auto buf = net.findOne<vector<unsigned char>>((string)config["source"]+"/calibration"); auto buf = net.findOne<vector<unsigned char>>((string)config["source"]+"/calibration");
lk.lock();
if (buf) { if (buf) {
Q = Mat(cv::Size(4,4), CV_32F); Q = Mat(cv::Size(4,4), CV_32F);
memcpy(Q.data, (*buf).data(), (*buf).size()); memcpy(Q.data, (*buf).data(), (*buf).size());
......
...@@ -47,6 +47,8 @@ using std::string; ...@@ -47,6 +47,8 @@ using std::string;
using std::vector; using std::vector;
using std::map; using std::map;
using std::condition_variable; using std::condition_variable;
using std::this_thread::sleep_for;
using std::chrono::milliseconds;
using std::mutex; using std::mutex;
using std::unique_lock; using std::unique_lock;
using cv::Mat; using cv::Mat;
...@@ -152,7 +154,7 @@ static void run(const string &file) { ...@@ -152,7 +154,7 @@ static void run(const string &file) {
calibrate.getQ().convertTo(Q_32F, CV_32F); calibrate.getQ().convertTo(Q_32F, CV_32F);
// Allow remote users to access camera calibration matrix // Allow remote users to access camera calibration matrix
net.bind(string("ftl://utu.fi/")+(string)config["stream"]["name"]+string("/rgb-d/calibration"), [&calibrate,Q_32F]() -> vector<unsigned char> { net.bind(string("ftl://utu.fi/")+(string)config["stream"]["name"]+string("/rgb-d/calibration"), [Q_32F]() -> vector<unsigned char> {
vector<unsigned char> buf; vector<unsigned char> buf;
buf.resize(Q_32F.step*Q_32F.rows); buf.resize(Q_32F.step*Q_32F.rows);
LOG(INFO) << "Calib buf size = " << buf.size(); LOG(INFO) << "Calib buf size = " << buf.size();
...@@ -222,6 +224,8 @@ static void run(const string &file) { ...@@ -222,6 +224,8 @@ static void run(const string &file) {
unique_lock<mutex> lk(m); unique_lock<mutex> lk(m);
cv.wait(lk, [&jobs]{return jobs == 2;}); cv.wait(lk, [&jobs]{return jobs == 2;});
//sleep_for(milliseconds(40));
l.copyTo(pl); l.copyTo(pl);
disp.copyTo(pdisp); disp.copyTo(pdisp);
......
...@@ -25,7 +25,7 @@ void Streamer::send(const Mat &rgb, const Mat &depth) { ...@@ -25,7 +25,7 @@ void Streamer::send(const Mat &rgb, const Mat &depth) {
cv::imencode(".jpg", rgb, rgb_buf); cv::imencode(".jpg", rgb, rgb_buf);
vector<unsigned char> d_buf; vector<unsigned char> d_buf;
d_buf.resize(depth.step*depth.rows); /*d_buf.resize(depth.step*depth.rows);
z_stream defstream; z_stream defstream;
defstream.zalloc = Z_NULL; defstream.zalloc = Z_NULL;
defstream.zfree = Z_NULL; defstream.zfree = Z_NULL;
...@@ -39,8 +39,11 @@ void Streamer::send(const Mat &rgb, const Mat &depth) { ...@@ -39,8 +39,11 @@ void Streamer::send(const Mat &rgb, const Mat &depth) {
deflate(&defstream, Z_FINISH); deflate(&defstream, Z_FINISH);
deflateEnd(&defstream); deflateEnd(&defstream);
d_buf.resize(defstream.total_out); d_buf.resize(defstream.total_out);*/
//LOG(INFO) << "Depth Size = " << ((float)d_buf.size() / (1024.0f*1024.0f)); //Mat d2;
//depth.convertTo(d2, CV_16UC1, 256.0f*16.0f);
cv::imencode(".png", depth, d_buf);
LOG(INFO) << "Depth Size = " << ((float)d_buf.size() / (1024.0f*1024.0f));
net_.publish(uri_, rgb_buf, d_buf); net_.publish(uri_, rgb_buf, d_buf);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment