Skip to content
Snippets Groups Projects
Commit 485d1bdb authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Make peer send threadsafe, add broadcast tests and fix bug of not handing recv errors

parent 6f5f2350
No related branches found
No related tags found
No related merge requests found
......@@ -206,6 +206,7 @@ class Peer {
// Send buffers
msgpack::vrefbuffer send_buf_;
std::mutex send_mtx_;
std::string uri_;
ftl::UUID peerid_;
......@@ -223,6 +224,7 @@ class Peer {
template <typename... ARGS>
int Peer::send(const std::string &s, ARGS... args) {
std::unique_lock<std::mutex> lk(send_mtx_);
// Leave a blank entry for websocket header
if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0);
auto args_obj = std::make_tuple(args...);
......
......@@ -51,7 +51,7 @@ using ftl::net::Dispatcher;
int Peer::rpcid__ = 0;
static ctpl::thread_pool pool(1);
static ctpl::thread_pool pool(5);
// TODO(nick) Move to tcp_internal.cpp
static int tcpConnect(URI &uri) {
......@@ -308,6 +308,7 @@ void Peer::error(int e) {
}
void Peer::data() {
//if (!is_waiting_) return;
is_waiting_ = false;
pool.push([](int id, Peer *p) {
p->_data();
......@@ -318,14 +319,19 @@ void Peer::data() {
bool Peer::_data() {
//std::unique_lock<std::mutex> lk(recv_mtx_);
std::cout << "BEGIN DATA" << std::endl;
recv_buf_.reserve_buffer(kMaxMessage);
size_t rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0);
int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0);
if (rc < 0) {
std::cout << "ERR = " << std::to_string(errno) << std::endl;
return false;
}
recv_buf_.buffer_consumed(rc);
msgpack::object_handle msg;
while (recv_buf_.next(msg)) {
std::cout << "RECEIVING DATA" << std::endl;
msgpack::object obj = msg.get();
if (status_ != kConnected) {
// First message must be a handshake
......
......@@ -70,6 +70,40 @@ TEST_CASE("Universe::connect()", "[net]") {
//fin_server();
}
TEST_CASE("Universe::broadcast()", "[net]") {
Universe a("ftl://utu.fi");
Universe b("ftl://utu.fi");
a.listen("tcp://localhost:7077");
SECTION("no arguments to no peers") {
bool done = false;
a.bind("hello", [&done]() {
done = true;
});
b.broadcast("done");
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
SECTION("no arguments to one peer") {
b.connect("tcp://localhost:7077");
while (a.numberOfPeers() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20));
bool done = false;
a.bind("hello", [&done]() {
done = true;
});
b.broadcast("hello");
std::this_thread::sleep_for(std::chrono::milliseconds(200));
REQUIRE( done );
}
}
/*TEST_CASE("net::listen()", "[net]") {
SECTION("tcp any interface") {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment