diff --git a/applications/reconstruct/src/main.cpp b/applications/reconstruct/src/main.cpp index 7885c167e44dd2a3f377bd88997890bd7fe1301e..a041142b6cab2bd7ba35c219d6c084518b2b9416 100644 --- a/applications/reconstruct/src/main.cpp +++ b/applications/reconstruct/src/main.cpp @@ -196,9 +196,11 @@ static void run(ftl::Configurable *root) { // TODO(Nick) Improve sync further... for (size_t i = 0; i < inputs.size(); i++) { - if (inputs[i].source->isReady()) inputs[i].source->grab(); + inputs[i].source->grab(); } + stream->wait(); + for (size_t i = 0; i < inputs.size(); i++) { if (!inputs[i].source->isReady()) { inputs[i].params.m_imageWidth = 0; diff --git a/components/common/cpp/include/ftl/threads.hpp b/components/common/cpp/include/ftl/threads.hpp new file mode 100644 index 0000000000000000000000000000000000000000..199b355b448032895f86c7ba12f44371fd2a2337 --- /dev/null +++ b/components/common/cpp/include/ftl/threads.hpp @@ -0,0 +1,30 @@ +#ifndef _FTL_THREADS_HPP_ +#define _FTL_THREADS_HPP_ + +#include <mutex> +#include <shared_mutex> + +#if defined _DEBUG && DEBUG_MUTEX +#include <loguru.hpp> +#include <chrono> +#include <type_traits> + +#define UNIQUE_LOCK(M,L) \ + auto start_##L = std::chrono::high_resolution_clock::now(); \ + std::unique_lock<std::remove_reference<decltype(M)>::type> L(M); \ + LOG(INFO) << "LOCK(" << #M "," #L << ") in " << std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - start_##L).count(); + +#define SHARED_LOCK(M,L) \ + auto start_##L = std::chrono::high_resolution_clock::now(); \ + std::shared_lock<std::remove_reference<decltype(M)>::type> L(M); \ + LOG(INFO) << "LOCK(" << #M "," #L << ") in " << std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - start_##L).count(); +#else +#define UNIQUE_LOCK(M,L) std::unique_lock<std::remove_reference<decltype(M)>::type> L(M); +#define SHARED_LOCK(M,L) std::shared_lock<std::remove_reference<decltype(M)>::type> L(M); +#endif // _DEBUG && DEBUG_MUTEX + +namespace ftl { + +} + +#endif // _FTL_THREADS_HPP_ diff --git a/components/control/cpp/src/slave.cpp b/components/control/cpp/src/slave.cpp index d017781c6c7c3ddde9dcaa9b0225e4294dd4f694..e0740b3c29687fbb0979b1859a9c3dbda31ff4eb 100644 --- a/components/control/cpp/src/slave.cpp +++ b/components/control/cpp/src/slave.cpp @@ -1,12 +1,11 @@ #include <ftl/slave.hpp> +#include <ftl/threads.hpp> + using ftl::Configurable; using ftl::net::Universe; using ftl::ctrl::Slave; using std::string; -using std::mutex; -using std::unique_lock; -using std::recursive_mutex; static void netLog(void* user_data, const loguru::Message& message) { Slave *slave = static_cast<Slave*>(user_data); @@ -45,7 +44,7 @@ Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net), in_log_(false) }); net->bind("log_subscribe", [this](const ftl::UUID &peer) { - unique_lock<recursive_mutex> lk(mutex_); + UNIQUE_LOCK(mutex_, lk); log_peers_.push_back(peer); }); @@ -77,7 +76,7 @@ void Slave::stop() { } void Slave::sendLog(const loguru::Message& message) { - unique_lock<recursive_mutex> lk(mutex_); + UNIQUE_LOCK(mutex_, lk); if (in_log_) return; in_log_ = true; diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp index 62142e17afd98764d34f891604a7e3598e616076..35b7bfac3c6ae04411dbf2ffbefa625c531a38b4 100644 --- a/components/net/cpp/include/ftl/net/peer.hpp +++ b/components/net/cpp/include/ftl/net/peer.hpp @@ -13,6 +13,7 @@ #include <ftl/net/dispatcher.hpp> #include <ftl/uri.hpp> #include <ftl/uuid.hpp> +#include <ftl/threads.hpp> #include <iostream> #include <sstream> @@ -20,7 +21,6 @@ #include <vector> #include <type_traits> #include <thread> -#include <mutex> #include <condition_variable> #include <chrono> @@ -267,7 +267,7 @@ class Peer { template <typename... ARGS> int Peer::send(const std::string &s, ARGS... args) { - std::unique_lock<std::recursive_mutex> lk(send_mtx_); + UNIQUE_LOCK(send_mtx_, lk); // Leave a blank entry for websocket header if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0); auto args_obj = std::make_tuple(args...); @@ -291,7 +291,7 @@ R Peer::call(const std::string &name, ARGS... args) { R result; int id = asyncCall<R>(name, [&](const R &r) { - std::unique_lock<std::mutex> lk(m); + UNIQUE_LOCK(m,lk); hasreturned = true; result = r; lk.unlock(); @@ -299,7 +299,7 @@ R Peer::call(const std::string &name, ARGS... args) { }, std::forward<ARGS>(args)...); { // Block thread until async callback notifies us - std::unique_lock<std::mutex> lk(m); + UNIQUE_LOCK(m,lk); cv.wait_for(lk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;}); } @@ -324,7 +324,7 @@ int Peer::asyncCall( { // Could this be the problem???? - std::unique_lock<std::recursive_mutex> lk(cb_mtx_); + UNIQUE_LOCK(cb_mtx_,lk); // Register the CB rpcid = rpcid__++; callbacks_[rpcid] = std::make_unique<caller<T>>(cb); @@ -332,7 +332,7 @@ int Peer::asyncCall( auto call_obj = std::make_tuple(0,rpcid,name,args_obj); - std::unique_lock<std::recursive_mutex> lk(send_mtx_); + UNIQUE_LOCK(send_mtx_,lk); if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0); msgpack::pack(send_buf_, call_obj); _send(); diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index 306747a652a98b645b576b55709e68ce1bf239a6..db9e703ddbe7054bf7e46fb52cebcc2d9fc6b690 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -10,12 +10,12 @@ #include <ftl/net/listener.hpp> #include <ftl/net/dispatcher.hpp> #include <ftl/uuid.hpp> +#include <ftl/threads.hpp> #include <nlohmann/json.hpp> #include <vector> #include <list> #include <string> #include <thread> -#include <shared_mutex> #include <map> namespace ftl { @@ -241,7 +241,7 @@ class Universe : public ftl::Configurable { template <typename F> void Universe::bind(const std::string &name, F func) { - std::unique_lock<std::shared_mutex> lk(net_mutex_); + UNIQUE_LOCK(net_mutex_,lk); disp_.bind(name, func, typename ftl::internal::func_kind_info<F>::result_kind(), typename ftl::internal::func_kind_info<F>::args_kind()); @@ -260,7 +260,7 @@ bool Universe::subscribe(const ftl::URI &res, F func) { template <typename... ARGS> void Universe::broadcast(const std::string &name, ARGS... args) { - std::shared_lock<std::shared_mutex> lk(net_mutex_); + SHARED_LOCK(net_mutex_,lk); for (auto p : peers_) { if (p->isConnected()) p->send(name, args...); } @@ -275,7 +275,7 @@ std::optional<R> Universe::findOne(const std::string &name, ARGS... args) { std::optional<R> result; auto handler = [&](const std::optional<R> &r) { - std::unique_lock<std::mutex> lk(m); + UNIQUE_LOCK(m,lk); if (hasreturned || !r) return; hasreturned = true; result = r; @@ -284,14 +284,15 @@ std::optional<R> Universe::findOne(const std::string &name, ARGS... args) { }; std::map<Peer*, int> record; - std::shared_lock<std::shared_mutex> lk(net_mutex_); + SHARED_LOCK(net_mutex_,lk); + for (auto p : peers_) { if (p->isConnected()) record[p] = p->asyncCall<std::optional<R>>(name, handler, args...); } lk.unlock(); { // Block thread until async callback notifies us - std::unique_lock<std::mutex> llk(m); + UNIQUE_LOCK(m,llk); cv.wait_for(llk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;}); // Cancel any further results @@ -317,7 +318,7 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) { std::vector<R> results; auto handler = [&](const std::vector<R> &r) { - std::unique_lock<std::mutex> lk(m); + UNIQUE_LOCK(m,lk); returncount++; results.insert(results.end(), r.begin(), r.end()); lk.unlock(); @@ -325,7 +326,7 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) { }; std::map<Peer*, int> record; - std::shared_lock<std::shared_mutex> lk(net_mutex_); + SHARED_LOCK(net_mutex_,lk); for (auto p : peers_) { if (!p->isConnected()) { continue; @@ -336,7 +337,7 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) { lk.unlock(); { // Block thread until async callback notifies us - std::unique_lock<std::mutex> llk(m); + UNIQUE_LOCK(m,llk); cv.wait_for(llk, std::chrono::seconds(1), [&returncount,&sentcount]{return returncount == sentcount;}); // Cancel any further results diff --git a/components/net/cpp/src/net_internal.hpp b/components/net/cpp/src/net_internal.hpp index d6bcfd7398245387b947e695fe3d0138ec7c1267..f8586ea774a9c5b79e9c9a7513e992554c793611 100644 --- a/components/net/cpp/src/net_internal.hpp +++ b/components/net/cpp/src/net_internal.hpp @@ -1,6 +1,11 @@ #ifndef _FTL_NET_INTERNAL_HPP_ #define _FTL_NET_INTERNAL_HPP_ +#if defined _DEBUG && DEBUG_NET +#include <loguru.hpp> +#include <chrono> +#endif + namespace ftl { namespace net { namespace internal { #ifdef TEST_MOCKS #ifdef WIN32 @@ -14,11 +19,24 @@ namespace ftl { namespace net { namespace internal { #ifdef WIN32 inline int recv(SOCKET sd, char *buf, int n, int f) { return ::recv(sd,buf,n,f); } inline int send(SOCKET sd, const char *v, int cnt, int flags) { return ::send(sd,v,cnt,flags); } +#else +#if defined _DEBUG && DEBUG_NET + inline ssize_t recv(int sd, void *buf, size_t n, int f) { + return ::recv(sd,buf,n,f); + } + inline ssize_t writev(int sd, const struct iovec *v, int cnt) { + auto start = std::chrono::high_resolution_clock::now(); + return ::writev(sd,v,cnt); + std::chrono::duration<double> elapsed = + std::chrono::high_resolution_clock::now() - start; + LOG(INFO) << "WRITEV in " << elapsed.count() << "s"; + } #else inline ssize_t recv(int sd, void *buf, size_t n, int f) { return ::recv(sd,buf,n,f); } inline ssize_t writev(int sd, const struct iovec *v, int cnt) { return ::writev(sd,v,cnt); } -#endif -#endif +#endif // DEBUG +#endif // WIN32 +#endif // TEST_MOCKS }}} #endif // _FTL_NET_INTERNAL_HPP_ diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index c32d627835d912502f78bffb1886338db092267c..6fc9397d7cb544fb1fcbcf89278b5660a5d62c97 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -207,7 +207,7 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), disp_ = new Dispatcher(d); // Must do to prevent receiving message before handlers are installed - unique_lock<recursive_mutex> lk(recv_mtx_); + UNIQUE_LOCK(recv_mtx_,lk); scheme_ = uri.getProtocol(); if (uri.getProtocol() == URI::SCHEME_TCP) { @@ -393,7 +393,7 @@ void Peer::data() { // processed. //if (!is_waiting_) return; //is_waiting_ = false; - std::unique_lock<std::recursive_mutex> lk(recv_mtx_); + UNIQUE_LOCK(recv_mtx_,lk); recv_buf_.reserve_buffer(kMaxMessage); if (recv_buf_.buffer_capacity() < (kMaxMessage / 10)) { @@ -407,8 +407,11 @@ void Peer::data() { if (rc >= cap) { LOG(WARNING) << "More than buffers worth of data received"; } + if (cap < (kMaxMessage / 10)) LOG(WARNING) << "NO BUFFER"; if (rc <= 0) { + //LOG(WARNING) << "Weird rc: " << rc; + //close(); return; } @@ -424,7 +427,7 @@ void Peer::data() { } bool Peer::_data() { - std::unique_lock<std::recursive_mutex> lk(recv_mtx_); + UNIQUE_LOCK(recv_mtx_,lk); if (scheme_ == ftl::URI::SCHEME_WS && !ws_read_header_) { wsheader_type ws; @@ -437,7 +440,7 @@ bool Peer::_data() { msgpack::object_handle msg; while (recv_buf_.next(msg)) { // CHECK Safe to unlock here? - lk.unlock(); + //lk.unlock(); ws_read_header_ = false; msgpack::object obj = msg.get(); if (status_ != kConnected) { @@ -460,7 +463,7 @@ bool Peer::_data() { disp_->dispatch(*this, obj); // Relock before next loop of while - lk.lock(); + //lk.lock(); if (scheme_ == ftl::URI::SCHEME_WS && recv_buf_.nonparsed_size() > 0) { wsheader_type ws; @@ -475,7 +478,7 @@ bool Peer::_data() { void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) { // TODO Handle error reporting... - unique_lock<recursive_mutex> lk(cb_mtx_); + UNIQUE_LOCK(cb_mtx_,lk); if (callbacks_.count(id) > 0) { DLOG(1) << "Received return RPC value"; @@ -492,7 +495,7 @@ void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) { } void Peer::cancelCall(int id) { - unique_lock<recursive_mutex> lk(cb_mtx_); + UNIQUE_LOCK(cb_mtx_,lk); if (callbacks_.count(id) > 0) { callbacks_.erase(id); } @@ -500,7 +503,7 @@ void Peer::cancelCall(int id) { void Peer::_sendResponse(uint32_t id, const msgpack::object &res) { Dispatcher::response_t res_obj = std::make_tuple(1,id,std::string(""),res); - std::unique_lock<std::recursive_mutex> lk(send_mtx_); + UNIQUE_LOCK(send_mtx_,lk); if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0); msgpack::pack(send_buf_, res_obj); _send(); @@ -510,7 +513,7 @@ bool Peer::waitConnection() { if (status_ == kConnected) return true; std::mutex m; - std::unique_lock<std::mutex> lk(m); + UNIQUE_LOCK(m,lk); std::condition_variable cv; callback_t h = universe_->onConnect([this,&cv](Peer *p) { @@ -570,7 +573,6 @@ int Peer::_send() { #ifdef WIN32 // TODO(nick) Use WSASend instead as equivalent to writev - auto start = std::chrono::high_resolution_clock::now(); auto send_vec = send_buf_.vector(); auto send_size = send_buf_.vector_size(); @@ -578,14 +580,10 @@ int Peer::_send() { for (int i = 0; i < send_size; i++) { c += ftl::net::internal::send(sock_, (char*)send_vec[i].iov_base, (int)send_vec[i].iov_len, 0); } - - std::chrono::duration<double> elapsed = - std::chrono::high_resolution_clock::now() - start; - - LOG(INFO) << "SEND TIME: " << elapsed.count(); #else int c = ftl::net::internal::writev(sock_, send_buf_.vector(), (int)send_buf_.vector_size()); #endif + send_buf_.clear(); // We are blocking, so -1 should mean actual error @@ -598,8 +596,8 @@ int Peer::_send() { } Peer::~Peer() { - std::unique_lock<std::recursive_mutex> lk1(send_mtx_); - std::unique_lock<std::recursive_mutex> lk2(recv_mtx_); + UNIQUE_LOCK(send_mtx_,lk1); + UNIQUE_LOCK(recv_mtx_,lk2); _badClose(false); LOG(INFO) << "Deleting peer object"; diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index 1a7e808a158b6ab30f33e08ec2c07ca80294f43a..eec96a6c79fde0600dd74e760467b65cece3f6ed 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -84,7 +84,7 @@ void Universe::shutdown() { bool Universe::listen(const string &addr) { auto l = new Listener(addr.c_str()); if (!l) return false; - unique_lock<shared_mutex> lk(net_mutex_); + UNIQUE_LOCK(net_mutex_,lk); listeners_.push_back(l); return l->isListening(); } @@ -94,7 +94,7 @@ Peer *Universe::connect(const string &addr) { if (!p) return nullptr; if (p->status() != Peer::kInvalid) { - unique_lock<shared_mutex> lk(net_mutex_); + UNIQUE_LOCK(net_mutex_,lk); peers_.push_back(p); } @@ -103,7 +103,7 @@ Peer *Universe::connect(const string &addr) { } void Universe::unbind(const std::string &name) { - unique_lock<shared_mutex> lk(net_mutex_); + UNIQUE_LOCK(net_mutex_,lk); disp_.unbind(name); } @@ -122,7 +122,7 @@ int Universe::_setDescriptors() { SOCKET n = 0; - unique_lock<shared_mutex> lk(net_mutex_); + UNIQUE_LOCK(net_mutex_,lk); //Set file descriptor for the listening sockets. for (auto l : listeners_) { @@ -249,7 +249,7 @@ void Universe::_periodic() { auto i = reconnects_.begin(); while (i != reconnects_.end()) { if ((*i).peer->reconnect()) { - unique_lock<shared_mutex> lk(net_mutex_); + UNIQUE_LOCK(net_mutex_,lk); peers_.push_back((*i).peer); i = reconnects_.erase(i); } else if ((*i).tries > 0) { @@ -304,7 +304,7 @@ void Universe::_run() { //Wait for a network event or timeout in 3 seconds block.tv_sec = 0; - block.tv_usec = 10000; + block.tv_usec = 100000; selres = select(n+1, &sfdread_, 0, &sfderror_, &block); // NOTE Nick: Is it possible that not all the recvs have been called before I @@ -324,7 +324,7 @@ void Universe::_run() { } // CHECK Could this mutex be the problem!? - unique_lock<shared_mutex> lk(net_mutex_); + UNIQUE_LOCK(net_mutex_,lk); //If connection request is waiting for (auto l : listeners_) { @@ -366,28 +366,28 @@ void Universe::_run() { } callback_t Universe::onConnect(const std::function<void(ftl::net::Peer*)> &cb) { - unique_lock<shared_mutex> lk(handler_mutex_); + UNIQUE_LOCK(handler_mutex_,lk); callback_t id = cbid__++; on_connect_.push_back({id, cb}); return id; } callback_t Universe::onDisconnect(const std::function<void(ftl::net::Peer*)> &cb) { - unique_lock<shared_mutex> lk(handler_mutex_); + UNIQUE_LOCK(handler_mutex_,lk); callback_t id = cbid__++; on_disconnect_.push_back({id, cb}); return id; } callback_t Universe::onError(const std::function<void(ftl::net::Peer*, const ftl::net::Error &)> &cb) { - unique_lock<shared_mutex> lk(handler_mutex_); + UNIQUE_LOCK(handler_mutex_,lk); callback_t id = cbid__++; on_error_.push_back({id, cb}); return id; } void Universe::removeCallback(callback_t cbid) { - unique_lock<shared_mutex> lk(handler_mutex_); + UNIQUE_LOCK(handler_mutex_,lk); { auto i = on_connect_.begin(); while (i != on_connect_.end()) { @@ -423,7 +423,7 @@ void Universe::removeCallback(callback_t cbid) { } void Universe::_notifyConnect(Peer *p) { - shared_lock<shared_mutex> lk(handler_mutex_); + SHARED_LOCK(handler_mutex_,lk); peer_ids_[p->id()] = p; for (auto &i : on_connect_) { @@ -438,7 +438,7 @@ void Universe::_notifyConnect(Peer *p) { void Universe::_notifyDisconnect(Peer *p) { // In all cases, should already be locked outside this function call //unique_lock<mutex> lk(net_mutex_); - shared_lock<shared_mutex> lk(handler_mutex_); + SHARED_LOCK(handler_mutex_,lk); for (auto &i : on_disconnect_) { try { i.h(p); diff --git a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp index 972f48beca8771743bfd3111704001086a7c5546..065ee28c60cc9382c33e1136309275c64eed0f6d 100644 --- a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp @@ -85,6 +85,8 @@ class Streamer : public ftl::Configurable { */ void stop(); + void wait(); + /** * Alternative to calling run(), it will operate a single frame capture, * compress and stream cycle. @@ -100,6 +102,9 @@ class Streamer : public ftl::Configurable { bool active_; ftl::net::Universe *net_; bool late_; + std::mutex job_mtx_; + std::condition_variable job_cv_; + int jobs_; void _schedule(); void _swap(detail::StreamSource &); diff --git a/components/rgbd-sources/src/net.cpp b/components/rgbd-sources/src/net.cpp index b0f6b1a5dbeb942897b4249c20c45a680f9355f5..d928614b914dc8f367af5d6789fad41671a1121b 100644 --- a/components/rgbd-sources/src/net.cpp +++ b/components/rgbd-sources/src/net.cpp @@ -64,11 +64,13 @@ NetSource::~NetSource() { void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned char> &d) { cv::Mat tmp_rgb, tmp_depth; + DLOG(INFO) << "Received frame"; + // Decode in temporary buffers to prevent long locks cv::imdecode(jpg, cv::IMREAD_COLOR, &tmp_rgb); cv::imdecode(d, cv::IMREAD_UNCHANGED, &tmp_depth); - unique_lock<shared_mutex> lk(host_->mutex()); + UNIQUE_LOCK(host_->mutex(),lk); rgb_ = tmp_rgb; tmp_depth.convertTo(depth_, CV_32FC1, 1.0f/(16.0f*100.0f)); N_--; @@ -132,7 +134,7 @@ void NetSource::_updateURI() { bool NetSource::grab() { // Send one frame before end to prevent unwanted pause - if (N_ <= 1) { + if (N_ <= 2) { N_ = 10; if (!host_->getNet()->send(peer_, "get_stream", *host_->get<string>("uri"), 10, 0, host_->getNet()->id(), *host_->get<string>("uri"))) { active_ = false; diff --git a/components/rgbd-sources/src/realsense_source.cpp b/components/rgbd-sources/src/realsense_source.cpp index db97d93ebf61b76829685fc4238fa67fdce4af79..309d3275799573846b622bea70b1dbae69b0da60 100644 --- a/components/rgbd-sources/src/realsense_source.cpp +++ b/components/rgbd-sources/src/realsense_source.cpp @@ -39,7 +39,7 @@ RealsenseSource::~RealsenseSource() { bool RealsenseSource::grab() { rs2::frameset frames = pipe_.wait_for_frames(); //rs2::align align(RS2_STREAM_DEPTH); - frames = align_to_depth_.process(frames); //align_to_depth_.process(frames); + //frames = align_to_depth_.process(frames); //align_to_depth_.process(frames); rs2::depth_frame depth = frames.get_depth_frame(); float w = depth.get_width(); diff --git a/components/rgbd-sources/src/source.cpp b/components/rgbd-sources/src/source.cpp index 4c7379574a145004c46aaf20083484e0e8a89f81..1a3d6885f5c4b589542ef76d3795b94033fc34c6 100644 --- a/components/rgbd-sources/src/source.cpp +++ b/components/rgbd-sources/src/source.cpp @@ -1,5 +1,6 @@ #include <loguru.hpp> #include <ftl/rgbd/source.hpp> +#include <ftl/threads.hpp> #include "net.hpp" #include "stereovideo.hpp" @@ -132,9 +133,11 @@ ftl::rgbd::detail::Source *Source::_createDeviceImpl(const ftl::URI &uri) { } void Source::getFrames(cv::Mat &rgb, cv::Mat &depth) { - shared_lock<shared_mutex> lk(mutex_); - rgb_.copyTo(rgb); - depth_.copyTo(depth); + SHARED_LOCK(mutex_,lk); + //rgb_.copyTo(rgb); + //depth_.copyTo(depth); + rgb = rgb_; + depth = depth_; } Eigen::Vector4f Source::point(uint ux, uint uy) { @@ -142,7 +145,7 @@ Eigen::Vector4f Source::point(uint ux, uint uy) { const float x = ((float)ux-(float)params.cx) / (float)params.fx; const float y = ((float)uy-(float)params.cy) / (float)params.fy; - shared_lock<shared_mutex> lk(mutex_); + SHARED_LOCK(mutex_,lk); const float depth = depth_.at<float>(uy,ux); return Eigen::Vector4f(x*depth,y*depth,depth,1.0); } @@ -161,13 +164,13 @@ bool Source::hasCapability(capability_t) { } void Source::reset() { - unique_lock<shared_mutex> lk(mutex_); + UNIQUE_LOCK(mutex_,lk); if (impl_) delete impl_; impl_ = _createImplementation(); } bool Source::grab() { - unique_lock<shared_mutex> lk(mutex_); + UNIQUE_LOCK(mutex_,lk); if (impl_ && impl_->grab()) { impl_->rgb_.copyTo(rgb_); impl_->depth_.copyTo(depth_); diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index d5d2989fe1ca76ec0d016dff4e96ebfbe0692bf8..5b556a1fd0418009cfa53e849ca03f90c4548685 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -24,13 +24,13 @@ using std::chrono::milliseconds; #define THREAD_POOL_SIZE 6 Streamer::Streamer(nlohmann::json &config, Universe *net) - : ftl::Configurable(config), pool_(THREAD_POOL_SIZE), late_(false) { + : ftl::Configurable(config), pool_(THREAD_POOL_SIZE), late_(false), jobs_(0) { active_ = false; net_ = net; net->bind("find_stream", [this](const std::string &uri) -> optional<UUID> { - shared_lock<shared_mutex> slk(mutex_); + SHARED_LOCK(mutex_,slk); if (sources_.find(uri) != sources_.end()) { LOG(INFO) << "Valid source request received: " << uri; @@ -47,7 +47,7 @@ Streamer::Streamer(nlohmann::json &config, Universe *net) }); net->bind("set_pose", [this](const std::string &uri, const std::vector<unsigned char> &buf) { - shared_lock<shared_mutex> slk(mutex_); + SHARED_LOCK(mutex_,slk); if (sources_.find(uri) != sources_.end()) { Eigen::Matrix4f pose; @@ -59,7 +59,7 @@ Streamer::Streamer(nlohmann::json &config, Universe *net) // Allow remote users to access camera calibration matrix net->bind("source_calibration", [this](const std::string &uri) -> vector<unsigned char> { vector<unsigned char> buf; - shared_lock<shared_mutex> slk(mutex_); + SHARED_LOCK(mutex_,slk); if (sources_.find(uri) != sources_.end()) { buf.resize(sizeof(Camera)); @@ -93,7 +93,7 @@ Streamer::~Streamer() { } void Streamer::add(Source *src) { - unique_lock<shared_mutex> ulk(mutex_); + UNIQUE_LOCK(mutex_,ulk); if (sources_.find(src->getID()) != sources_.end()) return; StreamSource *s = new StreamSource; @@ -106,7 +106,7 @@ void Streamer::add(Source *src) { } void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID &peer, const string &dest) { - unique_lock<shared_mutex> slk(mutex_); + UNIQUE_LOCK(mutex_,slk); if (sources_.find(source) == sources_.end()) return; if (rate < 0 || rate >= 10) return; @@ -191,13 +191,26 @@ void Streamer::_swap(StreamSource &src) { } } +void Streamer::wait() { + // Do some jobs in this thread, might as well... + std::function<void(int)> j; + while ((bool)(j=pool_.pop())) { + j(-1); + } + + // Wait for all jobs to complete before finishing frame + UNIQUE_LOCK(job_mtx_, lk); + job_cv_.wait(lk, [this]{ return jobs_ == 0; }); +} + void Streamer::_schedule() { - std::mutex job_mtx; - std::condition_variable job_cv; - int jobs = 0; + wait(); + //std::mutex job_mtx; + //std::condition_variable job_cv; + //int jobs = 0; // Prevent new clients during processing. - shared_lock<shared_mutex> slk(mutex_); + SHARED_LOCK(mutex_,slk); for (auto s : sources_) { string uri = s.first; @@ -208,27 +221,27 @@ void Streamer::_schedule() { } // There will be two jobs for this source... - unique_lock<mutex> lk(job_mtx); - jobs += 2; + UNIQUE_LOCK(job_mtx_,lk); + jobs_ += 2; lk.unlock(); // Grab job - pool_.push([this,uri,&jobs,&job_mtx,&job_cv](int id) { + pool_.push([this,uri](int id) { StreamSource *src = sources_[uri]; src->src->grab(); // CHECK (Nick) Can state be an atomic instead? - unique_lock<shared_mutex> lk(src->mutex); + UNIQUE_LOCK(src->mutex, lk); src->state |= ftl::rgbd::detail::kGrabbed; _swap(*src); lk.unlock(); // Mark job as finished - unique_lock<mutex> ulk(job_mtx); - jobs--; + UNIQUE_LOCK(job_mtx_, ulk); + jobs_--; ulk.unlock(); - job_cv.notify_one(); + job_cv_.notify_one(); }); // Transmit job @@ -236,7 +249,7 @@ void Streamer::_schedule() { // meaning that no lock is required here since outer shared_lock // prevents addition of new clients. // TODO, could do one for each bitrate... - pool_.push([this,uri,&jobs,&job_mtx,&job_cv](int id) { + pool_.push([this,uri](int id) { StreamSource *src = sources_[uri]; try { @@ -285,33 +298,23 @@ void Streamer::_schedule() { LOG(INFO) << "Stream Elapsed: " << elapsed.count();*/ // CHECK (Nick) Could state be an atomic? - unique_lock<shared_mutex> lk(src->mutex); + UNIQUE_LOCK(src->mutex,lk); //LOG(INFO) << "Tx Frame: " << uri; src->state |= ftl::rgbd::detail::kTransmitted; _swap(*src); lk.unlock(); // Mark job as finished - unique_lock<mutex> ulk(job_mtx); - jobs--; + UNIQUE_LOCK(job_mtx_,ulk); + jobs_--; ulk.unlock(); - job_cv.notify_one(); + job_cv_.notify_one(); }); } - - // Do some jobs in this thread, might as well... - std::function<void(int)> j; - while ((bool)(j=pool_.pop())) { - j(-1); - } - - // Wait for all jobs to complete before finishing frame - unique_lock<mutex> lk(job_mtx); - job_cv.wait(lk, [&jobs]{ return jobs == 0; }); } Source *Streamer::get(const std::string &uri) { - shared_lock<shared_mutex> slk(mutex_); + SHARED_LOCK(mutex_,slk); if (sources_.find(uri) != sources_.end()) return sources_[uri]->src; else return nullptr; }