Skip to content
Snippets Groups Projects
Commit 9709fefd authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'feature/#7' into 'main'

#7 Fix unreliable network tests

See merge request nicolas.pope/beyond-protocol!2
parents ca864e1a e4708ece
No related branches found
No related tags found
No related merge requests found
......@@ -189,7 +189,12 @@ windows:test:
script:
- $env:PATH+=";C:/Shared/Deploy"
- cd build
- ctest -V --output-on-failure --timeout 60
- ctest -V --output-on-failure --timeout 60 --output-junit report.xml
artifacts:
when: always
reports:
junit: build/report.xml
windows:pack:
only:
......
......@@ -12,7 +12,7 @@
#define POOL_SIZE 10
//#define DEBUG_MUTEX
// #define DEBUG_MUTEX
#define MUTEX_TIMEOUT 2
#if defined DEBUG_MUTEX
......
......@@ -66,12 +66,12 @@ void Peer::_set_socket_options() {
sock_->set_send_buffer_size(net_->getSendBufferSize(sock_->scheme()));
sock_->set_recv_buffer_size(net_->getRecvBufferSize(sock_->scheme()));
LOG(1) << "send buffer size: " << (sock_->get_send_buffer_size() >> 10) << "KiB, "
DLOG(1) << "send buffer size: " << (sock_->get_send_buffer_size() >> 10) << "KiB, "
<< "recv buffer size: " << (sock_->get_recv_buffer_size() >> 10) << "KiB";
}
void Peer::_send_handshake() {
LOG(1) << "(" << (outgoing_ ? "connecting" : "listening")
DLOG(INFO) << "(" << (outgoing_ ? "connecting" : "listening")
<< " peer) handshake sent, status: "
<< (isConnected() ? "connected" : "connecting");
......@@ -91,7 +91,7 @@ void Peer::_process_handshake(uint64_t magic, uint32_t version, UUID pid) {
} else {
if (version != ftl::net::kVersion) LOG(WARNING) << "net protocol using different versions!";
LOG(1) << "(" << (outgoing_ ? "connecting" : "listening")
DLOG(INFO) << "(" << (outgoing_ ? "connecting" : "listening")
<< " peer) handshake received from remote for " << pid.to_string();
status_ = NodeStatus::kConnected;
......@@ -127,39 +127,48 @@ void Peer::_bind_rpc() {
}
Peer::Peer(std::unique_ptr<internal::SocketConnection> s, Universe* u, Dispatcher* d) :
outgoing_(false), local_id_(0),
uri_("0"), status_(NodeStatus::kConnecting), can_reconnect_(false),
net_(u), sock_(std::move(s)) {
outgoing_(false),
local_id_(0),
uri_("0"),
status_(NodeStatus::kConnecting),
can_reconnect_(false),
net_(u),
sock_(std::move(s)),
disp_(std::make_unique<Dispatcher>(d)) {
/* Incoming connection constructor */
CHECK(sock_) << "incoming SocketConnection pointer null";
_set_socket_options();
_updateURI();
disp_ = std::make_unique<Dispatcher>(d);
_bind_rpc();
_send_handshake();
++net_->peer_instances_;
}
Peer::Peer(const ftl::URI& uri, Universe *u, Dispatcher *d) :
outgoing_(true), local_id_(0), uri_(uri),
status_(NodeStatus::kInvalid), can_reconnect_(true), net_(u) {
outgoing_(true),
local_id_(0),
uri_(uri),
status_(NodeStatus::kInvalid),
can_reconnect_(true),
net_(u),
disp_(std::make_unique<Dispatcher>(d)) {
/* Outgoing connection constructor */
// Must do to prevent receiving message before handlers are installed
//UNIQUE_LOCK(recv_mtx_,lk);
disp_ = std::make_unique<Dispatcher>(d);
_bind_rpc();
_connect();
++net_->peer_instances_;
}
/**
 * Kick off the peer once it has been constructed.
 *
 * An outgoing peer has nothing to do here because its connection attempt is
 * made from the constructor. Any other peer sends its handshake now.
 */
void Peer::start() {
	// Connect needs to be in constructor, so outgoing peers are already set up.
	if (outgoing_) return;

	_send_handshake();
}
void Peer::_connect() {
sock_ = ftl::net::internal::createConnection(uri_); // throws on bad uri
_set_socket_options();
......@@ -174,7 +183,15 @@ bool Peer::reconnect() {
URI uri(uri_);
LOG(1) << "Reconnecting to " << uri_.to_string() << " ...";
DLOG(INFO) << "Reconnecting to " << uri_.to_string() << " ...";
// First, ensure all stale jobs and buffer data are removed.
while (job_count_ > 0 && ftl::pool.size() > 0) {
DLOG(1) << "Waiting on peer jobs before reconnect " << job_count_;
std::this_thread::sleep_for(std::chrono::milliseconds(2));
}
recv_buf_.remove_nonparsed_buffer();
recv_buf_.reset();
try {
_connect();
......@@ -203,7 +220,9 @@ void Peer::rawClose() {
void Peer::close(bool retry) {
// Attempt to inform about disconnect
if (sock_->is_valid() && status_ == NodeStatus::kConnected) { send("__disconnect__"); }
if (sock_->is_valid() && status_ == NodeStatus::kConnected) {
send("__disconnect__");
}
UNIQUE_LOCK(send_mtx_, lk_send);
//UNIQUE_LOCK(recv_mtx_, lk_recv);
......@@ -217,7 +236,6 @@ void Peer::_close(bool retry) {
// Attempt auto reconnect?
if (retry && can_reconnect_) {
status_ = NodeStatus::kReconnecting;
} else {
status_ = NodeStatus::kDisconnected;
}
......@@ -256,6 +274,19 @@ NodeType Peer::getType() const {
return NodeType::kNode;
}
void Peer::_createJob() {
++job_count_;
ftl::pool.push([this](int id) {
try {
_data();
} catch (const std::exception &e) {
net_->_notifyError(this, ftl::protocol::Error::kUnknown, e.what());
}
--job_count_;
});
}
void Peer::data() {
if (!sock_->is_valid()) { return; }
......@@ -304,17 +335,7 @@ void Peer::data() {
recv_checked_.clear();
if (!already_processing_.test_and_set()) {
//lk.unlock();
++job_count_;
ftl::pool.push([this](int id) {
try {
_data();
} catch (const std::exception &e) {
net_->_notifyError(this, ftl::protocol::Error::kUnknown, e.what());
}
--job_count_;
});
_createJob();
}
}
......@@ -400,15 +421,7 @@ bool Peer::_data() {
net_->_notifyError(this, ftl::protocol::Error::kDispatchFailed, e.what());
}
++job_count_;
ftl::pool.push([this](int id) {
try {
_data();
} catch (const std::exception &e) {
net_->_notifyError(this, ftl::protocol::Error::kUnknown, e.what());
}
--job_count_;
});
_createJob();
return true;
}
} catch(...) {
......@@ -428,16 +441,8 @@ bool Peer::_data() {
//}
}
// more data: repeat (loop)
++job_count_;
ftl::pool.push([this](int id) {
try {
_data();
} catch (const std::exception &e) {
net_->_notifyError(this, ftl::protocol::Error::kUnknown, e.what());
}
--job_count_;
});
// Process more data...
_createJob();
try {
disp_->dispatch(*this, obj);
......@@ -514,8 +519,8 @@ bool Peer::waitConnection(int s) {
else if (status_ == NodeStatus::kDisconnected) return false;
std::mutex m;
std::unique_lock<std::mutex> lk(m);
std::condition_variable cv;
m.lock();
std::condition_variable_any cv;
auto h = net_->onConnect([this, &cv](const PeerPtr &p) {
if (p.get() == this) {
......@@ -524,7 +529,8 @@ bool Peer::waitConnection(int s) {
return true;
});
cv.wait_for(lk, seconds(s), [this]() { return status_ == NodeStatus::kConnected;});
cv.wait_for(m, seconds(s), [this]() { return status_ == NodeStatus::kConnected;});
m.unlock();
return status_ == NodeStatus::kConnected;
}
......@@ -569,7 +575,9 @@ Peer::~Peer() {
}
// Prevent deletion if there are any jobs remaining
while (job_count_ > 0 && ftl::pool.size() > 0) {
if (job_count_ > 0 && ftl::pool.size() > 0) {
DLOG(1) << "Waiting on peer jobs... " << job_count_;
std::this_thread::sleep_for(std::chrono::milliseconds(2));
if (job_count_ > 0) LOG(FATAL) << "Peer jobs not terminated";
}
}
......@@ -69,6 +69,8 @@ class Peer {
explicit Peer(std::unique_ptr<internal::SocketConnection> s, ftl::net::Universe*, ftl::net::Dispatcher* d=nullptr);
~Peer();
void start();
/**
* Close the peer if open. Setting retry parameter to true will initiate
......@@ -237,6 +239,8 @@ private: // Functions
void _connect();
int _send();
void _createJob();
void _waitCall(int id, std::condition_variable &cv, bool &hasreturned, const std::string &name);
template<typename... ARGS>
......
......@@ -178,8 +178,10 @@ void Universe::shutdown() {
thread_.join();
// FIXME: This shouldn't be needed
while (peer_instances_ > 0 && ftl::pool.size() > 0) {
if (peer_instances_ > 0 && ftl::pool.size() > 0) {
DLOG(WARNING) << "Waiting on peer destruction... " << peer_instances_;
std::this_thread::sleep_for(std::chrono::milliseconds(2));
if (peer_instances_ > 0) LOG(FATAL) << "Peers not destroyed";
}
}
......@@ -256,14 +258,10 @@ PeerPtr Universe::connect(const ftl::URI &u) {
auto p = std::make_shared<Peer>(u, this, &disp_);
if (p->status() != NodeStatus::kInvalid) {
_insertPeer(p);
}
else {
DLOG(ERROR) << "Peer in invalid state";
}
_insertPeer(p);
_installBindings(p);
p->start();
return p;
}
......@@ -285,13 +283,7 @@ int Universe::waitConnections(int seconds) {
});
}
socket_t Universe::_setDescriptors() {
//Reset all file descriptors
//FD_ZERO(&impl_->sfdread_);
//FD_ZERO(&impl_->sfderror_);
socket_t n = 0;
void Universe::_setDescriptors() {
SHARED_LOCK(net_mutex_, lk);
impl_->pollfds.clear();
......@@ -312,11 +304,7 @@ socket_t Universe::_setDescriptors() {
fdentry.revents = 0;
impl_->pollfds.push_back(fdentry);
impl_->idMap[sock] = impl_->pollfds.size() - 1;
//FD_SET(sock, &impl_->sfdread_);
//FD_SET(sock, &impl_->sfderror_);
}
n = std::max<socket_t>(n, l->fd());
}
}
......@@ -324,7 +312,6 @@ socket_t Universe::_setDescriptors() {
for (const auto &s : peers_) {
if (s && s->isValid()) {
auto sock = s->_socket();
n = std::max<socket_t>(n, sock);
if (sock != INVALID_SOCKET) {
pollfd fdentry;
#ifdef WIN32
......@@ -336,14 +323,9 @@ socket_t Universe::_setDescriptors() {
fdentry.revents = 0;
impl_->pollfds.push_back(fdentry);
impl_->idMap[sock] = impl_->pollfds.size() - 1;
//FD_SET(sock, &impl_->sfdread_);
//FD_SET(s->_socket(), &impl_->sfderror_);
}
}
}
return n;
}
void Universe::_installBindings(const PeerPtr &p) {
......@@ -549,6 +531,7 @@ void Universe::_run() {
if (csock) {
auto p = std::make_shared<Peer>(std::move(csock), this, &disp_);
_insertPeer(p);
p->start();
}
lk.lock();
......
......@@ -175,10 +175,9 @@ public:
private:
void _run();
SOCKET _setDescriptors(); // TODO: move to implementation
void _setDescriptors();
void _installBindings();
void _installBindings(const ftl::net::PeerPtr&);
//bool _subscribe(const std::string &res);
void _cleanupPeers();
void _notifyConnect(ftl::net::Peer *);
void _notifyDisconnect(ftl::net::Peer *);
......
......@@ -37,7 +37,7 @@ TEST_CASE("Listen and Connect", "[net]") {
REQUIRE( p->waitConnection(5) );
REQUIRE( self->numberOfNodes() == 1 );
REQUIRE( self->waitConnections(5) == 1 );
REQUIRE( ftl::getSelf()->numberOfNodes() == 1);
}
......@@ -65,10 +65,6 @@ TEST_CASE("Listen and Connect", "[net]") {
}
SECTION("automatic reconnect from originating connection") {
std::mutex mtx;
std::condition_variable cv;
std::unique_lock<std::mutex> lk(mtx);
auto uri = "tcp://localhost:" + std::to_string(self->getListeningURIs().front().getPort());
auto p_connecting = ftl::connectNode(uri);
......@@ -82,10 +78,6 @@ TEST_CASE("Listen and Connect", "[net]") {
}
SECTION("automatic reconnect from remote termination") {
std::mutex mtx;
std::condition_variable cv;
std::unique_lock<std::mutex> lk(mtx);
auto uri = "tcp://localhost:" + std::to_string(self->getListeningURIs().front().getPort());
auto p_connecting = ftl::connectNode(uri);
......@@ -97,11 +89,12 @@ TEST_CASE("Listen and Connect", "[net]") {
auto nodes = self->getNodes();
REQUIRE( nodes.size() == 1 );
for (auto &node : nodes) {
node->waitConnection(5);
node->close();
}
bool r = try_for(500, [p_connecting]{ return p_connecting->connectionCount() >= 2; });
// REQUIRE( r );
REQUIRE( r );
}
ftl::protocol::reset();
......
......@@ -23,14 +23,14 @@ static std::vector<DTYPE> data_test;
static std::atomic_uint64_t recv_cnt_ = 0;
static auto t_last_recv_ = std::chrono::steady_clock::now();
static void recv_data(std::vector<DTYPE> data) {
static void recv_data(const std::vector<DTYPE> &data) {
recv_cnt_.fetch_add(data.size() * sizeof(DTYPE));
t_last_recv_ = std::chrono::steady_clock::now();
}
static float peer_send(ftl::net::Peer* p, const std::vector<DTYPE>& data, int cnt) {
auto t_start = std::chrono::steady_clock::now();
auto t_stop = std::chrono::steady_clock::now();
decltype(t_start) t_stop;
size_t bytes_sent = 0;
size_t bytes = data.size() * sizeof(DTYPE);
......
......@@ -32,6 +32,7 @@ TEST_CASE("TCP Stream", "[net]") {
REQUIRE( s2 );
ftl::protocol::Packet rpkt;
rpkt.bitrate = 20;
auto h = s2->onPacket([&cv, &rpkt](const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt) {
rpkt = pkt;
......@@ -57,7 +58,8 @@ TEST_CASE("TCP Stream", "[net]") {
pkt.frame_count = 1;
s1->post(spkt, pkt);
REQUIRE(cv.wait_for(lk, std::chrono::seconds(5)) == std::cv_status::no_timeout);
bool r = cv.wait_for(lk, std::chrono::seconds(5), [&rpkt](){ return rpkt.bitrate == 10; });
REQUIRE( r );
REQUIRE( rpkt.bitrate == 10 );
REQUIRE( rpkt.codec == ftl::protocol::Codec::kJPG );
REQUIRE( rpkt.frame_count == 1 );
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment