diff --git a/applications/reconstruct/src/main.cpp b/applications/reconstruct/src/main.cpp index 7885c167e44dd2a3f377bd88997890bd7fe1301e..a041142b6cab2bd7ba35c219d6c084518b2b9416 100644 --- a/applications/reconstruct/src/main.cpp +++ b/applications/reconstruct/src/main.cpp @@ -196,9 +196,11 @@ static void run(ftl::Configurable *root) { // TODO(Nick) Improve sync further... for (size_t i = 0; i < inputs.size(); i++) { - if (inputs[i].source->isReady()) inputs[i].source->grab(); + inputs[i].source->grab(); } + stream->wait(); + for (size_t i = 0; i < inputs.size(); i++) { if (!inputs[i].source->isReady()) { inputs[i].params.m_imageWidth = 0; diff --git a/components/common/cpp/include/ftl/threads.hpp b/components/common/cpp/include/ftl/threads.hpp new file mode 100644 index 0000000000000000000000000000000000000000..199b355b448032895f86c7ba12f44371fd2a2337 --- /dev/null +++ b/components/common/cpp/include/ftl/threads.hpp @@ -0,0 +1,30 @@ +#ifndef _FTL_THREADS_HPP_ +#define _FTL_THREADS_HPP_ + +#include <mutex> +#include <shared_mutex> + +#if defined _DEBUG && DEBUG_MUTEX +#include <loguru.hpp> +#include <chrono> +#include <type_traits> + +#define UNIQUE_LOCK(M,L) \ + auto start_##L = std::chrono::high_resolution_clock::now(); \ + std::unique_lock<std::remove_reference<decltype(M)>::type> L(M); \ + LOG(INFO) << "LOCK(" << #M "," #L << ") in " << std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - start_##L).count(); + +#define SHARED_LOCK(M,L) \ + auto start_##L = std::chrono::high_resolution_clock::now(); \ + std::shared_lock<std::remove_reference<decltype(M)>::type> L(M); \ + LOG(INFO) << "LOCK(" << #M "," #L << ") in " << std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - start_##L).count(); +#else +#define UNIQUE_LOCK(M,L) std::unique_lock<std::remove_reference<decltype(M)>::type> L(M); +#define SHARED_LOCK(M,L) std::shared_lock<std::remove_reference<decltype(M)>::type> L(M); +#endif // _DEBUG && DEBUG_MUTEX + +namespace ftl { + +} + +#endif // _FTL_THREADS_HPP_ diff --git a/components/control/cpp/src/slave.cpp b/components/control/cpp/src/slave.cpp index d017781c6c7c3ddde9dcaa9b0225e4294dd4f694..e0740b3c29687fbb0979b1859a9c3dbda31ff4eb 100644 --- a/components/control/cpp/src/slave.cpp +++ b/components/control/cpp/src/slave.cpp @@ -1,12 +1,11 @@ #include <ftl/slave.hpp> +#include <ftl/threads.hpp> + using ftl::Configurable; using ftl::net::Universe; using ftl::ctrl::Slave; using std::string; -using std::mutex; -using std::unique_lock; -using std::recursive_mutex; static void netLog(void* user_data, const loguru::Message& message) { Slave *slave = static_cast<Slave*>(user_data); @@ -45,7 +44,7 @@ Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net), in_log_(false) }); net->bind("log_subscribe", [this](const ftl::UUID &peer) { - unique_lock<recursive_mutex> lk(mutex_); + UNIQUE_LOCK(mutex_, lk); log_peers_.push_back(peer); }); @@ -77,7 +76,7 @@ void Slave::stop() { } void Slave::sendLog(const loguru::Message& message) { - unique_lock<recursive_mutex> lk(mutex_); + UNIQUE_LOCK(mutex_, lk); if (in_log_) return; in_log_ = true; diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp index 62142e17afd98764d34f891604a7e3598e616076..16393a55c9cd055971531903cf62f58a84b87a85 100644 --- a/components/net/cpp/include/ftl/net/peer.hpp +++ b/components/net/cpp/include/ftl/net/peer.hpp @@ -13,6 +13,7 @@ #include <ftl/net/dispatcher.hpp> #include <ftl/uri.hpp> #include <ftl/uuid.hpp> +#include <ftl/threads.hpp> #include <iostream> #include <sstream> @@ -20,7 +21,6 @@ #include <vector> #include <type_traits> #include <thread> -#include <mutex> #include <condition_variable> #include <chrono> @@ -267,13 +267,14 @@ class Peer { template <typename... ARGS> int Peer::send(const std::string &s, ARGS... args) { - std::unique_lock<std::recursive_mutex> lk(send_mtx_); + UNIQUE_LOCK(send_mtx_, lk); // Leave a blank entry for websocket header if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0); auto args_obj = std::make_tuple(args...); auto call_obj = std::make_tuple(0,s,args_obj); msgpack::pack(send_buf_, call_obj); - return _send(); + int rc = _send(); + return rc; } template <typename F> @@ -291,7 +292,7 @@ R Peer::call(const std::string &name, ARGS... args) { R result; int id = asyncCall<R>(name, [&](const R &r) { - std::unique_lock<std::mutex> lk(m); + UNIQUE_LOCK(m,lk); hasreturned = true; result = r; lk.unlock(); @@ -299,7 +300,7 @@ R Peer::call(const std::string &name, ARGS... args) { }, std::forward<ARGS>(args)...); { // Block thread until async callback notifies us - std::unique_lock<std::mutex> lk(m); + UNIQUE_LOCK(m,lk); cv.wait_for(lk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;}); } @@ -319,20 +320,20 @@ int Peer::asyncCall( ARGS... args) { auto args_obj = std::make_tuple(args...); auto rpcid = 0; - - LOG(INFO) << "RPC " << name << "() -> " << uri_; { // Could this be the problem???? - std::unique_lock<std::recursive_mutex> lk(cb_mtx_); + UNIQUE_LOCK(cb_mtx_,lk); // Register the CB rpcid = rpcid__++; callbacks_[rpcid] = std::make_unique<caller<T>>(cb); } + LOG(INFO) << "RPC " << name << "(" << rpcid << ") -> " << uri_; + auto call_obj = std::make_tuple(0,rpcid,name,args_obj); - std::unique_lock<std::recursive_mutex> lk(send_mtx_); + UNIQUE_LOCK(send_mtx_,lk); if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0); msgpack::pack(send_buf_, call_obj); _send(); diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index 306747a652a98b645b576b55709e68ce1bf239a6..db9e703ddbe7054bf7e46fb52cebcc2d9fc6b690 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -10,12 +10,12 @@ #include <ftl/net/listener.hpp> #include <ftl/net/dispatcher.hpp> #include <ftl/uuid.hpp> +#include <ftl/threads.hpp> #include <nlohmann/json.hpp> #include <vector> #include <list> #include <string> #include <thread> -#include <shared_mutex> #include <map> namespace ftl { @@ -241,7 +241,7 @@ class Universe : public ftl::Configurable { template <typename F> void Universe::bind(const std::string &name, F func) { - std::unique_lock<std::shared_mutex> lk(net_mutex_); + UNIQUE_LOCK(net_mutex_,lk); disp_.bind(name, func, typename ftl::internal::func_kind_info<F>::result_kind(), typename ftl::internal::func_kind_info<F>::args_kind()); @@ -260,7 +260,7 @@ bool Universe::subscribe(const ftl::URI &res, F func) { template <typename... ARGS> void Universe::broadcast(const std::string &name, ARGS... args) { - std::shared_lock<std::shared_mutex> lk(net_mutex_); + SHARED_LOCK(net_mutex_,lk); for (auto p : peers_) { if (p->isConnected()) p->send(name, args...); } @@ -275,7 +275,7 @@ std::optional<R> Universe::findOne(const std::string &name, ARGS... args) { std::optional<R> result; auto handler = [&](const std::optional<R> &r) { - std::unique_lock<std::mutex> lk(m); + UNIQUE_LOCK(m,lk); if (hasreturned || !r) return; hasreturned = true; result = r; @@ -284,14 +284,15 @@ std::optional<R> Universe::findOne(const std::string &name, ARGS... args) { }; std::map<Peer*, int> record; - std::shared_lock<std::shared_mutex> lk(net_mutex_); + SHARED_LOCK(net_mutex_,lk); + for (auto p : peers_) { if (p->isConnected()) record[p] = p->asyncCall<std::optional<R>>(name, handler, args...); } lk.unlock(); { // Block thread until async callback notifies us - std::unique_lock<std::mutex> llk(m); + UNIQUE_LOCK(m,llk); cv.wait_for(llk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;}); // Cancel any further results @@ -317,7 +318,7 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) { std::vector<R> results; auto handler = [&](const std::vector<R> &r) { - std::unique_lock<std::mutex> lk(m); + UNIQUE_LOCK(m,lk); returncount++; results.insert(results.end(), r.begin(), r.end()); lk.unlock(); @@ -325,7 +326,7 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) { }; std::map<Peer*, int> record; - std::shared_lock<std::shared_mutex> lk(net_mutex_); + SHARED_LOCK(net_mutex_,lk); for (auto p : peers_) { if (!p->isConnected()) { continue; @@ -336,7 +337,7 @@ std::vector<R> Universe::findAll(const std::string &name, ARGS... args) { lk.unlock(); { // Block thread until async callback notifies us - std::unique_lock<std::mutex> llk(m); + UNIQUE_LOCK(m,llk); cv.wait_for(llk, std::chrono::seconds(1), [&returncount,&sentcount]{return returncount == sentcount;}); // Cancel any further results diff --git a/components/net/cpp/src/net_internal.hpp b/components/net/cpp/src/net_internal.hpp index d6bcfd7398245387b947e695fe3d0138ec7c1267..f8586ea774a9c5b79e9c9a7513e992554c793611 100644 --- a/components/net/cpp/src/net_internal.hpp +++ b/components/net/cpp/src/net_internal.hpp @@ -1,6 +1,11 @@ #ifndef _FTL_NET_INTERNAL_HPP_ #define _FTL_NET_INTERNAL_HPP_ +#if defined _DEBUG && DEBUG_NET +#include <loguru.hpp> +#include <chrono> +#endif + namespace ftl { namespace net { namespace internal { #ifdef TEST_MOCKS #ifdef WIN32 @@ -14,11 +19,24 @@ namespace ftl { namespace net { namespace internal { #ifdef WIN32 inline int recv(SOCKET sd, char *buf, int n, int f) { return ::recv(sd,buf,n,f); } inline int send(SOCKET sd, const char *v, int cnt, int flags) { return ::send(sd,v,cnt,flags); } +#else +#if defined _DEBUG && DEBUG_NET + inline ssize_t recv(int sd, void *buf, size_t n, int f) { + return ::recv(sd,buf,n,f); + } + inline ssize_t writev(int sd, const struct iovec *v, int cnt) { + auto start = std::chrono::high_resolution_clock::now(); + return ::writev(sd,v,cnt); + std::chrono::duration<double> elapsed = + std::chrono::high_resolution_clock::now() - start; + LOG(INFO) << "WRITEV in " << elapsed.count() << "s"; + } #else inline ssize_t recv(int sd, void *buf, size_t n, int f) { return ::recv(sd,buf,n,f); } inline ssize_t writev(int sd, const struct iovec *v, int cnt) { return ::writev(sd,v,cnt); } -#endif -#endif +#endif // DEBUG +#endif // WIN32 +#endif // TEST_MOCKS }}} #endif // _FTL_NET_INTERNAL_HPP_ diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index 1541b56fdcb3a8c416150253037d360995dc76c6..f3f875ae8e648c176159417adf9711318ae3cb7d 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -207,7 +207,7 @@ Peer::Peer(const char *pUri, Universe *u, Dispatcher *d) : can_reconnect_(true), disp_ = new Dispatcher(d); // Must do to prevent receiving message before handlers are installed - unique_lock<recursive_mutex> lk(recv_mtx_); + UNIQUE_LOCK(recv_mtx_,lk); scheme_ = uri.getProtocol(); if (uri.getProtocol() == URI::SCHEME_TCP) { @@ -393,7 +393,7 @@ void Peer::data() { // processed. //if (!is_waiting_) return; //is_waiting_ = false; - std::unique_lock<std::recursive_mutex> lk(recv_mtx_); + UNIQUE_LOCK(recv_mtx_,lk); recv_buf_.reserve_buffer(kMaxMessage); if (recv_buf_.buffer_capacity() < (kMaxMessage / 10)) { @@ -401,32 +401,39 @@ void Peer::data() { return; } - int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), recv_buf_.buffer_capacity(), 0); + int cap = recv_buf_.buffer_capacity(); + int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), cap, 0); - if (rc <= 0) { + if (rc >= cap) { + LOG(WARNING) << "More than buffers worth of data received"; + } + if (cap < (kMaxMessage / 10)) LOG(WARNING) << "NO BUFFER"; + + if (rc == 0) { + close(); + return; + } else if (rc < 0) { + socketError(); return; } recv_buf_.buffer_consumed(rc); + + // No thread currently processing messages so start one + if (is_waiting_) { + pool.push([](int id, Peer *p) { + p->_data(); + //p->is_waiting_ = true; + }, this); + is_waiting_ = false; + } lk.unlock(); - pool.push([](int id, Peer *p) { - p->_data(); - //p->is_waiting_ = true; - }, this); + //LOG(INFO) << "Received " << rc << " bytes"; } bool Peer::_data() { - std::unique_lock<std::recursive_mutex> lk(recv_mtx_); - - /*recv_buf_.reserve_buffer(kMaxMessage); - int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0); - - if (rc <= 0) { - return false; - } - - recv_buf_.buffer_consumed(rc);*/ + UNIQUE_LOCK(recv_mtx_,lk); if (scheme_ == ftl::URI::SCHEME_WS && !ws_read_header_) { wsheader_type ws; @@ -457,7 +464,14 @@ bool Peer::_data() { return false; } } + + // CHECK Safe to unlock here? + is_waiting_ = true; + lk.unlock(); disp_->dispatch(*this, obj); + // Relock before next loop of while + lk.lock(); + is_waiting_ = false; if (scheme_ == ftl::URI::SCHEME_WS && recv_buf_.nonparsed_size() > 0) { wsheader_type ws; @@ -467,12 +481,13 @@ bool Peer::_data() { ws_read_header_ = true; } } + is_waiting_ = true; // Can start another thread... return false; } void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) { // TODO Handle error reporting... - unique_lock<recursive_mutex> lk(cb_mtx_); + UNIQUE_LOCK(cb_mtx_,lk); if (callbacks_.count(id) > 0) { DLOG(1) << "Received return RPC value"; @@ -489,7 +504,7 @@ void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) { } void Peer::cancelCall(int id) { - unique_lock<recursive_mutex> lk(cb_mtx_); + UNIQUE_LOCK(cb_mtx_,lk); if (callbacks_.count(id) > 0) { callbacks_.erase(id); } @@ -497,7 +512,7 @@ void Peer::cancelCall(int id) { void Peer::_sendResponse(uint32_t id, const msgpack::object &res) { Dispatcher::response_t res_obj = std::make_tuple(1,id,std::string(""),res); - std::unique_lock<std::recursive_mutex> lk(send_mtx_); + UNIQUE_LOCK(send_mtx_,lk); if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0); msgpack::pack(send_buf_, res_obj); _send(); @@ -507,7 +522,7 @@ bool Peer::waitConnection() { if (status_ == kConnected) return true; std::mutex m; - std::unique_lock<std::mutex> lk(m); + UNIQUE_LOCK(m,lk); std::condition_variable cv; callback_t h = universe_->onConnect([this,&cv](Peer *p) { @@ -567,6 +582,7 @@ int Peer::_send() { #ifdef WIN32 // TODO(nick) Use WSASend instead as equivalent to writev + auto send_vec = send_buf_.vector(); auto send_size = send_buf_.vector_size(); int c = 0; @@ -576,6 +592,7 @@ int Peer::_send() { #else int c = ftl::net::internal::writev(sock_, send_buf_.vector(), (int)send_buf_.vector_size()); #endif + send_buf_.clear(); // We are blocking, so -1 should mean actual error @@ -588,8 +605,8 @@ int Peer::_send() { } Peer::~Peer() { - std::unique_lock<std::recursive_mutex> lk1(send_mtx_); - std::unique_lock<std::recursive_mutex> lk2(recv_mtx_); + UNIQUE_LOCK(send_mtx_,lk1); + UNIQUE_LOCK(recv_mtx_,lk2); _badClose(false); LOG(INFO) << "Deleting peer object"; diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index af510a71038b486325a79b7744471dcbb068c817..76b3e546ab3ff40027ec635167a9bbdd1f93c9fe 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -84,7 +84,7 @@ void Universe::shutdown() { bool Universe::listen(const string &addr) { auto l = new Listener(addr.c_str()); if (!l) return false; - unique_lock<shared_mutex> lk(net_mutex_); + UNIQUE_LOCK(net_mutex_,lk); listeners_.push_back(l); return l->isListening(); } @@ -94,7 +94,7 @@ Peer *Universe::connect(const string &addr) { if (!p) return nullptr; if (p->status() != Peer::kInvalid) { - unique_lock<shared_mutex> lk(net_mutex_); + UNIQUE_LOCK(net_mutex_,lk); peers_.push_back(p); } @@ -103,7 +103,7 @@ Peer *Universe::connect(const string &addr) { } void Universe::unbind(const std::string &name) { - unique_lock<shared_mutex> lk(net_mutex_); + UNIQUE_LOCK(net_mutex_,lk); disp_.unbind(name); } @@ -122,7 +122,8 @@ int Universe::_setDescriptors() { SOCKET n = 0; - unique_lock<shared_mutex> lk(net_mutex_); + // TODO Shared lock for some of the time... + UNIQUE_LOCK(net_mutex_,lk); //Set file descriptor for the listening sockets. for (auto l : listeners_) { @@ -141,9 +142,9 @@ int Universe::_setDescriptors() { n = s->_socket(); } - if (s->isWaiting()) { + //if (s->isWaiting()) { FD_SET(s->_socket(), &sfdread_); - } + //} FD_SET(s->_socket(), &sfderror_); } } @@ -249,7 +250,7 @@ void Universe::_periodic() { auto i = reconnects_.begin(); while (i != reconnects_.end()) { if ((*i).peer->reconnect()) { - unique_lock<shared_mutex> lk(net_mutex_); + UNIQUE_LOCK(net_mutex_,lk); peers_.push_back((*i).peer); i = reconnects_.erase(i); } else if ((*i).tries > 0) { @@ -304,7 +305,7 @@ void Universe::_run() { //Wait for a network event or timeout in 3 seconds block.tv_sec = 0; - block.tv_usec = 10000; + block.tv_usec = 100000; selres = select(n+1, &sfdread_, 0, &sfderror_, &block); // NOTE Nick: Is it possible that not all the recvs have been called before I @@ -323,70 +324,83 @@ void Universe::_run() { continue; } - unique_lock<shared_mutex> lk(net_mutex_); - - //If connection request is waiting - for (auto l : listeners_) { - if (l && l->isListening()) { - if (FD_ISSET(l->_socket(), &sfdread_)) { - int rsize = sizeof(sockaddr_storage); - sockaddr_storage addr; - - //Finally accept this client connection. - SOCKET csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize); - - if (csock != INVALID_SOCKET) { - auto p = new Peer(csock, this, &disp_); - peers_.push_back(p); - _installBindings(p); + // CHECK Could this mutex be the problem!? + { + UNIQUE_LOCK(net_mutex_,lk); + + //If connection request is waiting + for (auto l : listeners_) { + if (l && l->isListening()) { + if (FD_ISSET(l->_socket(), &sfdread_)) { + int rsize = sizeof(sockaddr_storage); + sockaddr_storage addr; + + //Finally accept this client connection. + SOCKET csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize); + + if (csock != INVALID_SOCKET) { + auto p = new Peer(csock, this, &disp_); + peers_.push_back(p); + _installBindings(p); + } } } } } // TODO(Nick) Might switch to shared lock here? - - //Also check each clients socket to see if any messages or errors are waiting - for (auto s : peers_) { - if (s != NULL && s->isValid()) { - //If message received from this client then deal with it - if (FD_ISSET(s->_socket(), &sfdread_)) { - s->data(); - } - if (FD_ISSET(s->_socket(), &sfderror_)) { - s->socketError(); - s->close(); + { + SHARED_LOCK(net_mutex_, lk); + + // Also check each clients socket to see if any messages or errors are waiting + for (auto s : peers_) { + if (s != NULL && s->isValid()) { + // Note: It is possible that the socket becomes invalid after check but before + // looking at the FD sets, therefore cache the original socket + SOCKET sock = s->_socket(); + if (sock == INVALID_SOCKET) continue; + + if (FD_ISSET(sock, &sfderror_)) { + s->socketError(); + s->close(); + continue; // No point in reading data... + } + //If message received from this client then deal with it + if (FD_ISSET(sock, &sfdread_)) { + s->data(); + } } } } + // TODO(Nick) Don't always need to call this - _cleanupPeers(); + //_cleanupPeers(); } } callback_t Universe::onConnect(const std::function<void(ftl::net::Peer*)> &cb) { - unique_lock<shared_mutex> lk(handler_mutex_); + UNIQUE_LOCK(handler_mutex_,lk); callback_t id = cbid__++; on_connect_.push_back({id, cb}); return id; } callback_t Universe::onDisconnect(const std::function<void(ftl::net::Peer*)> &cb) { - unique_lock<shared_mutex> lk(handler_mutex_); + UNIQUE_LOCK(handler_mutex_,lk); callback_t id = cbid__++; on_disconnect_.push_back({id, cb}); return id; } callback_t Universe::onError(const std::function<void(ftl::net::Peer*, const ftl::net::Error &)> &cb) { - unique_lock<shared_mutex> lk(handler_mutex_); + UNIQUE_LOCK(handler_mutex_,lk); callback_t id = cbid__++; on_error_.push_back({id, cb}); return id; } void Universe::removeCallback(callback_t cbid) { - unique_lock<shared_mutex> lk(handler_mutex_); + UNIQUE_LOCK(handler_mutex_,lk); { auto i = on_connect_.begin(); while (i != on_connect_.end()) { @@ -422,7 +436,7 @@ void Universe::removeCallback(callback_t cbid) { } void Universe::_notifyConnect(Peer *p) { - shared_lock<shared_mutex> lk(handler_mutex_); + SHARED_LOCK(handler_mutex_,lk); peer_ids_[p->id()] = p; for (auto &i : on_connect_) { @@ -437,7 +451,7 @@ void Universe::_notifyConnect(Peer *p) { void Universe::_notifyDisconnect(Peer *p) { // In all cases, should already be locked outside this function call //unique_lock<mutex> lk(net_mutex_); - shared_lock<shared_mutex> lk(handler_mutex_); + SHARED_LOCK(handler_mutex_,lk); for (auto &i : on_disconnect_) { try { i.h(p); diff --git a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp index 02025e22209a029e0bf6827214260369ab693929..208fef758e1c268d42c7f8a49bc0eb43c8b35b85 100644 --- a/components/rgbd-sources/include/ftl/rgbd/streamer.hpp +++ b/components/rgbd-sources/include/ftl/rgbd/streamer.hpp @@ -8,9 +8,10 @@ #include <ftl/rgbd/source.hpp> #include <ftl/net/universe.hpp> #include <string> -#include <list> +#include <vector> #include <map> #include <shared_mutex> +#include <atomic> namespace ftl { namespace rgbd { @@ -25,14 +26,17 @@ struct StreamClient { }; static const unsigned int kGrabbed = 0x1; -static const unsigned int kTransmitted = 0x2; +static const unsigned int kRGB = 0x2; +static const unsigned int kDepth = 0x4; struct StreamSource { ftl::rgbd::Source *src; - unsigned int state; // Busy or ready to swap? + std::atomic<unsigned int> state; // Busy or ready to swap? cv::Mat rgb; // Tx buffer cv::Mat depth; // Tx buffer - std::list<detail::StreamClient> clients[10]; // One list per bitrate + std::vector<unsigned char> rgb_buf; + std::vector<unsigned char> d_buf; + std::vector<detail::StreamClient> clients[10]; // One list per bitrate std::shared_mutex mutex; }; @@ -85,6 +89,8 @@ class Streamer : public ftl::Configurable { */ void stop(); + void wait(); + /** * Alternative to calling run(), it will operate a single frame capture, * compress and stream cycle. @@ -100,9 +106,12 @@ class Streamer : public ftl::Configurable { bool active_; ftl::net::Universe *net_; bool late_; + std::mutex job_mtx_; + std::condition_variable job_cv_; + std::atomic<int> jobs_; void _schedule(); - void _swap(detail::StreamSource &); + void _swap(detail::StreamSource *); void _addClient(const std::string &source, int N, int rate, const ftl::UUID &peer, const std::string &dest); }; diff --git a/components/rgbd-sources/src/net.cpp b/components/rgbd-sources/src/net.cpp index 7c56e5636753b2161b912dc19ac6f0eb9455b9b4..ef3fdd5d270288cd6c73422d0651cb40bced7b82 100644 --- a/components/rgbd-sources/src/net.cpp +++ b/components/rgbd-sources/src/net.cpp @@ -62,14 +62,18 @@ NetSource::~NetSource() { } void NetSource::_recv(const vector<unsigned char> &jpg, const vector<unsigned char> &d) { - unique_lock<shared_mutex> lk(host_->mutex()); + cv::Mat tmp_rgb, tmp_depth; - cv::imdecode(jpg, cv::IMREAD_COLOR, &rgb_); - //Mat(rgb_.size(), CV_16UC1); - cv::imdecode(d, cv::IMREAD_UNCHANGED, &depth_); - depth_.convertTo(depth_, CV_32FC1, 1.0f/(16.0f*100.0f)); + // Decode in temporary buffers to prevent long locks + cv::imdecode(jpg, cv::IMREAD_COLOR, &tmp_rgb); + cv::imdecode(d, cv::IMREAD_UNCHANGED, &tmp_depth); + // Lock host to prevent grab + UNIQUE_LOCK(host_->mutex(),lk); + rgb_ = tmp_rgb; + tmp_depth.convertTo(depth_, CV_32FC1, 1.0f/(16.0f*100.0f)); N_--; + //lk.unlock(); } void NetSource::setPose(const Eigen::Matrix4f &pose) { @@ -128,8 +132,9 @@ void NetSource::_updateURI() { } bool NetSource::grab() { - if (N_ == 0) { - N_ += 10; + // Send one frame before end to prevent unwanted pause + if (N_ <= 2) { + N_ = 10; if (!host_->getNet()->send(peer_, "get_stream", *host_->get<string>("uri"), 10, 0, host_->getNet()->id(), *host_->get<string>("uri"))) { active_ = false; } diff --git a/components/rgbd-sources/src/realsense_source.cpp b/components/rgbd-sources/src/realsense_source.cpp index db97d93ebf61b76829685fc4238fa67fdce4af79..0766adb60669ae126cd89ee0faac66a2333e29cf 100644 --- a/components/rgbd-sources/src/realsense_source.cpp +++ b/components/rgbd-sources/src/realsense_source.cpp @@ -39,20 +39,16 @@ RealsenseSource::~RealsenseSource() { bool RealsenseSource::grab() { rs2::frameset frames = pipe_.wait_for_frames(); //rs2::align align(RS2_STREAM_DEPTH); - frames = align_to_depth_.process(frames); //align_to_depth_.process(frames); + //frames = align_to_depth_.process(frames); //align_to_depth_.process(frames); rs2::depth_frame depth = frames.get_depth_frame(); float w = depth.get_width(); float h = depth.get_height(); - rs2::frame colour = frames.first(RS2_STREAM_COLOR); //.get_color_frame(); + rscolour_ = frames.first(RS2_STREAM_COLOR); //.get_color_frame(); - //LOG(INFO) << " RS Frame size = " << w << "x" << h; - - //std::unique_lock<std::mutex> lk(mutex_); cv::Mat tmp(cv::Size((int)w, (int)h), CV_16UC1, (void*)depth.get_data(), depth.get_stride_in_bytes()); tmp.convertTo(depth_, CV_32FC1, scale_); - rgb_ = cv::Mat(cv::Size(w, h), CV_8UC4, (void*)colour.get_data(), cv::Mat::AUTO_STEP); - //LOG(INFO) << "RS FRAME GRABBED: " << rgb_.cols << "x" << rgb_.rows; + rgb_ = cv::Mat(cv::Size(w, h), CV_8UC4, (void*)rscolour_.get_data(), cv::Mat::AUTO_STEP); return true; } diff --git a/components/rgbd-sources/src/realsense_source.hpp b/components/rgbd-sources/src/realsense_source.hpp index d2ca206a2756d6c90a2491728e1b0b9b1bfab662..2a48ac2dbe06e7bcefdb70112bb6f5c0e4d0461a 100644 --- a/components/rgbd-sources/src/realsense_source.hpp +++ b/components/rgbd-sources/src/realsense_source.hpp @@ -25,6 +25,7 @@ class RealsenseSource : public ftl::rgbd::detail::Source { float scale_; rs2::pipeline pipe_; rs2::align align_to_depth_; + rs2::frame rscolour_; }; } diff --git a/components/rgbd-sources/src/source.cpp b/components/rgbd-sources/src/source.cpp index be7d56057b323db1f636c0f593111accc5824659..bc84322c3d95c320e429accb39227dcce70c9ffd 100644 --- a/components/rgbd-sources/src/source.cpp +++ b/components/rgbd-sources/src/source.cpp @@ -1,5 +1,6 @@ #include <loguru.hpp> #include <ftl/rgbd/source.hpp> +#include <ftl/threads.hpp> #include "net.hpp" #include "stereovideo.hpp" @@ -139,9 +140,11 @@ ftl::rgbd::detail::Source *Source::_createDeviceImpl(const ftl::URI &uri) { } void Source::getFrames(cv::Mat &rgb, cv::Mat &depth) { - shared_lock<shared_mutex> lk(mutex_); - rgb_.copyTo(rgb); - depth_.copyTo(depth); + SHARED_LOCK(mutex_,lk); + //rgb_.copyTo(rgb); + //depth_.copyTo(depth); + rgb = rgb_; + depth = depth_; } Eigen::Vector4f Source::point(uint ux, uint uy) { @@ -149,7 +152,7 @@ Eigen::Vector4f Source::point(uint ux, uint uy) { const float x = ((float)ux-(float)params.cx) / (float)params.fx; const float y = ((float)uy-(float)params.cy) / (float)params.fy; - shared_lock<shared_mutex> lk(mutex_); + SHARED_LOCK(mutex_,lk); const float depth = depth_.at<float>(uy,ux); return Eigen::Vector4f(x*depth,y*depth,depth,1.0); } @@ -168,13 +171,13 @@ bool Source::hasCapability(capability_t) { } void Source::reset() { - unique_lock<shared_mutex> lk(mutex_); + UNIQUE_LOCK(mutex_,lk); if (impl_) delete impl_; impl_ = _createImplementation(); } bool Source::grab() { - unique_lock<shared_mutex> lk(mutex_); + UNIQUE_LOCK(mutex_,lk); if (impl_ && impl_->grab()) { impl_->rgb_.copyTo(rgb_); impl_->depth_.copyTo(depth_); diff --git a/components/rgbd-sources/src/streamer.cpp b/components/rgbd-sources/src/streamer.cpp index 00a370fc6c8222514fc0ad6e0421c21692c82376..9b90cd4b8e7235aee0b2211dcda343b1b1d35e2e 100644 --- a/components/rgbd-sources/src/streamer.cpp +++ b/components/rgbd-sources/src/streamer.cpp @@ -24,13 +24,13 @@ using std::chrono::milliseconds; #define THREAD_POOL_SIZE 6 Streamer::Streamer(nlohmann::json &config, Universe *net) - : ftl::Configurable(config), pool_(THREAD_POOL_SIZE), late_(false) { + : ftl::Configurable(config), pool_(THREAD_POOL_SIZE), late_(false), jobs_(0) { active_ = false; net_ = net; net->bind("find_stream", [this](const std::string &uri) -> optional<UUID> { - shared_lock<shared_mutex> slk(mutex_); + SHARED_LOCK(mutex_,slk); if (sources_.find(uri) != sources_.end()) { LOG(INFO) << "Valid source request received: " << uri; @@ -47,7 +47,7 @@ Streamer::Streamer(nlohmann::json &config, Universe *net) }); net->bind("set_pose", [this](const std::string &uri, const std::vector<unsigned char> &buf) { - shared_lock<shared_mutex> slk(mutex_); + SHARED_LOCK(mutex_,slk); if (sources_.find(uri) != sources_.end()) { Eigen::Matrix4f pose; @@ -59,7 +59,7 @@ Streamer::Streamer(nlohmann::json &config, Universe *net) // Allow remote users to access camera calibration matrix net->bind("source_calibration", [this](const std::string &uri) -> vector<unsigned char> { vector<unsigned char> buf; - shared_lock<shared_mutex> slk(mutex_); + SHARED_LOCK(mutex_,slk); if (sources_.find(uri) != sources_.end()) { buf.resize(sizeof(Camera)); @@ -93,26 +93,48 @@ Streamer::~Streamer() { } void Streamer::add(Source *src) { - unique_lock<shared_mutex> ulk(mutex_); - if (sources_.find(src->getID()) != sources_.end()) return; + StreamSource *s = nullptr; - StreamSource *s = new StreamSource; - s->src = src; - s->state = 0; - sources_[src->getID()] = s; + { + UNIQUE_LOCK(mutex_,ulk); + if (sources_.find(src->getID()) != sources_.end()) return; + + StreamSource *s = new StreamSource; + s->src = src; + s->state = 0; + sources_[src->getID()] = s; + } LOG(INFO) << "Streaming: " << src->getID(); net_->broadcast("add_stream", src->getID()); } void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID &peer, const string &dest) { - unique_lock<shared_mutex> slk(mutex_); - if (sources_.find(source) == sources_.end()) return; + StreamSource *s = nullptr; + + //{ + UNIQUE_LOCK(mutex_,slk); + if (sources_.find(source) == sources_.end()) return; + + if (rate < 0 || rate >= 10) return; + if (N < 0 || N > ftl::rgbd::kMaxFrames) return; - if (rate < 0 || rate >= 10) return; - if (N < 0 || N > ftl::rgbd::kMaxFrames) return; + DLOG(INFO) << "Adding Stream Peer: " << peer.to_string(); - LOG(INFO) << "Adding Stream Peer: " << peer.to_string(); + s = sources_[source]; + //} + + if (!s) return; + + UNIQUE_LOCK(s->mutex, lk2); + for (int i=0; i<s->clients[rate].size(); i++) { + if (s->clients[rate][i].peerid == peer) { + StreamClient &c = s->clients[rate][i]; + c.txmax = N; + c.txcount = 0; + return; + } + } StreamClient c; c.peerid = peer; @@ -120,8 +142,6 @@ void Streamer::_addClient(const string &source, int N, int rate, const ftl::UUID c.txcount = 0; c.txmax = N; - StreamSource *s = sources_[source]; - //unique_lock<shared_mutex> ulk(s->mutex); s->clients[rate].push_back(c); } @@ -150,6 +170,7 @@ void Streamer::poll() { if (elapsed.count() >= wait) { LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count()); } else { + //LOG(INFO) << "Frame rate @ " << (1.0f / elapsed.count()); // Otherwise, wait until next frame should start. // CHECK(Nick) Is this accurate enough? Almost certainly not // TODO(Nick) Synchronise by time corrections and use of fixed time points @@ -176,20 +197,55 @@ void Streamer::run(bool block) { } // Must be called in source locked state or src.state must be atomic -void Streamer::_swap(StreamSource &src) { - if (src.state == (ftl::rgbd::detail::kGrabbed | ftl::rgbd::detail::kTransmitted)) { - src.src->getFrames(src.rgb, src.depth); - src.state = 0; +void Streamer::_swap(StreamSource *src) { + if (src->state == (ftl::rgbd::detail::kGrabbed | ftl::rgbd::detail::kRGB | ftl::rgbd::detail::kDepth)) { + UNIQUE_LOCK(src->mutex,lk); + + if (src->rgb_buf.size() > 0 && src->d_buf.size() > 0) { + auto i = src->clients[0].begin(); + while (i != src->clients[0].end()) { + try { + // TODO(Nick) Send pose and timestamp + if (!net_->send((*i).peerid, (*i).uri, src->rgb_buf, src->d_buf)) { + (*i).txcount = (*i).txmax; + } + } catch(...) { + (*i).txcount = (*i).txmax; + } + (*i).txcount++; + if ((*i).txcount >= (*i).txmax) { + LOG(INFO) << "Remove client: " << (*i).uri; + i = src->clients[0].erase(i); + } else { + i++; + } + } + } + src->src->getFrames(src->rgb, src->depth); + src->state = 0; } } +void Streamer::wait() { + // Do some jobs in this thread, might as well... + std::function<void(int)> j; + while ((bool)(j=pool_.pop())) { + j(-1); + } + + // Wait for all jobs to complete before finishing frame + UNIQUE_LOCK(job_mtx_, lk); + job_cv_.wait(lk, [this]{ return jobs_ == 0; }); +} + void Streamer::_schedule() { - std::mutex job_mtx; - std::condition_variable job_cv; - int jobs = 0; + wait(); + //std::mutex job_mtx; + //std::condition_variable job_cv; + //int jobs = 0; // Prevent new clients during processing. - shared_lock<shared_mutex> slk(mutex_); + SHARED_LOCK(mutex_,slk); for (auto s : sources_) { string uri = s.first; @@ -200,27 +256,62 @@ void Streamer::_schedule() { } // There will be two jobs for this source... - unique_lock<mutex> lk(job_mtx); - jobs += 2; - lk.unlock(); + //UNIQUE_LOCK(job_mtx_,lk); + jobs_ += 3; + //lk.unlock(); - // Grab job - pool_.push([this,uri,&jobs,&job_mtx,&job_cv](int id) { - StreamSource *src = sources_[uri]; + StreamSource *src = sources_[uri]; + if (src == nullptr || src->state != 0) continue; + // Grab job + pool_.push([this,src](int id) { + //StreamSource *src = sources_[uri]; src->src->grab(); // CHECK (Nick) Can state be an atomic instead? - unique_lock<shared_mutex> lk(src->mutex); + //UNIQUE_LOCK(src->mutex, lk); src->state |= ftl::rgbd::detail::kGrabbed; - _swap(*src); - lk.unlock(); + _swap(src); // Mark job as finished - unique_lock<mutex> ulk(job_mtx); - jobs--; - ulk.unlock(); - job_cv.notify_one(); + --jobs_; + job_cv_.notify_one(); + }); + + // Compress colour job + pool_.push([this,src](int id) { + if (!src->rgb.empty()) { + auto start = std::chrono::high_resolution_clock::now(); + + //vector<unsigned char> src->rgb_buf; + cv::imencode(".jpg", src->rgb, src->rgb_buf); + } + + src->state |= ftl::rgbd::detail::kRGB; + _swap(src); + --jobs_; + job_cv_.notify_one(); + }); + + // Compress depth job + pool_.push([this,src](int id) { + if (!src->depth.empty()) { + cv::Mat d2; + src->depth.convertTo(d2, CV_16UC1, 16*100); + //vector<unsigned char> d_buf; + + // Setting 1 = fast but large + // Setting 9 = small but slow + // Anything up to 8 causes minimal if any impact on frame rate + // on my (Nicks) laptop, but 9 halves the frame rate. + vector<int> pngparams = {cv::IMWRITE_PNG_COMPRESSION, 1}; // Default is 1 for fast, 9 = small but slow. + cv::imencode(".png", d2, src->d_buf, pngparams); + } + + src->state |= ftl::rgbd::detail::kDepth; + _swap(src); + --jobs_; + job_cv_.notify_one(); }); // Transmit job @@ -228,13 +319,19 @@ void Streamer::_schedule() { // meaning that no lock is required here since outer shared_lock // prevents addition of new clients. // TODO, could do one for each bitrate... - pool_.push([this,uri,&jobs,&job_mtx,&job_cv](int id) { - StreamSource *src = sources_[uri]; + /* pool_.push([this,src](int id) { + //StreamSource *src = sources_[uri]; try { if (src && src->rgb.rows > 0 && src->depth.rows > 0 && src->clients[0].size() > 0) { + auto start = std::chrono::high_resolution_clock::now(); + vector<unsigned char> rgb_buf; cv::imencode(".jpg", src->rgb, rgb_buf); + + std::chrono::duration<double> elapsed = + std::chrono::high_resolution_clock::now() - start; + LOG(INFO) << "JPG in " << elapsed.count() << "s"; cv::Mat d2; src->depth.convertTo(d2, CV_16UC1, 16*100); @@ -244,60 +341,37 @@ void Streamer::_schedule() { // Setting 9 = small but slow // Anything up to 8 causes minimal if any impact on frame rate // on my (Nicks) laptop, but 9 halves the frame rate. - vector<int> pngparams = {cv::IMWRITE_PNG_COMPRESSION, 5}; // Default is 1 for fast, 9 = small but slow. + vector<int> pngparams = {cv::IMWRITE_PNG_COMPRESSION, 1}; // Default is 1 for fast, 9 = small but slow. cv::imencode(".png", d2, d_buf, pngparams); //LOG(INFO) << "Data size: " << ((rgb_buf.size() + d_buf.size()) / 1024) << "kb"; - auto i = src->clients[0].begin(); - while (i != src->clients[0].end()) { - try { - // TODO(Nick) Send pose and timestamp - if (!net_->send((*i).peerid, (*i).uri, rgb_buf, d_buf)) { - (*i).txcount = (*i).txmax; - } - } catch(...) { - (*i).txcount = (*i).txmax; - } - (*i).txcount++; - if ((*i).txcount >= (*i).txmax) { - LOG(INFO) << "Remove client: " << (*i).uri; - i = src->clients[0].erase(i); - } else { - i++; - } - } + } } catch(...) { LOG(ERROR) << "Error in transmission loop"; } - /*std::chrono::duration<double> elapsed = - std::chrono::high_resolution_clock::now() - start; - LOG(INFO) << "Stream Elapsed: " << elapsed.count();*/ - // CHECK (Nick) Could state be an atomic? - unique_lock<shared_mutex> lk(src->mutex); + //UNIQUE_LOCK(src->mutex,lk); //LOG(INFO) << "Tx Frame: " << uri; src->state |= ftl::rgbd::detail::kTransmitted; _swap(*src); - lk.unlock(); + //lk.unlock(); // Mark job as finished - unique_lock<mutex> ulk(job_mtx); - jobs--; - ulk.unlock(); - job_cv.notify_one(); - }); - } + //UNIQUE_LOCK(job_mtx_,ulk); + //jobs_--; + //ulk.unlock(); - // Wait for all jobs to complete before finishing frame - unique_lock<mutex> lk(job_mtx); - job_cv.wait(lk, [&jobs]{ return jobs == 0; }); + --jobs_; + job_cv_.notify_one(); + });*/ + } } Source *Streamer::get(const std::string &uri) { - shared_lock<shared_mutex> slk(mutex_); + SHARED_LOCK(mutex_,slk); if (sources_.find(uri) != sources_.end()) return sources_[uri]->src; else return nullptr; }