diff --git a/net/cpp/include/ftl/net/peer.hpp b/net/cpp/include/ftl/net/peer.hpp index a9476e173332deb453259401651047a24fe0e739..9b75261896fe524e85ccf0e142d6c6e9224de14a 100644 --- a/net/cpp/include/ftl/net/peer.hpp +++ b/net/cpp/include/ftl/net/peer.hpp @@ -206,6 +206,7 @@ class Peer { // Send buffers msgpack::vrefbuffer send_buf_; + std::mutex send_mtx_; std::string uri_; ftl::UUID peerid_; @@ -223,6 +224,7 @@ class Peer { template <typename... ARGS> int Peer::send(const std::string &s, ARGS... args) { + std::unique_lock<std::mutex> lk(send_mtx_); // Leave a blank entry for websocket header if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0); auto args_obj = std::make_tuple(args...); diff --git a/net/cpp/src/peer.cpp b/net/cpp/src/peer.cpp index bc171dd50b4234de2fcb796bcc8b0a39b3363e83..a5ea6ed887090f9dd439e4f7511a8fbc8238e454 100644 --- a/net/cpp/src/peer.cpp +++ b/net/cpp/src/peer.cpp @@ -51,7 +51,7 @@ using ftl::net::Dispatcher; int Peer::rpcid__ = 0; -static ctpl::thread_pool pool(1); +static ctpl::thread_pool pool(5); // TODO(nick) Move to tcp_internal.cpp static int tcpConnect(URI &uri) { @@ -308,6 +308,7 @@ void Peer::error(int e) { } void Peer::data() { + //if (!is_waiting_) return; is_waiting_ = false; pool.push([](int id, Peer *p) { p->_data(); @@ -318,14 +319,19 @@ void Peer::data() { bool Peer::_data() { //std::unique_lock<std::mutex> lk(recv_mtx_); + std::cout << "BEGIN DATA" << std::endl; recv_buf_.reserve_buffer(kMaxMessage); - size_t rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0); + int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0); + + if (rc < 0) { + std::cout << "ERR = " << std::to_string(errno) << std::endl; + return false; + } + recv_buf_.buffer_consumed(rc); msgpack::object_handle msg; while (recv_buf_.next(msg)) { - std::cout << "RECEIVING DATA" << std::endl; - msgpack::object obj = msg.get(); if (status_ != kConnected) { // First message must be a handshake diff --git a/net/cpp/test/net_integration.cpp b/net/cpp/test/net_integration.cpp index 83d8700130e7ba74a7aab3b14a91c7b7d5428287..c0691a803f3ea897334c02bebb577edfac5854ae 100644 --- a/net/cpp/test/net_integration.cpp +++ b/net/cpp/test/net_integration.cpp @@ -70,6 +70,40 @@ TEST_CASE("Universe::connect()", "[net]") { //fin_server(); } +TEST_CASE("Universe::broadcast()", "[net]") { + Universe a("ftl://utu.fi"); + Universe b("ftl://utu.fi"); + + a.listen("tcp://localhost:7077"); + + SECTION("no arguments to no peers") { + bool done = false; + a.bind("hello", [&done]() { + done = true; + }); + + b.broadcast("done"); + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + + SECTION("no arguments to one peer") { + b.connect("tcp://localhost:7077"); + while (a.numberOfPeers() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20)); + + bool done = false; + a.bind("hello", [&done]() { + done = true; + }); + + b.broadcast("hello"); + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + REQUIRE( done ); + } +} + /*TEST_CASE("net::listen()", "[net]") { SECTION("tcp any interface") {