diff --git a/net/cpp/include/ctpl_stl.h b/net/cpp/include/ctpl_stl.h new file mode 100644 index 0000000000000000000000000000000000000000..5956cf095dfbad4402c400a7015bfa590aa09aab --- /dev/null +++ b/net/cpp/include/ctpl_stl.h @@ -0,0 +1,251 @@ +/********************************************************* +* +* Copyright (C) 2014 by Vitaliy Vitsentiy +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +*********************************************************/ + + +#ifndef __ctpl_stl_thread_pool_H__ +#define __ctpl_stl_thread_pool_H__ + +#include <functional> +#include <thread> +#include <atomic> +#include <vector> +#include <memory> +#include <exception> +#include <future> +#include <mutex> +#include <queue> + + + +// thread pool to run user's functors with signature +// ret func(int id, other_params) +// where id is the index of the thread that runs the functor +// ret is some return type + + +namespace ctpl { + + namespace detail { + template <typename T> + class Queue { + public: + bool push(T const & value) { + std::unique_lock<std::mutex> lock(this->mutex); + this->q.push(value); + return true; + } + // deletes the retrieved element, do not use for non integral types + bool pop(T & v) { + std::unique_lock<std::mutex> lock(this->mutex); + if (this->q.empty()) + return false; + v = this->q.front(); + this->q.pop(); + return true; + } + bool empty() { + std::unique_lock<std::mutex> lock(this->mutex); + return this->q.empty(); + } + private: + std::queue<T> q; + std::mutex mutex; + }; + } + + class thread_pool { + + public: + + thread_pool() { this->init(); } + thread_pool(int nThreads) { this->init(); this->resize(nThreads); } + + // the destructor waits for all the functions in the queue to be finished + ~thread_pool() { + this->stop(true); + } + + // get the number of running threads in the pool + int size() { return static_cast<int>(this->threads.size()); } + + // number of idle threads + int n_idle() { return this->nWaiting; } + std::thread & get_thread(int i) { return *this->threads[i]; } + + // change the number of threads in the pool + // should be called from one thread, otherwise be careful to not interleave, also with this->stop() + // nThreads must be >= 0 + void resize(int nThreads) { + if (!this->isStop && !this->isDone) { + int oldNThreads = static_cast<int>(this->threads.size()); + if (oldNThreads <= nThreads) { // if the number of threads is increased + this->threads.resize(nThreads); + this->flags.resize(nThreads); + + for (int i = oldNThreads; i < nThreads; ++i) { + this->flags[i] = std::make_shared<std::atomic<bool>>(false); + this->set_thread(i); + } + } + else { // the number of threads is decreased + for (int i = oldNThreads - 1; i >= nThreads; --i) { + *this->flags[i] = true; // this thread will finish + this->threads[i]->detach(); + } + { + // stop the detached threads that were waiting + std::unique_lock<std::mutex> lock(this->mutex); + this->cv.notify_all(); + } + this->threads.resize(nThreads); // safe to delete because the threads are detached + this->flags.resize(nThreads); // safe to delete because the threads have copies of shared_ptr of the flags, not originals + } + } + } + + // empty the queue + void clear_queue() { + std::function<void(int id)> * _f; + while (this->q.pop(_f)) + delete _f; // empty the queue + } + + // pops a functional wrapper to the original function + std::function<void(int)> pop() { + std::function<void(int id)> * _f = nullptr; + this->q.pop(_f); + std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred + std::function<void(int)> f; + if (_f) + f = *_f; + return f; + } + + // wait for all computing threads to finish and stop all threads + // may be called asynchronously to not pause the calling thread while waiting + // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions + void stop(bool isWait = false) { + if (!isWait) { + if (this->isStop) + return; + this->isStop = true; + for (int i = 0, n = this->size(); i < n; ++i) { + *this->flags[i] = true; // command the threads to stop + } + this->clear_queue(); // empty the queue + } + else { + if (this->isDone || this->isStop) + return; + this->isDone = true; // give the waiting threads a command to finish + } + { + std::unique_lock<std::mutex> lock(this->mutex); + this->cv.notify_all(); // stop all waiting threads + } + for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) { // wait for the computing threads to finish + if (this->threads[i]->joinable()) + this->threads[i]->join(); + } + // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads + // therefore delete them here + this->clear_queue(); + this->threads.clear(); + this->flags.clear(); + } + + template<typename F, typename... Rest> + auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> { + auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>( + std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...) + ); + auto _f = new std::function<void(int id)>([pck](int id) { + (*pck)(id); + }); + this->q.push(_f); + std::unique_lock<std::mutex> lock(this->mutex); + this->cv.notify_one(); + return pck->get_future(); + } + + // run the user's function that excepts argument int - id of the running thread. returned value is templatized + // operator returns std::future, where the user can get the result and rethrow the catched exceptins + template<typename F> + auto push(F && f) ->std::future<decltype(f(0))> { + auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f)); + auto _f = new std::function<void(int id)>([pck](int id) { + (*pck)(id); + }); + this->q.push(_f); + std::unique_lock<std::mutex> lock(this->mutex); + this->cv.notify_one(); + return pck->get_future(); + } + + + private: + + // deleted + thread_pool(const thread_pool &);// = delete; + thread_pool(thread_pool &&);// = delete; + thread_pool & operator=(const thread_pool &);// = delete; + thread_pool & operator=(thread_pool &&);// = delete; + + void set_thread(int i) { + std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag + auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() { + std::atomic<bool> & _flag = *flag; + std::function<void(int id)> * _f; + bool isPop = this->q.pop(_f); + while (true) { + while (isPop) { // if there is anything in the queue + std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred + (*_f)(i); + if (_flag) + return; // the thread is wanted to stop, return even if the queue is not empty yet + else + isPop = this->q.pop(_f); + } + // the queue is empty here, wait for the next command + std::unique_lock<std::mutex> lock(this->mutex); + ++this->nWaiting; + this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; }); + --this->nWaiting; + if (!isPop) + return; // if the queue is empty and this->isDone == true or *flag then return + } + }; + this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique() + } + + void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; } + + std::vector<std::unique_ptr<std::thread>> threads; + std::vector<std::shared_ptr<std::atomic<bool>>> flags; + detail::Queue<std::function<void(int id)> *> q; + std::atomic<bool> isDone; + std::atomic<bool> isStop; + std::atomic<int> nWaiting; // how many threads are waiting + + std::mutex mutex; + std::condition_variable cv; + }; + +} + +#endif // __ctpl_stl_thread_pool_H__ diff --git a/net/cpp/include/ftl/net/peer.hpp b/net/cpp/include/ftl/net/peer.hpp index a7756fbdcfbced330fe1dc341d13e30078b50c36..a9476e173332deb453259401651047a24fe0e739 100644 --- a/net/cpp/include/ftl/net/peer.hpp +++ b/net/cpp/include/ftl/net/peer.hpp @@ -140,14 +140,18 @@ class Peer { void onConnect(std::function<void()> &f); void onDisconnect(std::function<void()> &f) {} + bool isWaiting() const { return is_waiting_; } + public: static const int kMaxMessage = 10*1024*1024; // 10Mb currently protected: - bool data(); // Process one message from socket + void data(); // Process one message from socket void socketError(); // Process one error from socket void error(int e); + bool _data(); + void _dispatchResponse(uint32_t id, msgpack::object &obj); void _sendResponse(uint32_t id, const msgpack::object &obj); @@ -196,7 +200,9 @@ class Peer { bool destroy_disp_; // Receive buffers + bool is_waiting_; msgpack::unpacker recv_buf_; + std::mutex recv_mtx_; // Send buffers msgpack::vrefbuffer send_buf_; diff --git a/net/cpp/include/ftl/net/universe.hpp b/net/cpp/include/ftl/net/universe.hpp index 5e2584f94dcec4559a181411d11da2efbdbf979e..d34b842fc4499231ac1e74ccc30a92e9b4b0c140 100644 --- a/net/cpp/include/ftl/net/universe.hpp +++ b/net/cpp/include/ftl/net/universe.hpp @@ -51,6 +51,8 @@ class Universe { */ bool connect(const std::string &addr); + int numberOfPeers() const { return peers_.size(); } + /** * Bind a function to an RPC or service call name. This will implicitely * be called by any peer making the request. diff --git a/net/cpp/src/peer.cpp b/net/cpp/src/peer.cpp index 7b1955f9aafa6644a97143e2fd0ddd9fcd032a2e..bc171dd50b4234de2fcb796bcc8b0a39b3363e83 100644 --- a/net/cpp/src/peer.cpp +++ b/net/cpp/src/peer.cpp @@ -1,5 +1,6 @@ #define GLOG_NO_ABBREVIATED_SEVERITIES #include <glog/logging.h> +#include <ctpl_stl.h> #include <fcntl.h> #ifdef WIN32 @@ -50,6 +51,8 @@ using ftl::net::Dispatcher; int Peer::rpcid__ = 0; +static ctpl::thread_pool pool(1); + // TODO(nick) Move to tcp_internal.cpp static int tcpConnect(URI &uri) { int rc; @@ -135,10 +138,13 @@ Peer::Peer(int s, Dispatcher *d) : sock_(s) { destroy_disp_ = true; } + is_waiting_ = true; + // Send the initiating handshake if valid if (status_ == kConnecting) { // Install return handshake handler. bind("__handshake__", [this](uint64_t magic, uint32_t version, UUID pid) { + LOG(INFO) << "Handshake 2 received"; if (magic != ftl::net::kMagic) { close(); LOG(ERROR) << "Invalid magic during handshake"; @@ -204,9 +210,12 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { LOG(ERROR) << "Unrecognised connection protocol: " << pUri; } + is_waiting_ = true; + if (status_ == kConnecting) { // Install return handshake handler. bind("__handshake__", [this](uint64_t magic, uint32_t version, UUID pid) { + LOG(INFO) << "Handshake 1 received"; if (magic != ftl::net::kMagic) { close(); LOG(ERROR) << "Invalid magic during handshake"; @@ -298,13 +307,25 @@ void Peer::error(int e) { } -bool Peer::data() { +void Peer::data() { + is_waiting_ = false; + pool.push([](int id, Peer *p) { + p->_data(); + p->is_waiting_ = true; + }, this); +} + +bool Peer::_data() { + //std::unique_lock<std::mutex> lk(recv_mtx_); + recv_buf_.reserve_buffer(kMaxMessage); size_t rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0); recv_buf_.buffer_consumed(rc); msgpack::object_handle msg; while (recv_buf_.next(msg)) { + std::cout << "RECEIVING DATA" << std::endl; + msgpack::object obj = msg.get(); if (status_ != kConnected) { // First message must be a handshake diff --git a/net/cpp/src/universe.cpp b/net/cpp/src/universe.cpp index ea8f3159fe836ddeeec0811252b98f6dd9f0bac1..ffe195fc8ed9e7b0edd77b6972fc6f1ba1b9f311 100644 --- a/net/cpp/src/universe.cpp +++ b/net/cpp/src/universe.cpp @@ -77,7 +77,9 @@ int Universe::_setDescriptors() { n = s->_socket(); } - FD_SET(s->_socket(), &sfdread_); + if (s->isWaiting()) { + FD_SET(s->_socket(), &sfdread_); + } FD_SET(s->_socket(), &sfderror_); } } @@ -105,8 +107,8 @@ void Universe::_run() { int selres = 1; //Wait for a network event or timeout in 3 seconds - block.tv_sec = 3; - block.tv_usec = 0; + block.tv_sec = 0; + block.tv_usec = 10000; selres = select(n+1, &sfdread_, 0, &sfderror_, &block); //Some kind of error occured, it is usually possible to recover from this. @@ -151,7 +153,12 @@ void Universe::_run() { if (s != NULL && s->isValid()) { //If message received from this client then deal with it if (FD_ISSET(s->_socket(), &sfdread_)) { - s->data(); + //s->data(); + //std::cout << "QUEUE DATA PROC" << std::endl; + //p.push([](int id, Peer *s) { + // std::cout << "Processing in thread " << std::to_string(id) << std::endl; + s->data(); + //}, s); } if (FD_ISSET(s->_socket(), &sfderror_)) { s->socketError(); diff --git a/net/cpp/test/CMakeLists.txt b/net/cpp/test/CMakeLists.txt index ce0044c9269757089e7a5243ced4f3dec40ec41a..c50a4d6dfe31deab2cfc5501729846c1e94e657d 100644 --- a/net/cpp/test/CMakeLists.txt +++ b/net/cpp/test/CMakeLists.txt @@ -33,14 +33,17 @@ target_link_libraries(uri_unit # ${UUID_LIBRARIES}) ### Net Integration ############################################################ -#add_executable(net_integration -# ./tests.cpp -# ./net_integration.cpp) -#add_dependencies(net_integration ftlnet) -#target_link_libraries(net_integration -# ftlnet -# ${URIPARSER_LIBRARIES} -# glog::glog) +add_executable(net_integration + ./tests.cpp + ../../../common/cpp/src/config.cpp + ./net_integration.cpp) +add_dependencies(net_integration ftlnet) +target_link_libraries(net_integration + ftlnet + ${URIPARSER_LIBRARIES} + glog::glog + Threads::Threads + ${UUID_LIBRARIES}) @@ -49,7 +52,7 @@ target_link_libraries(uri_unit add_test(URIUnitTest uri_unit) #add_test(ProtocolUnitTest protocol_unit) add_test(PeerUnitTest peer_unit) -#add_test(NetIntegrationTest net_integration) +add_test(NetIntegrationTest net_integration) add_custom_target(tests) add_dependencies(tests peer_unit uri_unit) diff --git a/net/cpp/test/net_integration.cpp b/net/cpp/test/net_integration.cpp index 38712c5c0e169b4ab393f553709979fda04930dd..83d8700130e7ba74a7aab3b14a91c7b7d5428287 100644 --- a/net/cpp/test/net_integration.cpp +++ b/net/cpp/test/net_integration.cpp @@ -1,134 +1,49 @@ #include "catch.hpp" #include <ftl/net.hpp> -#include <ftl/net/socket.hpp> -#include <ftl/net/listener.hpp> -#include <memory> -#include <iostream> +#include <thread> +#include <chrono> -using ftl::net::Socket; -using ftl::net::Protocol; -using std::shared_ptr; +using ftl::net::Universe; // --- Support ----------------------------------------------------------------- -#ifndef WIN32 -#include <unistd.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <netdb.h> -#include <arpa/inet.h> -#define INVALID_SOCKET -1 -#define SOCKET_ERROR -1 -#endif - -#ifdef WIN32 -#include <winsock2.h> -#include <Ws2tcpip.h> -#include <windows.h> - -#pragma comment(lib, "Ws2_32.lib") -#endif - -static int ssock = INVALID_SOCKET; -static sockaddr_in slocalAddr; - -void fin_server() { - //int t = 1; - //setsockopt(ssock,SOL_SOCKET,SO_REUSEADDR,&t,sizeof(int)); - - #ifndef WIN32 - if (ssock != INVALID_SOCKET) close(ssock); - #else - if (ssock != INVALID_SOCKET) closesocket(ssock); - #endif - - ssock = INVALID_SOCKET; -} - -void init_server() { - //fin_server(); - int port = 7077; - - #ifdef WIN32 - WSAData wsaData; - //If Win32 then load winsock - if (WSAStartup(MAKEWORD(1,1), &wsaData) != 0) { - std::cerr << "Socket error\n"; - return; - } - #endif - - ssock = socket(AF_INET, SOCK_STREAM, 0); - if (ssock == INVALID_SOCKET) { - std::cerr << "Socket error 1\n"; - return; - } - - int enable = 1; - if (setsockopt(ssock, SOL_SOCKET, SO_REUSEADDR, (char*)&enable, sizeof(int)) < 0) - std::cerr << "setsockopt(SO_REUSEADDR) failed" << std::endl; - - //Specify listen port and address - //memset(&s_localAddr, 0, sizeof(s_localAddr)); - slocalAddr.sin_family = AF_INET; - slocalAddr.sin_addr.s_addr = htonl(INADDR_ANY); - slocalAddr.sin_port = htons(port); - - int rc = bind(ssock, (struct sockaddr*)&slocalAddr, sizeof(slocalAddr)); - - if (rc == SOCKET_ERROR) { - std::cerr << "Socket error 2\n"; - - #ifndef WIN32 - close(ssock); - #else - closesocket(ssock); - #endif - ssock = INVALID_SOCKET; - return; - } - - //Attempt to start listening for connection requests. - rc = ::listen(ssock, 1); - - if (rc == SOCKET_ERROR) { - std::cerr << "Socket error 3\n"; - - #ifndef WIN32 - close(ssock); - #else - closesocket(ssock); - #endif - ssock = INVALID_SOCKET; - return; - } -} - // --- Tests ------------------------------------------------------------------- -TEST_CASE("net::connect()", "[net]") { - init_server(); - shared_ptr<Socket> sock = nullptr; +TEST_CASE("Universe::connect()", "[net]") { + Universe a("ftl://utu.fi"); + Universe b("ftl://utu.fi"); + + a.listen("tcp://localhost:7077"); SECTION("valid tcp connection using ipv4") { - sock = ftl::net::connect("tcp://127.0.0.1:7077"); - REQUIRE(sock != nullptr); - REQUIRE(sock->isValid()); + REQUIRE( b.connect("tcp://127.0.0.1:7077") ); + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + REQUIRE( a.numberOfPeers() == 1 ); + REQUIRE( b.numberOfPeers() == 1 ); } SECTION("valid tcp connection using hostname") { - sock = ftl::net::connect("tcp://localhost:7077"); - REQUIRE(sock->isValid()); + REQUIRE( b.connect("tcp://localhost:7077") ); + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + REQUIRE( a.numberOfPeers() == 1 ); + REQUIRE( b.numberOfPeers() == 1 ); } SECTION("invalid protocol") { - sock = ftl::net::connect("http://127.0.0.1:7077"); - REQUIRE(!sock->isValid()); + REQUIRE( !b.connect("http://127.0.0.1:7077") ); + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + REQUIRE( a.numberOfPeers() == 0 ); + REQUIRE( b.numberOfPeers() == 0 ); } - SECTION("empty uri") { + /*SECTION("empty uri") { sock = ftl::net::connect(""); REQUIRE(!sock->isValid()); } @@ -136,7 +51,7 @@ TEST_CASE("net::connect()", "[net]") { SECTION("null uri") { sock = ftl::net::connect(NULL); REQUIRE(!sock->isValid()); - } + }*/ // Disabled due to long timeout /*SECTION("incorrect ipv4 address") { @@ -152,10 +67,10 @@ TEST_CASE("net::connect()", "[net]") { REQUIRE(!sock->isValid()); }*/ - fin_server(); + //fin_server(); } -TEST_CASE("net::listen()", "[net]") { +/*TEST_CASE("net::listen()", "[net]") { SECTION("tcp any interface") { REQUIRE( ftl::net::listen("tcp://localhost:9001")->isListening() ); @@ -229,5 +144,5 @@ TEST_CASE("Net Integration", "[integrate]") { // TODO s2->wait(100); REQUIRE( data == "hello world" ); -} +}*/