diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index eec96a6c79fde0600dd74e760467b65cece3f6ed..ed01d6fce4711136eae1a26d44dacf890b1ce75e 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -122,6 +122,7 @@ int Universe::_setDescriptors() { SOCKET n = 0; + // TODO Shared lock for some of the time... UNIQUE_LOCK(net_mutex_,lk); //Set file descriptor for the listening sockets. @@ -324,44 +325,50 @@ void Universe::_run() { } // CHECK Could this mutex be the problem!? - UNIQUE_LOCK(net_mutex_,lk); - - //If connection request is waiting - for (auto l : listeners_) { - if (l && l->isListening()) { - if (FD_ISSET(l->_socket(), &sfdread_)) { - int rsize = sizeof(sockaddr_storage); - sockaddr_storage addr; - - //Finally accept this client connection. - SOCKET csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize); + { + UNIQUE_LOCK(net_mutex_,lk); - if (csock != INVALID_SOCKET) { - auto p = new Peer(csock, this, &disp_); - peers_.push_back(p); - _installBindings(p); + //If connection request is waiting + for (auto l : listeners_) { + if (l && l->isListening()) { + if (FD_ISSET(l->_socket(), &sfdread_)) { + int rsize = sizeof(sockaddr_storage); + sockaddr_storage addr; + + //Finally accept this client connection. + SOCKET csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize); + + if (csock != INVALID_SOCKET) { + auto p = new Peer(csock, this, &disp_); + peers_.push_back(p); + _installBindings(p); + } } } } } // TODO(Nick) Might switch to shared lock here? - - //Also check each clients socket to see if any messages or errors are waiting - for (auto s : peers_) { - if (s != NULL && s->isValid()) { - //If message received from this client then deal with it - if (FD_ISSET(s->_socket(), &sfdread_)) { - s->data(); - } - if (FD_ISSET(s->_socket(), &sfderror_)) { - s->socketError(); - s->close(); + { + SHARED_LOCK(net_mutex_, lk); + + //Also check each clients socket to see if any messages or errors are waiting + for (auto s : peers_) { + if (s != NULL && s->isValid()) { + //If message received from this client then deal with it + if (FD_ISSET(s->_socket(), &sfdread_)) { + s->data(); + } + if (FD_ISSET(s->_socket(), &sfderror_)) { + s->socketError(); + s->close(); + } } } } + // TODO(Nick) Don't always need to call this - _cleanupPeers(); + //_cleanupPeers(); } } diff --git a/components/rgbd-sources/src/net.cpp b/components/rgbd-sources/src/net.cpp index d928614b914dc8f367af5d6789fad41671a1121b..c23ec9376250b379f9314e8e3a01950fdf7a281c 100644 --- a/components/rgbd-sources/src/net.cpp +++ b/components/rgbd-sources/src/net.cpp @@ -70,11 +70,12 @@ void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned ch cv::imdecode(jpg, cv::IMREAD_COLOR, &tmp_rgb); cv::imdecode(d, cv::IMREAD_UNCHANGED, &tmp_depth); + // Lock host to prevent grab UNIQUE_LOCK(host_->mutex(),lk); rgb_ = tmp_rgb; tmp_depth.convertTo(depth_, CV_32FC1, 1.0f/(16.0f*100.0f)); N_--; - lk.unlock(); + //lk.unlock(); } void NetSource::setPose(const Eigen::Matrix4f &pose) { diff --git a/components/rgbd-sources/src/realsense_source.cpp b/components/rgbd-sources/src/realsense_source.cpp index 309d3275799573846b622bea70b1dbae69b0da60..0766adb60669ae126cd89ee0faac66a2333e29cf 100644 --- a/components/rgbd-sources/src/realsense_source.cpp +++ b/components/rgbd-sources/src/realsense_source.cpp @@ -44,15 +44,11 @@ bool RealsenseSource::grab() { rs2::depth_frame depth = frames.get_depth_frame(); float w = depth.get_width(); float h = depth.get_height(); - rs2::frame colour = frames.first(RS2_STREAM_COLOR); //.get_color_frame(); + rscolour_ = frames.first(RS2_STREAM_COLOR); //.get_color_frame(); - //LOG(INFO) << " RS Frame size = " << w << "x" << h; - - //std::unique_lock<std::mutex> lk(mutex_); cv::Mat tmp(cv::Size((int)w, (int)h), CV_16UC1, (void*)depth.get_data(), depth.get_stride_in_bytes()); tmp.convertTo(depth_, CV_32FC1, scale_); - rgb_ = cv::Mat(cv::Size(w, h), CV_8UC4, (void*)colour.get_data(), cv::Mat::AUTO_STEP); - //LOG(INFO) << "RS FRAME GRABBED: " << rgb_.cols << "x" << rgb_.rows; + rgb_ = cv::Mat(cv::Size(w, h), CV_8UC4, (void*)rscolour_.get_data(), cv::Mat::AUTO_STEP); return true; } diff --git a/components/rgbd-sources/src/realsense_source.hpp b/components/rgbd-sources/src/realsense_source.hpp index d2ca206a2756d6c90a2491728e1b0b9b1bfab662..2a48ac2dbe06e7bcefdb70112bb6f5c0e4d0461a 100644 --- a/components/rgbd-sources/src/realsense_source.hpp +++ b/components/rgbd-sources/src/realsense_source.hpp @@ -25,6 +25,7 @@ class RealsenseSource : public ftl::rgbd::detail::Source { float scale_; rs2::pipeline pipe_; rs2::align align_to_depth_; + rs2::frame rscolour_; }; } diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index e9c0701c4e72f7d9f032f4e87173919cfa75d119..2e084975d6fd8299e9787719790fcc842c266668 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -93,13 +93,17 @@ Streamer::~Streamer() { } void Streamer::add(Source *src) { - UNIQUE_LOCK(mutex_,ulk); - if (sources_.find(src->getID()) != sources_.end()) return; + StreamSource *s = nullptr; + + { + UNIQUE_LOCK(mutex_,ulk); + if (sources_.find(src->getID()) != sources_.end()) return; - StreamSource *s = new StreamSource; - s->src = src; - s->state = 0; - sources_[src->getID()] = s; + StreamSource *s = new StreamSource; + s->src = src; + s->state = 0; + sources_[src->getID()] = s; + } LOG(INFO) << "Streaming: " << src->getID(); net_->broadcast("add_stream", src->getID()); @@ -108,7 +112,7 @@ void Streamer::add(Source *src) { void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID &peer, const string &dest) { StreamSource *s = nullptr; - { + //{ UNIQUE_LOCK(mutex_,slk); if (sources_.find(source) == sources_.end()) return; @@ -118,7 +122,7 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID DLOG(INFO) << "Adding Stream Peer: " << peer.to_string(); s = sources_[source]; - } + //} if (!s) return;