diff --git a/applications/vision/src/streamer.cpp b/applications/vision/src/streamer.cpp index b9a3f78bb905383b94f002ccb431a863e897346d..0a4d0853583e7ef74d1f2985f56131c64598d9ec 100644 --- a/applications/vision/src/streamer.cpp +++ b/applications/vision/src/streamer.cpp @@ -54,7 +54,11 @@ void Streamer::send(const Mat &rgb, const Mat &depth) { cv::imencode(".png", d2, d_buf); LOG(INFO) << "Depth Size = " << ((float)d_buf.size() / (1024.0f*1024.0f)); - - net_.publish(uri_, rgb_buf, d_buf); + + try { + net_.publish(uri_, rgb_buf, d_buf); + } catch (...) { + LOG(ERROR) << "Exception on net publish to " << uri_; + } } diff --git a/components/net/cpp/include/ftl/net/peer.hpp b/components/net/cpp/include/ftl/net/peer.hpp index bf0aae4c9c937d02aef8566a1cc365e373b9c416..9f0d25d2beafa63b2562e6ea752e3b30e31bcfc1 100644 --- a/components/net/cpp/include/ftl/net/peer.hpp +++ b/components/net/cpp/include/ftl/net/peer.hpp @@ -195,22 +195,6 @@ class Peer { h(*this, args...); } } - - /*template <typename... ARGS> - int _send(const std::string &t, ARGS... args); - - template <typename... ARGS> - int _send(const array &b, ARGS... args); - - template <typename T, typename... ARGS> - int _send(const std::vector<T> &t, ARGS... args); - - template <typename... Types, typename... ARGS> - int _send(const std::tuple<Types...> &t, ARGS... args); - - template <typename T, typename... ARGS, - ENABLE_IF(std::is_trivial<T>::value && !std::is_pointer<T>::value)> - int _send(const T &t, ARGS... args);*/ private: // Data Status status_; diff --git a/components/net/cpp/include/ftl/net/universe.hpp b/components/net/cpp/include/ftl/net/universe.hpp index fe376971c5c644093ea037132eef2e7c21cd6d9a..f7fa2db894f14d0f61856eefd3596e79e18ead09 100644 --- a/components/net/cpp/include/ftl/net/universe.hpp +++ b/components/net/cpp/include/ftl/net/universe.hpp @@ -138,6 +138,7 @@ class Universe : public ftl::Configurable { void _installBindings(); void _installBindings(Peer *); bool _subscribe(const std::string &res); + void _remove(Peer *); static void __start(Universe *u); diff --git a/components/net/cpp/include/ftl/uuid.hpp b/components/net/cpp/include/ftl/uuid.hpp index 77b448beed1b337288e5a9d6bc041015b4031dcf..c7c884e5defa36ee8d4cd8bdf96c4f9e263a5d52 100644 --- a/components/net/cpp/include/ftl/uuid.hpp +++ b/components/net/cpp/include/ftl/uuid.hpp @@ -51,13 +51,16 @@ namespace ftl { * Get a pretty string. */ std::string to_string() const { - char b[37]; #ifdef WIN32 - UuidToString(&guid_, (RPC_CSTR*)b); + RPC_CSTR szUuid = NULL; + if (::UuidToStringA(&guid_, &szUuid) == RPC_S_OK) { + return std::string((char*)szUuid); + } #else + char b[37]; uuid_unparse(uuid_, b); -#endif return std::string(b); +#endif } /* Allow the UUID to be packed into an RPC message. */ diff --git a/components/net/cpp/src/peer.cpp b/components/net/cpp/src/peer.cpp index fb69ce699850bed6550098fdfae0e3a652ba3ae6..186b4ba7853947bf49de96c96325e565ddd3e675 100644 --- a/components/net/cpp/src/peer.cpp +++ b/components/net/cpp/src/peer.cpp @@ -310,7 +310,7 @@ void Peer::data() { } bool Peer::_data() { - // std::unique_lock<std::mutex> lk(recv_mtx_); + std::unique_lock<std::mutex> lk(recv_mtx_); recv_buf_.reserve_buffer(kMaxMessage); int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0); @@ -556,6 +556,8 @@ int Peer::_send() { } Peer::~Peer() { + std::unique_lock<std::mutex> lk1(send_mtx_); + std::unique_lock<std::mutex> lk2(recv_mtx_); close(); delete disp_; diff --git a/components/net/cpp/src/universe.cpp b/components/net/cpp/src/universe.cpp index f98d33b8eb86979b16f9a1ae0c256c626682e9bf..290993394cee4b68a9a46ba06add7364a0ff5ee9 100644 --- a/components/net/cpp/src/universe.cpp +++ b/components/net/cpp/src/universe.cpp @@ -6,6 +6,10 @@ #pragma comment(lib, "Rpcrt4.lib") #endif +#ifndef WIN32 +#include <signal.h> +#endif + using std::string; using std::vector; using std::thread; @@ -122,6 +126,8 @@ int Universe::_setDescriptors() { FD_SET(s->_socket(), &sfdread_); } FD_SET(s->_socket(), &sfderror_); + } else if (s) { + _remove(s); } } @@ -147,6 +153,20 @@ void Universe::_installBindings() { }); } +// Note: should be called inside a net lock +void Universe::_remove(Peer *p) { + LOG(INFO) << "Removing disconnected peer: " << p->id().to_string(); + for (auto i=peers_.begin(); i!=peers_.end(); i++) { + if ((*i) == p) { + peers_.erase(i); break; + } + } + + auto ix = peer_ids_.find(p->id()); + if (ix != peer_ids_.end()) peer_ids_.erase(ix); + delete p; +} + Peer *Universe::getPeer(const UUID &id) const { auto ix = peer_ids_.find(id); if (ix == peer_ids_.end()) return nullptr; @@ -197,6 +217,9 @@ bool Universe::_subscribe(const std::string &res) { } void Universe::__start(Universe * u) { +#ifndef WIN32 + signal(SIGPIPE,SIG_IGN); +#endif // WIN32 u->_run(); } @@ -228,8 +251,10 @@ void Universe::_run() { //Some kind of error occured, it is usually possible to recover from this. if (selres < 0) { - std::cout << "SELECT ERROR " << selres << " - " << strerror(errno) << std::endl; - //return false; + switch (errno) { + case 9 : continue; // Bad file descriptor = socket closed + default : std::cout << "Unknown select error: " << strerror(errno) << std::endl; + } continue; } else if (selres == 0) { // Timeout, nothing to do... @@ -245,24 +270,17 @@ void Universe::_run() { int rsize = sizeof(sockaddr_storage); sockaddr_storage addr; - //int freeclient = freeSocket(); - - //if (freeclient >= 0) { - // TODO Limit connection rate or allow a pause in accepting - // TODO Send auto reject message under heavy load - - //Finally accept this client connection. - int csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize); + //Finally accept this client connection. + int csock = accept(l->_socket(), (sockaddr*)&addr, (socklen_t*)&rsize); - if (csock != INVALID_SOCKET) { - auto p = new Peer(csock, &disp_); - peers_.push_back(p); - _installBindings(p); - p->onConnect([this](Peer &p) { - peer_ids_[p.id()] = &p; - }); - } - //} + if (csock != INVALID_SOCKET) { + auto p = new Peer(csock, &disp_); + peers_.push_back(p); + _installBindings(p); + p->onConnect([this](Peer &p) { + peer_ids_[p.id()] = &p; + }); + } } } } @@ -272,25 +290,15 @@ void Universe::_run() { if (s != NULL && s->isValid()) { //If message received from this client then deal with it if (FD_ISSET(s->_socket(), &sfdread_)) { - //s->data(); - //std::cout << "QUEUE DATA PROC" << std::endl; - //p.push([](int id, Peer *s) { - // std::cout << "Processing in thread " << std::to_string(id) << std::endl; - s->data(); - //}, s); + s->data(); } if (FD_ISSET(s->_socket(), &sfderror_)) { s->socketError(); + s->close(); } } else if (s != NULL) { // Erase it - - for (auto i=peers_.begin(); i!=peers_.end(); i++) { - if ((*i) == s) { - LOG(INFO) << "REMOVING SOCKET"; - peers_.erase(i); break; - } - } + _remove(s); } } } diff --git a/components/net/cpp/test/net_integration.cpp b/components/net/cpp/test/net_integration.cpp index 3532c5b7f85fd1f49959a733119506ddf82b4b4e..d714ca0b9847bca2acbeed8b13f5f38f2e488d31 100644 --- a/components/net/cpp/test/net_integration.cpp +++ b/components/net/cpp/test/net_integration.cpp @@ -207,7 +207,8 @@ TEST_CASE("Universe::publish()", "") { Universe a; Universe b; a.listen("tcp://localhost:7077"); - b.connect("tcp://localhost:7077")->waitConnection(); + ftl::net::Peer *p = b.connect("tcp://localhost:7077"); + p->waitConnection(); SECTION("no subscribers") { a.createResource("ftl://test"); @@ -227,6 +228,23 @@ TEST_CASE("Universe::publish()", "") { REQUIRE( done == 56 ); } + + SECTION("publish to disconnected subscriber") { + int done = 0; + a.createResource("ftl://test2"); + REQUIRE( b.subscribe("ftl://test2", [&done](int a) { + done = a; + }) ); + sleep_for(milliseconds(50)); + + p->close(); + sleep_for(milliseconds(100)); + + a.publish("ftl://test2", 56); + sleep_for(milliseconds(50)); + + REQUIRE( done == 0 ); + } } /*TEST_CASE("net::listen()", "[net]") {