diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 147fc65481dab7ccfa044efd7aafc8eb5b11193d..0f76c8f6acd519ef8ab0bfdad2115bf30513397f 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -189,7 +189,12 @@ windows:test: script: - $env:PATH+=";C:/Shared/Deploy" - cd build - - ctest -V --output-on-failure --timeout 60 + - ctest -V --output-on-failure --timeout 60 --output-junit report.xml + + artifacts: + when: always + reports: + junit: build/report.xml windows:pack: only: diff --git a/include/ftl/threads.hpp b/include/ftl/threads.hpp index 33eade4f6c2b49bd62401dba8d339773b2a15636..955a54da0d7385642a1dc3de7ab9379692b31439 100644 --- a/include/ftl/threads.hpp +++ b/include/ftl/threads.hpp @@ -12,7 +12,7 @@ #define POOL_SIZE 10 -//#define DEBUG_MUTEX +// #define DEBUG_MUTEX #define MUTEX_TIMEOUT 2 #if defined DEBUG_MUTEX diff --git a/src/peer.cpp b/src/peer.cpp index 28dbdc2ba2f272a1e575dd006d715fc2f2ad468a..95f5b787980ebf9c86418c2e723b07035d386ba2 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -66,12 +66,12 @@ void Peer::_set_socket_options() { sock_->set_send_buffer_size(net_->getSendBufferSize(sock_->scheme())); sock_->set_recv_buffer_size(net_->getRecvBufferSize(sock_->scheme())); - LOG(1) << "send buffer size: " << (sock_->get_send_buffer_size() >> 10) << "KiB, " + DLOG(1) << "send buffer size: " << (sock_->get_send_buffer_size() >> 10) << "KiB, " << "recv buffer size: " << (sock_->get_recv_buffer_size() >> 10) << "KiB"; } void Peer::_send_handshake() { - LOG(1) << "(" << (outgoing_ ? "connecting" : "listening") + DLOG(INFO) << "(" << (outgoing_ ? "connecting" : "listening") << " peer) handshake sent, status: " << (isConnected() ? "connected" : "connecting"); @@ -91,7 +91,7 @@ void Peer::_process_handshake(uint64_t magic, uint32_t version, UUID pid) { } else { if (version != ftl::net::kVersion) LOG(WARNING) << "net protocol using different versions!"; - LOG(1) << "(" << (outgoing_ ? "connecting" : "listening") + DLOG(INFO) << "(" << (outgoing_ ? "connecting" : "listening") << " peer) handshake received from remote for " << pid.to_string(); status_ = NodeStatus::kConnected; @@ -127,39 +127,48 @@ void Peer::_bind_rpc() { } Peer::Peer(std::unique_ptr<internal::SocketConnection> s, Universe* u, Dispatcher* d) : - outgoing_(false), local_id_(0), - uri_("0"), status_(NodeStatus::kConnecting), can_reconnect_(false), - net_(u), sock_(std::move(s)) { + outgoing_(false), + local_id_(0), + uri_("0"), + status_(NodeStatus::kConnecting), + can_reconnect_(false), + net_(u), + sock_(std::move(s)), + disp_(std::make_unique<Dispatcher>(d)) { /* Incoming connection constructor */ CHECK(sock_) << "incoming SocketConnection pointer null"; _set_socket_options(); _updateURI(); - - disp_ = std::make_unique<Dispatcher>(d); - _bind_rpc(); - _send_handshake(); ++net_->peer_instances_; } Peer::Peer(const ftl::URI& uri, Universe *u, Dispatcher *d) : - outgoing_(true), local_id_(0), uri_(uri), - status_(NodeStatus::kInvalid), can_reconnect_(true), net_(u) { + outgoing_(true), + local_id_(0), + uri_(uri), + status_(NodeStatus::kInvalid), + can_reconnect_(true), + net_(u), + disp_(std::make_unique<Dispatcher>(d)) { /* Outgoing connection constructor */ - // Must do to prevent receiving message before handlers are installed - //UNIQUE_LOCK(recv_mtx_,lk); - - disp_ = std::make_unique<Dispatcher>(d); - _bind_rpc(); _connect(); ++net_->peer_instances_; } +void Peer::start() { + if (outgoing_) { + // Connect needs to be in constructor + } else { + _send_handshake(); + } +} + void Peer::_connect() { sock_ = ftl::net::internal::createConnection(uri_); // throws on bad uri _set_socket_options(); @@ -174,7 +183,15 @@ bool Peer::reconnect() { URI uri(uri_); - LOG(1) << "Reconnecting to " << uri_.to_string() << " ..."; + DLOG(INFO) << "Reconnecting to " << uri_.to_string() << " ..."; + + // First, ensure all stale jobs and buffer data are removed. + while (job_count_ > 0 && ftl::pool.size() > 0) { + DLOG(1) << "Waiting on peer jobs before reconnect " << job_count_; + std::this_thread::sleep_for(std::chrono::milliseconds(2)); + } + recv_buf_.remove_nonparsed_buffer(); + recv_buf_.reset(); try { _connect(); @@ -203,7 +220,9 @@ void Peer::rawClose() { void Peer::close(bool retry) { // Attempt to inform about disconnect - if (sock_->is_valid() && status_ == NodeStatus::kConnected) { send("__disconnect__"); } + if (sock_->is_valid() && status_ == NodeStatus::kConnected) { + send("__disconnect__"); + } UNIQUE_LOCK(send_mtx_, lk_send); //UNIQUE_LOCK(recv_mtx_, lk_recv); @@ -217,7 +236,6 @@ void Peer::_close(bool retry) { // Attempt auto reconnect? if (retry && can_reconnect_) { status_ = NodeStatus::kReconnecting; - } else { status_ = NodeStatus::kDisconnected; } @@ -256,6 +274,19 @@ NodeType Peer::getType() const { return NodeType::kNode; } +void Peer::_createJob() { + ++job_count_; + + ftl::pool.push([this](int id) { + try { + _data(); + } catch (const std::exception &e) { + net_->_notifyError(this, ftl::protocol::Error::kUnknown, e.what()); + } + --job_count_; + }); +} + void Peer::data() { if (!sock_->is_valid()) { return; } @@ -304,17 +335,7 @@ void Peer::data() { recv_checked_.clear(); if (!already_processing_.test_and_set()) { //lk.unlock(); - - ++job_count_; - - ftl::pool.push([this](int id) { - try { - _data(); - } catch (const std::exception &e) { - net_->_notifyError(this, ftl::protocol::Error::kUnknown, e.what()); - } - --job_count_; - }); + _createJob(); } } @@ -400,15 +421,7 @@ bool Peer::_data() { net_->_notifyError(this, ftl::protocol::Error::kDispatchFailed, e.what()); } - ++job_count_; - ftl::pool.push([this](int id) { - try { - _data(); - } catch (const std::exception &e) { - net_->_notifyError(this, ftl::protocol::Error::kUnknown, e.what()); - } - --job_count_; - }); + _createJob(); return true; } } catch(...) { @@ -428,16 +441,8 @@ bool Peer::_data() { //} } - // more data: repeat (loop) - ++job_count_; - ftl::pool.push([this](int id) { - try { - _data(); - } catch (const std::exception &e) { - net_->_notifyError(this, ftl::protocol::Error::kUnknown, e.what()); - } - --job_count_; - }); + // Process more data... + _createJob(); try { disp_->dispatch(*this, obj); @@ -514,8 +519,8 @@ bool Peer::waitConnection(int s) { else if (status_ == NodeStatus::kDisconnected) return false; std::mutex m; - std::unique_lock<std::mutex> lk(m); - std::condition_variable cv; + m.lock(); + std::condition_variable_any cv; auto h = net_->onConnect([this, &cv](const PeerPtr &p) { if (p.get() == this) { @@ -524,7 +529,8 @@ bool Peer::waitConnection(int s) { return true; }); - cv.wait_for(lk, seconds(s), [this]() { return status_ == NodeStatus::kConnected;}); + cv.wait_for(m, seconds(s), [this]() { return status_ == NodeStatus::kConnected;}); + m.unlock(); return status_ == NodeStatus::kConnected; } @@ -569,7 +575,9 @@ Peer::~Peer() { } // Prevent deletion if there are any jobs remaining - while (job_count_ > 0 && ftl::pool.size() > 0) { + if (job_count_ > 0 && ftl::pool.size() > 0) { + DLOG(1) << "Waiting on peer jobs... " << job_count_; std::this_thread::sleep_for(std::chrono::milliseconds(2)); + if (job_count_ > 0) LOG(FATAL) << "Peer jobs not terminated"; } } diff --git a/src/peer.hpp b/src/peer.hpp index 88b654397a07b1da976d6220a45b93886ab03051..2791559b9ee1639d9281c97f9d34d9b52525e93f 100644 --- a/src/peer.hpp +++ b/src/peer.hpp @@ -69,6 +69,8 @@ class Peer { explicit Peer(std::unique_ptr<internal::SocketConnection> s, ftl::net::Universe*, ftl::net::Dispatcher* d=nullptr); ~Peer(); + + void start(); /** * Close the peer if open. Setting retry parameter to true will initiate @@ -237,6 +239,8 @@ private: // Functions void _connect(); int _send(); + void _createJob(); + void _waitCall(int id, std::condition_variable &cv, bool &hasreturned, const std::string &name); template<typename... ARGS> diff --git a/src/universe.cpp b/src/universe.cpp index e3944ddc96156fdb9dc7a712f4aa8655961caf79..5b5ad3185e08ce5a89a33aa4f7107ba1b54aebbd 100644 --- a/src/universe.cpp +++ b/src/universe.cpp @@ -178,8 +178,10 @@ void Universe::shutdown() { thread_.join(); // FIXME: This shouldn't be needed - while (peer_instances_ > 0 && ftl::pool.size() > 0) { + if (peer_instances_ > 0 && ftl::pool.size() > 0) { + DLOG(WARNING) << "Waiting on peer destruction... " << peer_instances_; std::this_thread::sleep_for(std::chrono::milliseconds(2)); + if (peer_instances_ > 0) LOG(FATAL) << "Peers not destroyed"; } } @@ -256,14 +258,10 @@ PeerPtr Universe::connect(const ftl::URI &u) { auto p = std::make_shared<Peer>(u, this, &disp_); - if (p->status() != NodeStatus::kInvalid) { - _insertPeer(p); - } - else { - DLOG(ERROR) << "Peer in invalid state"; - } - + _insertPeer(p); _installBindings(p); + p->start(); + return p; } @@ -285,13 +283,7 @@ int Universe::waitConnections(int seconds) { }); } -socket_t Universe::_setDescriptors() { - //Reset all file descriptors - //FD_ZERO(&impl_->sfdread_); - //FD_ZERO(&impl_->sfderror_); - - socket_t n = 0; - +void Universe::_setDescriptors() { SHARED_LOCK(net_mutex_, lk); impl_->pollfds.clear(); @@ -312,11 +304,7 @@ socket_t Universe::_setDescriptors() { fdentry.revents = 0; impl_->pollfds.push_back(fdentry); impl_->idMap[sock] = impl_->pollfds.size() - 1; - - //FD_SET(sock, &impl_->sfdread_); - //FD_SET(sock, &impl_->sfderror_); } - n = std::max<socket_t>(n, l->fd()); } } @@ -324,7 +312,6 @@ socket_t Universe::_setDescriptors() { for (const auto &s : peers_) { if (s && s->isValid()) { auto sock = s->_socket(); - n = std::max<socket_t>(n, sock); if (sock != INVALID_SOCKET) { pollfd fdentry; #ifdef WIN32 @@ -336,14 +323,9 @@ socket_t Universe::_setDescriptors() { fdentry.revents = 0; impl_->pollfds.push_back(fdentry); impl_->idMap[sock] = impl_->pollfds.size() - 1; - - //FD_SET(sock, &impl_->sfdread_); - //FD_SET(s->_socket(), &impl_->sfderror_); } } } - - return n; } void Universe::_installBindings(const PeerPtr &p) { @@ -549,6 +531,7 @@ void Universe::_run() { if (csock) { auto p = std::make_shared<Peer>(std::move(csock), this, &disp_); _insertPeer(p); + p->start(); } lk.lock(); diff --git a/src/universe.hpp b/src/universe.hpp index a245b8fab043d800abcc82be36e80b310b2e6e36..180eabf044b2d9eb614da8e142e2afd4f97495df 100644 --- a/src/universe.hpp +++ b/src/universe.hpp @@ -175,10 +175,9 @@ public: private: void _run(); - SOCKET _setDescriptors(); // TODO: move to implementation + void _setDescriptors(); void _installBindings(); void _installBindings(const ftl::net::PeerPtr&); - //bool _subscribe(const std::string &res); void _cleanupPeers(); void _notifyConnect(ftl::net::Peer *); void _notifyDisconnect(ftl::net::Peer *); diff --git a/test/net_integration.cpp b/test/net_integration.cpp index 0a11524f0ce419972d9047c2a44577ff163a988f..df9d898765c82020abdd9dcb2fc8588bf4b76d96 100644 --- a/test/net_integration.cpp +++ b/test/net_integration.cpp @@ -37,7 +37,7 @@ TEST_CASE("Listen and Connect", "[net]") { REQUIRE( p->waitConnection(5) ); - REQUIRE( self->numberOfNodes() == 1 ); + REQUIRE( self->waitConnections(5) == 1 ); REQUIRE( ftl::getSelf()->numberOfNodes() == 1); } @@ -65,10 +65,6 @@ TEST_CASE("Listen and Connect", "[net]") { } SECTION("automatic reconnect from originating connection") { - std::mutex mtx; - std::condition_variable cv; - std::unique_lock<std::mutex> lk(mtx); - auto uri = "tcp://localhost:" + std::to_string(self->getListeningURIs().front().getPort()); auto p_connecting = ftl::connectNode(uri); @@ -82,10 +78,6 @@ TEST_CASE("Listen and Connect", "[net]") { } SECTION("automatic reconnect from remote termination") { - std::mutex mtx; - std::condition_variable cv; - std::unique_lock<std::mutex> lk(mtx); - auto uri = "tcp://localhost:" + std::to_string(self->getListeningURIs().front().getPort()); auto p_connecting = ftl::connectNode(uri); @@ -97,11 +89,12 @@ TEST_CASE("Listen and Connect", "[net]") { auto nodes = self->getNodes(); REQUIRE( nodes.size() == 1 ); for (auto &node : nodes) { + node->waitConnection(5); node->close(); } bool r = try_for(500, [p_connecting]{ return p_connecting->connectionCount() >= 2; }); - // REQUIRE( r ); + REQUIRE( r ); } ftl::protocol::reset(); diff --git a/test/net_performance.cpp b/test/net_performance.cpp index e5056f86e45d75d82f08832a19519f1ac5984522..abdcce819ff61b9d78c7a6354b05880db730fd26 100644 --- a/test/net_performance.cpp +++ b/test/net_performance.cpp @@ -23,14 +23,14 @@ static std::vector<DTYPE> data_test; static std::atomic_uint64_t recv_cnt_ = 0; static auto t_last_recv_ = std::chrono::steady_clock::now(); -static void recv_data(std::vector<DTYPE> data) { +static void recv_data(const std::vector<DTYPE> &data) { recv_cnt_.fetch_add(data.size() * sizeof(DTYPE)); t_last_recv_ = std::chrono::steady_clock::now(); } static float peer_send(ftl::net::Peer* p, const std::vector<DTYPE>& data, int cnt) { auto t_start = std::chrono::steady_clock::now(); - auto t_stop = std::chrono::steady_clock::now(); + decltype(t_start) t_stop; size_t bytes_sent = 0; size_t bytes = data.size() * sizeof(DTYPE); diff --git a/test/stream_integration.cpp b/test/stream_integration.cpp index 3fe765116ad80180e8597534ff2e367bb3b41e90..6e16b2ab3ff53d5cc656b079387967d20fe0c0c2 100644 --- a/test/stream_integration.cpp +++ b/test/stream_integration.cpp @@ -32,6 +32,7 @@ TEST_CASE("TCP Stream", "[net]") { REQUIRE( s2 ); ftl::protocol::Packet rpkt; + rpkt.bitrate = 20; auto h = s2->onPacket([&cv, &rpkt](const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt) { rpkt = pkt; @@ -57,7 +58,8 @@ TEST_CASE("TCP Stream", "[net]") { pkt.frame_count = 1; s1->post(spkt, pkt); - REQUIRE(cv.wait_for(lk, std::chrono::seconds(5)) == std::cv_status::no_timeout); + bool r = cv.wait_for(lk, std::chrono::seconds(5), [&rpkt](){ return rpkt.bitrate == 10; }); + REQUIRE( r ); REQUIRE( rpkt.bitrate == 10 ); REQUIRE( rpkt.codec == ftl::protocol::Codec::kJPG ); REQUIRE( rpkt.frame_count == 1 );