diff --git a/applications/vision/src/main.cpp b/applications/vision/src/main.cpp index bb77b0810c810cded5c1a74eec432b1ee47fa8f2..64c2cfbb7eb675a73944b7ddd7e2a553842a6021 100644 --- a/applications/vision/src/main.cpp +++ b/applications/vision/src/main.cpp @@ -96,16 +96,16 @@ static void run(ftl::Configurable *root) { stream->add(source); stream->run(); - while (display->active()) { + while (ftl::running && display->active()) { cv::Mat rgb, depth; source->getRGBD(rgb, depth); if (!rgb.empty()) display->render(rgb, depth, source->getParameters()); display->wait(10); } + LOG(INFO) << "Stopping..."; stream->stop(); - LOG(INFO) << "Finished."; delete stream; delete display; delete source; @@ -125,5 +125,7 @@ int main(int argc, char **argv) { //} else { // ftl::middlebury::test(config); //} + + return ftl::exit_code; } diff --git a/components/common/cpp/include/ftl/configuration.hpp b/components/common/cpp/include/ftl/configuration.hpp index 6a91c9b05781a6442844a53b73828027b1fcf9e0..531852ca5fbdc7f7be0eceda9eb32e341842dd7c 100644 --- a/components/common/cpp/include/ftl/configuration.hpp +++ b/components/common/cpp/include/ftl/configuration.hpp @@ -12,6 +12,9 @@ namespace ftl { +extern bool running; +extern int exit_code; + class Configurable; bool is_directory(const std::string &path); diff --git a/components/common/cpp/src/configuration.cpp b/components/common/cpp/src/configuration.cpp index c4369bd0057488455e43770aa7772ff350b6f395..11385dee3dd644bc06de0c5b771e4f5fd60c7063 100644 --- a/components/common/cpp/src/configuration.cpp +++ b/components/common/cpp/src/configuration.cpp @@ -47,6 +47,9 @@ using ftl::config::config; static Configurable *rootCFG = nullptr; +bool ftl::running = true; +int ftl::exit_code = 0; + bool ftl::is_directory(const std::string &path) { #ifdef WIN32 DWORD attrib = GetFileAttributesA(path.c_str()); @@ -413,7 +416,7 @@ static void signalIntHandler( int signum ) { // cleanup and close up stuff here // terminate program - exit(0); + ftl::running = false; } Configurable *ftl::config::configure(int argc, char **argv, const std::string &root) { diff --git a/components/control/cpp/src/slave.cpp b/components/control/cpp/src/slave.cpp index 8d5cefb9f33bdf910948d9a058730a2848f40b0f..75af907a82eadacd17f9c3e76702f43b8a681bfb 100644 --- a/components/control/cpp/src/slave.cpp +++ b/components/control/cpp/src/slave.cpp @@ -15,12 +15,15 @@ static void netLog(void* user_data, const loguru::Message& message) { Slave::Slave(Universe *net, ftl::Configurable *root) : net_(net) { net->bind("restart", []() { LOG(WARNING) << "Remote restart..."; - exit(1); + //exit(1); + ftl::exit_code = 1; + ftl::running = false; }); net->bind("shutdown", []() { LOG(WARNING) << "Remote shutdown..."; - exit(0); + //exit(0); + ftl::running = false; }); net->bind("update_cfg", [](const std::string &uri, const std::string &value) { diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp index 6d28d3b84d1c6a9c3861c09239c9e97ccf4dc995..cffb6362d291fc333ad341c63b40a86739042de8 100644 --- a/components/net/cpp/include/ftl/net/peer.hpp +++ b/components/net/cpp/include/ftl/net/peer.hpp @@ -74,7 +74,10 @@ class Peer { /** * Close the peer if open. Setting retry parameter to true will initiate - * backoff retry attempts. + * backoff retry attempts. This is used to deliberately close a connection + * and not for error conditions where different close semantics apply. + * + * @param retry Should reconnection be attempted? */ void close(bool retry=false); @@ -83,10 +86,19 @@ class Peer { }; /** - * Block until the connection and handshake has completed. + * Block until the connection and handshake has completed. You should use + * onConnect callbacks instead of blocking, mostly this is intended for + * the unit tests to keep them synchronous. + * + * @return True if all connections were successful, false if timeout or error. */ bool waitConnection(); + /** + * Test if the connection is valid. This returns true in all conditions + * except where the socket has been disconnected permenantly or was never + * able to connect, perhaps due to an invalid address. + */ bool isValid() const { return status_ != kInvalid && sock_ != INVALID_SOCKET; }; @@ -116,6 +128,8 @@ class Peer { /** * Non-blocking Remote Procedure Call using a callback function. + * + * @return A call id for use with cancelCall() if needed. */ template <typename T, typename... ARGS> int asyncCall(const std::string &name, @@ -159,6 +173,8 @@ class Peer { void error(int e); bool _data(); + + void _badClose(bool retry=true); void _dispatchResponse(uint32_t id, msgpack::object &obj); void _sendResponse(uint32_t id, const msgpack::object &obj); diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index 407458c11cf1ecfa116a77b0568ee12cbe41276c..2d4bdc71f6ad3a35e4555009e7aac88a4d79cffa 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -20,6 +20,10 @@ namespace ftl { namespace net { +struct Error { + int errno; +}; + /** * Represents a group of network peers and their resources, managing the * searching of and sharing of resources across peers. Each universe can @@ -142,6 +146,14 @@ class Universe : public ftl::Configurable { void setLocalID(const ftl::UUID &u) { this_peer = u; }; const ftl::UUID &id() const { return this_peer; } + + // --- Event Handlers ------------------------------------------------------ + + void onConnect(const std::string &, std::function<void(ftl::net::Peer*)>); + void onDisconnect(const std::string &, std::function<void(ftl::net::Peer*)>); + void onError(const std::string &, std::function<void(ftl::net::Peer*, const ftl::net::Error &)>); + + void removeCallbacks(const std::string &); private: void _run(); @@ -149,7 +161,10 @@ class Universe : public ftl::Configurable { void _installBindings(); void _installBindings(Peer *); bool _subscribe(const std::string &res); - void _remove(Peer *); + void _cleanupPeers(); + void _notifyConnect(Peer *); + void _notifyDisconnect(Peer *); + void _notifyError(Peer *, const ftl::net::Error &); static void __start(Universe *u); @@ -167,6 +182,22 @@ class Universe : public ftl::Configurable { ftl::UUID id_; ftl::net::Dispatcher disp_; std::thread thread_; + + struct ConnHandler { + std::string name; + std::function<void(ftl::net::Peer*)> h; + }; + + struct ErrHandler { + std::string name; + std::function<void(ftl::net::Peer*, const ftl::net::Error &)> h; + }; + + // Handlers + std::list<ConnHandler> on_connect_; + std::list<ConnHandler> on_disconnect_; + std::list<ErrHandler> on_error_; + // std::map<std::string, std::vector<ftl::net::Peer*>> subscriptions_; }; diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index bb29691e5e8bc2d8203277e03829fff5b76ddeb9..ed732f1b9c8b9ca21b3a76887e4b36a816a8cc2c 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -151,7 +151,7 @@ Peer::Peer(SOCKET s, Dispatcher *d) : sock_(s) { bind("__handshake__", [this](uint64_t magic, uint32_t version, UUID pid) { LOG(INFO) << "Handshake 2 received"; if (magic != ftl::net::kMagic) { - close(); + _badClose(false); LOG(ERROR) << "Invalid magic during handshake"; } else { status_ = kConnected; @@ -163,6 +163,10 @@ Peer::Peer(SOCKET s, Dispatcher *d) : sock_(s) { } }); + bind("__disconnect__", [this]() { + _badClose(false); + }); + send("__handshake__", ftl::net::kMagic, ftl::net::kVersion, ftl::net::this_peer); } } @@ -186,7 +190,7 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { if (sock_ != INVALID_SOCKET) { if (!ws_connect(sock_, uri)) { LOG(ERROR) << "Websocket connection failed"; - close(); + _badClose(false); } } else { LOG(ERROR) << "Connection refused to " << uri.getHost() << ":" << uri.getPort(); @@ -204,7 +208,7 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { bind("__handshake__", [this](uint64_t magic, uint32_t version, UUID pid) { LOG(INFO) << "Handshake 1 received"; if (magic != ftl::net::kMagic) { - close(); + _badClose(false); LOG(ERROR) << "Invalid magic during handshake"; } else { status_ = kConnected; @@ -247,6 +251,16 @@ void Peer::_updateURI() { } void Peer::close(bool retry) { + if (sock_ != INVALID_SOCKET) { + + // Attempt to inform about disconnect + send("__disconnect__"); + + _badClose(retry); + } +} + +void Peer::_badClose(bool retry) { if (sock_ != INVALID_SOCKET) { #ifndef WIN32 ::close(sock_); @@ -257,6 +271,7 @@ void Peer::close(bool retry) { status_ = kDisconnected; // Attempt auto reconnect? + if (retry) LOG(INFO) << "Should attempt reconnect..."; //auto i = find(sockets.begin(),sockets.end(),this); //sockets.erase(i); @@ -330,12 +345,12 @@ bool Peer::_data() { obj.convert(hs); if (get<1>(hs) != "__handshake__") { - close(); + _badClose(false); LOG(ERROR) << "Missing handshake"; return false; } } catch(...) { - close(); + _badClose(false); LOG(ERROR) << "Bad first message format"; return false; } @@ -345,116 +360,6 @@ bool Peer::_data() { return false; } -/*bool Socket::data() { - //Read data from socket - size_t n = 0; - int c = 0; - uint32_t len = 0; - - if (pos_ < 4) { - n = 4 - pos_; - } else { - len = *(int*)buffer_; - n = len+4-pos_; - } - - while (pos_ < len+4) { - if (len > MAX_MESSAGE) { - close(); - LOG(ERROR) << "Socket: " << uri_ << " - message attack"; - return false; - } - - const int rc = ftl::net::internal::recv(sock_, buffer_+pos_, n, 0); - - if (rc > 0) { - pos_ += static_cast<size_t>(rc); - - if (pos_ < 4) { - n = 4 - pos_; - } else { - len = *(int*)buffer_; - n = len+4-pos_; - } - } else if (rc == EWOULDBLOCK || rc == 0) { - // Data not yet available - if (c == 0) { - LOG(INFO) << "Socket disconnected " << uri_; - close(); - } - return false; - } else { - LOG(ERROR) << "Socket: " << uri_ << " - error " << rc; - close(); - return false; - } - c++; - } - - // Route the message... - uint32_t service = ((uint32_t*)buffer_)[1]; - auto d = std::string(buffer_+8, len-4); - - pos_ = 0; // DODGY, processing messages inside handlers is dangerous. - gpos_ = 0; - - if (service == FTL_PROTOCOL_HS1 && !connected_) { - handshake1(); - } else if (service == FTL_PROTOCOL_HS2 && !connected_) { - handshake2(); - } else if (service == FTL_PROTOCOL_RPC) { - if (proto_) proto_->dispatchRPC(*this, d); - else LOG(WARNING) << "No protocol set for socket " << uri_; - } else if (service == FTL_PROTOCOL_RPCRETURN) { - _dispatchReturn(d); - } else { - if (proto_) proto_->dispatchRaw(service, *this); - else LOG(WARNING) << "No protocol set for socket " << uri_; - } - - return true; -}*/ - -/*int Socket::read(char *b, size_t count) { - if (count > size()) LOG(WARNING) << "Reading too much data for service " << header_->service; - count = (count > size() || count==0) ? size() : count; - // TODO, utilise recv directly here... - memcpy(b,data_+gpos_,count); - gpos_+=count; - return count; -} - -int Socket::read(std::string &s, size_t count) { - count = (count > size() || count==0) ? size() : count; - s = std::string(data_+gpos_,count); - return count; -} - -void Socket::handshake1() { - Handshake header; - read(header); - - std::string peer; - if (header.name_size > 0) read(peer,header.name_size); - - std::string protouri; - if (header.proto_size > 0) read(protouri,header.proto_size); - - if (protouri.size() > 0) { - remote_proto_ = protouri; - // TODO Validate protocols with local protocol? - } - - send(FTL_PROTOCOL_HS2); // TODO Counterpart protocol. - LOG(INFO) << "Handshake (" << protouri << ") confirmed from " << uri_; - _connected(); -} - -void Socket::handshake2() { - LOG(INFO) << "Handshake finalised for " << uri_; - _connected(); -}*/ - void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) { // TODO Handle error reporting... @@ -548,7 +453,7 @@ int Peer::_send() { // We are blocking, so -1 should mean actual error if (c == -1) { socketError(); - close(); + _badClose(); } return c; @@ -557,7 +462,7 @@ int Peer::_send() { Peer::~Peer() { std::unique_lock<std::mutex> lk1(send_mtx_); std::unique_lock<std::mutex> lk2(recv_mtx_); - close(); + _badClose(false); delete disp_; } diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index 28a6b00565ca2f0bffd47814a6417c4a33138f76..04404e8da0030f8e267adbe0bd485c1eb1da9aab 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -51,6 +51,8 @@ Universe::Universe(nlohmann::json &config) : } Universe::~Universe() { + LOG(INFO) << "Cleanup Network ..."; + active_ = false; thread_.join(); @@ -88,6 +90,7 @@ Peer *Universe::connect(const string &addr) { p->onConnect([this](Peer &p) { peer_ids_[p.id()] = &p; + _notifyConnect(&p); }); return p; @@ -131,16 +134,15 @@ int Universe::_setDescriptors() { FD_SET(s->_socket(), &sfdread_); } FD_SET(s->_socket(), &sfderror_); - } else if (s) { - _remove(s); } } + _cleanupPeers(); return n; } void Universe::_installBindings(Peer *p) { - + } void Universe::_installBindings() { @@ -159,17 +161,24 @@ void Universe::_installBindings() { } // Note: should be called inside a net lock -void Universe::_remove(Peer *p) { - LOG(INFO) << "Removing disconnected peer: " << p->id().to_string(); - for (auto i=peers_.begin(); i!=peers_.end(); i++) { - if ((*i) == p) { - peers_.erase(i); break; +void Universe::_cleanupPeers() { + + auto i = peers_.begin(); + while (i != peers_.end()) { + if (!(*i)->isValid()) { + Peer *p = *i; + LOG(INFO) << "Removing disconnected peer: " << p->id().to_string(); + _notifyDisconnect(p); + + auto ix = peer_ids_.find(p->id()); + if (ix != peer_ids_.end()) peer_ids_.erase(ix); + delete p; + + i = peers_.erase(i); + } else { + i++; } } - - auto ix = peer_ids_.find(p->id()); - if (ix != peer_ids_.end()) peer_ids_.erase(ix); - delete p; } Peer *Universe::getPeer(const UUID &id) const { @@ -244,6 +253,7 @@ void Universe::_run() { int n = _setDescriptors(); int selres = 1; + // It is an error to use "select" with no sockets ... so just sleep if (n == 0) { std::this_thread::sleep_for(std::chrono::milliseconds(300)); continue; @@ -285,6 +295,9 @@ void Universe::_run() { _installBindings(p); p->onConnect([this](Peer &p) { peer_ids_[p.id()] = &p; + // Note, called in another thread so above lock + // does not apply. + _notifyConnect(&p); }); } } @@ -302,11 +315,87 @@ void Universe::_run() { s->socketError(); s->close(); } - } else if (s != NULL) { - // Erase it - _remove(s); } } + // TODO(Nick) Don't always need to call this + _cleanupPeers(); } } +void Universe::onConnect(const std::string &name, std::function<void(ftl::net::Peer*)> cb) { + unique_lock<mutex> lk(net_mutex_); + on_connect_.push_back({name, cb}); +} + +void Universe::onDisconnect(const std::string &name, std::function<void(ftl::net::Peer*)> cb) { + unique_lock<mutex> lk(net_mutex_); + on_disconnect_.push_back({name, cb}); +} + +void Universe::onError(const std::string &name, std::function<void(ftl::net::Peer*, const ftl::net::Error &)> cb) { + unique_lock<mutex> lk(net_mutex_); + on_error_.push_back({name, cb}); +} + +void Universe::removeCallbacks(const std::string &name) { + unique_lock<mutex> lk(net_mutex_); + { + auto i = on_connect_.begin(); + while (i != on_connect_.end()) { + if ((*i).name == name) { + i = on_connect_.erase(i); + } else { + i++; + } + } + } + + { + auto i = on_disconnect_.begin(); + while (i != on_disconnect_.end()) { + if ((*i).name == name) { + i = on_disconnect_.erase(i); + } else { + i++; + } + } + } + + { + auto i = on_error_.begin(); + while (i != on_error_.end()) { + if ((*i).name == name) { + i = on_error_.erase(i); + } else { + i++; + } + } + } +} + +void Universe::_notifyConnect(Peer *p) { + unique_lock<mutex> lk(net_mutex_); + for (auto &i : on_connect_) { + try { + i.h(p); + } catch(...) { + LOG(ERROR) << "Exception inside OnConnect hander: " << i.name; + } + } +} + +void Universe::_notifyDisconnect(Peer *p) { + // In all cases, should already be locked outside this function call + //unique_lock<mutex> lk(net_mutex_); + for (auto &i : on_disconnect_) { + try { + i.h(p); + } catch(...) { + LOG(ERROR) << "Exception inside OnDisconnect hander: " << i.name; + } + } +} + +void Universe::_notifyError(Peer *p, const ftl::net::Error &e) { + // TODO(Nick) +} diff --git a/components/net/cpp/test/net_integration.cpp b/components/net/cpp/test/net_integration.cpp index 816edca923a854ed9ace53ef3a799e6fef7d5b65..58aaedd22e41f53cd1d109d2db3b972d705e332a 100644 --- a/components/net/cpp/test/net_integration.cpp +++ b/components/net/cpp/test/net_integration.cpp @@ -5,6 +5,7 @@ #include <chrono> using ftl::net::Universe; +using ftl::net::Peer; using std::this_thread::sleep_for; using std::chrono::milliseconds; @@ -76,6 +77,74 @@ TEST_CASE("Universe::connect()", "[net]") { //fin_server(); } +TEST_CASE("Universe::onConnect()", "[net]") { + Universe a; + Universe b; + + a.listen("tcp://localhost:7077"); + + SECTION("single valid remote init connection") { + bool done = false; + + a.onConnect("test", [&done](Peer *p) { + done = true; + }); + + b.connect("tcp://localhost:7077")->waitConnection(); + sleep_for(milliseconds(100)); + REQUIRE( done ); + } + + SECTION("single valid init connection") { + bool done = false; + + b.onConnect("test", [&done](Peer *p) { + done = true; + }); + + b.connect("tcp://localhost:7077")->waitConnection(); + sleep_for(milliseconds(100)); + REQUIRE( done ); + } +} + +TEST_CASE("Universe::onDisconnect()", "[net]") { + Universe a; + Universe b; + + a.listen("tcp://localhost:7077"); + + SECTION("single valid remote close") { + bool done = false; + + a.onDisconnect("test", [&done](Peer *p) { + done = true; + }); + + Peer *p = b.connect("tcp://localhost:7077"); + p->waitConnection(); + sleep_for(milliseconds(100)); + p->close(); + sleep_for(milliseconds(1100)); + REQUIRE( done ); + } + + SECTION("single valid close") { + bool done = false; + + b.onDisconnect("test", [&done](Peer *p) { + done = true; + }); + + Peer *p = b.connect("tcp://localhost:7077"); + p->waitConnection(); + sleep_for(milliseconds(100)); + p->close(); + sleep_for(milliseconds(1100)); + REQUIRE( done ); + } +} + TEST_CASE("Universe::broadcast()", "[net]") { Universe a; Universe b; diff --git a/components/rgbd-sources/src/rgbd_streamer.cpp b/components/rgbd-sources/src/rgbd_streamer.cpp index 6d6157936637c88e1f66d2ccf36b6136676688f0..fe758646f4dda505637fb807dc26362ff035f846 100644 --- a/components/rgbd-sources/src/rgbd_streamer.cpp +++ b/components/rgbd-sources/src/rgbd_streamer.cpp @@ -125,6 +125,7 @@ void Streamer::remove(const std::string &) { void Streamer::stop() { active_ = false; + pool_.stop(); } void Streamer::poll() { @@ -136,7 +137,11 @@ void Streamer::poll() { std::chrono::duration<double> elapsed = std::chrono::high_resolution_clock::now() - start; - sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f))); + if (elapsed.count() >= wait) { + LOG(WARNING) << "Frame rate below optimal @ " << (1.0f / elapsed.count()); + } else { + sleep_for(milliseconds((long long)((wait - elapsed.count()) * 1000.0f))); + } } void Streamer::run(bool block) { @@ -164,6 +169,10 @@ void Streamer::_swap(StreamSource &src) { } void Streamer::_schedule() { + std::mutex job_mtx; + std::condition_variable job_cv; + int jobs = 0; + for (auto s : sources_) { string uri = s.first; @@ -182,20 +191,35 @@ void Streamer::_schedule() { } slk.unlock(); + unique_lock<mutex> lk(job_mtx); + jobs += 2; + lk.unlock(); + // Grab job - pool_.push([this,uri](int id) { + pool_.push([this,uri,&jobs,&job_mtx,&job_cv](int id) { StreamSource *src = sources_[uri]; - src->src->grab(); + + try { + src->src->grab(); + } catch(...) { + LOG(ERROR) << "Grab Exception for: " << uri; + } unique_lock<shared_mutex> lk(src->mutex); //LOG(INFO) << "Grab frame"; src->state |= ftl::rgbd::detail::kGrabbed; _swap(*src); + lk.unlock(); + + unique_lock<mutex> ulk(job_mtx); + jobs--; + ulk.unlock(); + job_cv.notify_one(); }); // Transmit job // TODO, could do one for each bitrate... - pool_.push([this,uri](int id) { + pool_.push([this,uri,&jobs,&job_mtx,&job_cv](int id) { StreamSource *src = sources_[uri]; try { @@ -211,13 +235,16 @@ void Streamer::_schedule() { auto i = src->clients[0].begin(); while (i != src->clients[0].end()) { try { - net_->send((*i).peerid, (*i).uri, rgb_buf, d_buf); + if (!net_->send((*i).peerid, (*i).uri, rgb_buf, d_buf)) { + (*i).txcount = (*i).txmax; + } } catch(...) { (*i).txcount = (*i).txmax; } (*i).txcount++; if ((*i).txcount >= (*i).txmax) { - LOG(INFO) << "Remove client"; + DLOG(2) << "Remove client"; + unique_lock<shared_mutex> lk(src->mutex); i = src->clients[0].erase(i); } else { i++; @@ -232,8 +259,18 @@ void Streamer::_schedule() { DLOG(2) << "Tx Frame: " << uri; src->state |= ftl::rgbd::detail::kTransmitted; _swap(*src); + lk.unlock(); + + unique_lock<mutex> ulk(job_mtx); + jobs--; + ulk.unlock(); + job_cv.notify_one(); }); } + + // TODO Wait until all jobs completed... + unique_lock<mutex> lk(job_mtx); + job_cv.wait(lk, [&jobs]{ return jobs == 0; }); } RGBDSource *Streamer::get(const std::string &uri) {