diff --git a/net/cpp/CMakeLists.txt b/net/cpp/CMakeLists.txt index 7f7f63884865a3ec0acb946d4370112321238fa3..d6c361ca7d33db814cd79f8a7c67575afc00b101 100644 --- a/net/cpp/CMakeLists.txt +++ b/net/cpp/CMakeLists.txt @@ -17,7 +17,7 @@ target_include_directories(ftlnet PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:include> PRIVATE src) -target_link_libraries(ftlnet Threads::Threads glog::glog) +target_link_libraries(ftlnet Threads::Threads glog::glog ${UUID_LIBRARIES} ${URIPARSER_LIBRARIES}) install(TARGETS ftlnet EXPORT ftlnet-config ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} diff --git a/net/cpp/include/ctpl_stl.h b/net/cpp/include/ctpl_stl.h new file mode 100644 index 0000000000000000000000000000000000000000..5956cf095dfbad4402c400a7015bfa590aa09aab --- /dev/null +++ b/net/cpp/include/ctpl_stl.h @@ -0,0 +1,251 @@ +/********************************************************* +* +* Copyright (C) 2014 by Vitaliy Vitsentiy +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +*********************************************************/ + + +#ifndef __ctpl_stl_thread_pool_H__ +#define __ctpl_stl_thread_pool_H__ + +#include <functional> +#include <thread> +#include <atomic> +#include <vector> +#include <memory> +#include <exception> +#include <future> +#include <mutex> +#include <queue> + + + +// thread pool to run user's functors with signature +// ret func(int id, other_params) +// where id is the index of the thread that runs the functor +// ret is some return type + + +namespace ctpl { + + namespace detail { + template <typename T> + class Queue { + public: + bool push(T const & value) { + std::unique_lock<std::mutex> lock(this->mutex); + this->q.push(value); + return true; + } + // deletes the retrieved element, do not use for non integral types + bool pop(T & v) { + std::unique_lock<std::mutex> lock(this->mutex); + if (this->q.empty()) + return false; + v = this->q.front(); + this->q.pop(); + return true; + } + bool empty() { + std::unique_lock<std::mutex> lock(this->mutex); + return this->q.empty(); + } + private: + std::queue<T> q; + std::mutex mutex; + }; + } + + class thread_pool { + + public: + + thread_pool() { this->init(); } + thread_pool(int nThreads) { this->init(); this->resize(nThreads); } + + // the destructor waits for all the functions in the queue to be finished + ~thread_pool() { + this->stop(true); + } + + // get the number of running threads in the pool + int size() { return static_cast<int>(this->threads.size()); } + + // number of idle threads + int n_idle() { return this->nWaiting; } + std::thread & get_thread(int i) { return *this->threads[i]; } + + // change the number of threads in the pool + // should be called from one thread, otherwise be careful to not interleave, also with this->stop() + // nThreads must be >= 0 + void resize(int nThreads) { + if (!this->isStop && !this->isDone) { + int oldNThreads = static_cast<int>(this->threads.size()); + if (oldNThreads <= nThreads) { // if the number of threads is increased + this->threads.resize(nThreads); + this->flags.resize(nThreads); + + for (int i = oldNThreads; i < nThreads; ++i) { + this->flags[i] = std::make_shared<std::atomic<bool>>(false); + this->set_thread(i); + } + } + else { // the number of threads is decreased + for (int i = oldNThreads - 1; i >= nThreads; --i) { + *this->flags[i] = true; // this thread will finish + this->threads[i]->detach(); + } + { + // stop the detached threads that were waiting + std::unique_lock<std::mutex> lock(this->mutex); + this->cv.notify_all(); + } + this->threads.resize(nThreads); // safe to delete because the threads are detached + this->flags.resize(nThreads); // safe to delete because the threads have copies of shared_ptr of the flags, not originals + } + } + } + + // empty the queue + void clear_queue() { + std::function<void(int id)> * _f; + while (this->q.pop(_f)) + delete _f; // empty the queue + } + + // pops a functional wrapper to the original function + std::function<void(int)> pop() { + std::function<void(int id)> * _f = nullptr; + this->q.pop(_f); + std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred + std::function<void(int)> f; + if (_f) + f = *_f; + return f; + } + + // wait for all computing threads to finish and stop all threads + // may be called asynchronously to not pause the calling thread while waiting + // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions + void stop(bool isWait = false) { + if (!isWait) { + if (this->isStop) + return; + this->isStop = true; + for (int i = 0, n = this->size(); i < n; ++i) { + *this->flags[i] = true; // command the threads to stop + } + this->clear_queue(); // empty the queue + } + else { + if (this->isDone || this->isStop) + return; + this->isDone = true; // give the waiting threads a command to finish + } + { + std::unique_lock<std::mutex> lock(this->mutex); + this->cv.notify_all(); // stop all waiting threads + } + for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) { // wait for the computing threads to finish + if (this->threads[i]->joinable()) + this->threads[i]->join(); + } + // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads + // therefore delete them here + this->clear_queue(); + this->threads.clear(); + this->flags.clear(); + } + + template<typename F, typename... Rest> + auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> { + auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>( + std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...) + ); + auto _f = new std::function<void(int id)>([pck](int id) { + (*pck)(id); + }); + this->q.push(_f); + std::unique_lock<std::mutex> lock(this->mutex); + this->cv.notify_one(); + return pck->get_future(); + } + + // run the user's function that excepts argument int - id of the running thread. returned value is templatized + // operator returns std::future, where the user can get the result and rethrow the catched exceptins + template<typename F> + auto push(F && f) ->std::future<decltype(f(0))> { + auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f)); + auto _f = new std::function<void(int id)>([pck](int id) { + (*pck)(id); + }); + this->q.push(_f); + std::unique_lock<std::mutex> lock(this->mutex); + this->cv.notify_one(); + return pck->get_future(); + } + + + private: + + // deleted + thread_pool(const thread_pool &);// = delete; + thread_pool(thread_pool &&);// = delete; + thread_pool & operator=(const thread_pool &);// = delete; + thread_pool & operator=(thread_pool &&);// = delete; + + void set_thread(int i) { + std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag + auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() { + std::atomic<bool> & _flag = *flag; + std::function<void(int id)> * _f; + bool isPop = this->q.pop(_f); + while (true) { + while (isPop) { // if there is anything in the queue + std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred + (*_f)(i); + if (_flag) + return; // the thread is wanted to stop, return even if the queue is not empty yet + else + isPop = this->q.pop(_f); + } + // the queue is empty here, wait for the next command + std::unique_lock<std::mutex> lock(this->mutex); + ++this->nWaiting; + this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; }); + --this->nWaiting; + if (!isPop) + return; // if the queue is empty and this->isDone == true or *flag then return + } + }; + this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique() + } + + void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; } + + std::vector<std::unique_ptr<std::thread>> threads; + std::vector<std::shared_ptr<std::atomic<bool>>> flags; + detail::Queue<std::function<void(int id)> *> q; + std::atomic<bool> isDone; + std::atomic<bool> isStop; + std::atomic<int> nWaiting; // how many threads are waiting + + std::mutex mutex; + std::condition_variable cv; + }; + +} + +#endif // __ctpl_stl_thread_pool_H__ diff --git a/net/cpp/include/ftl/net/dispatcher.hpp b/net/cpp/include/ftl/net/dispatcher.hpp index 46505835223ad64419d6d1db95676e4888150206..ba240b8c25c8989705b80e76108289e142f79e42 100644 --- a/net/cpp/include/ftl/net/dispatcher.hpp +++ b/net/cpp/include/ftl/net/dispatcher.hpp @@ -15,6 +15,7 @@ #include <vector> #include <string> #include <unordered_map> +#include <optional> namespace ftl { @@ -46,7 +47,7 @@ class Peer; class Dispatcher { public: - Dispatcher() {} + explicit Dispatcher(Dispatcher *parent=nullptr) : parent_(parent) {} //void dispatch(Peer &, const std::string &msg); void dispatch(Peer &, const msgpack::object &msg); @@ -134,8 +135,11 @@ class Dispatcher { std::tuple<uint32_t, uint32_t, msgpack::object, msgpack::object>; private: + Dispatcher *parent_; std::unordered_map<std::string, adaptor_type> funcs_; + std::optional<adaptor_type> _locateHandler(const std::string &name) const; + static void enforce_arg_count(std::string const &func, std::size_t found, std::size_t expected); diff --git a/net/cpp/include/ftl/net/peer.hpp b/net/cpp/include/ftl/net/peer.hpp index 16cad5138518d010df7b64a8d5b27ad6b091e27d..0b9b812288b06ebca15f2b9ba23e0d22c72ca664 100644 --- a/net/cpp/include/ftl/net/peer.hpp +++ b/net/cpp/include/ftl/net/peer.hpp @@ -20,6 +20,10 @@ #include <tuple> #include <vector> #include <type_traits> +#include <thread> +#include <mutex> +#include <condition_variable> +#include <chrono> # define ENABLE_IF(...) \ typename std::enable_if<(__VA_ARGS__), bool>::type = true @@ -56,6 +60,7 @@ struct decrypt{};*/ class Peer { public: friend class Universe; + friend class Dispatcher; enum Status { kInvalid, kConnecting, kConnected, kDisconnected, kReconnecting @@ -135,14 +140,21 @@ class Peer { void onConnect(std::function<void()> &f); void onDisconnect(std::function<void()> &f) {} + bool isWaiting() const { return is_waiting_; } + public: static const int kMaxMessage = 10*1024*1024; // 10Mb currently protected: - bool data(); // Process one message from socket + void data(); // Process one message from socket void socketError(); // Process one error from socket void error(int e); + bool _data(); + + void _dispatchResponse(uint32_t id, msgpack::object &obj); + void _sendResponse(uint32_t id, const msgpack::object &obj); + /** * Get the internal OS dependent socket. * TODO(nick) Work out if this should be private. @@ -161,7 +173,6 @@ class Peer { private: // Functions void _connected(); void _updateURI(); - void _dispatchReturn(const std::string &d); int _send(); @@ -186,13 +197,15 @@ class Peer { int sock_; ftl::URI::scheme_t scheme_; uint32_t version_; - bool destroy_disp_; // Receive buffers + bool is_waiting_; msgpack::unpacker recv_buf_; + std::mutex recv_mtx_; // Send buffers msgpack::vrefbuffer send_buf_; + std::mutex send_mtx_; std::string uri_; ftl::UUID peerid_; @@ -210,6 +223,7 @@ class Peer { template <typename... ARGS> int Peer::send(const std::string &s, ARGS... args) { + std::unique_lock<std::mutex> lk(send_mtx_); // Leave a blank entry for websocket header if (scheme_ == ftl::URI::SCHEME_WS) send_buf_.append_ref(nullptr,0); auto args_obj = std::make_tuple(args...); @@ -228,17 +242,26 @@ void Peer::bind(const std::string &name, F func) { template <typename R, typename... ARGS> R Peer::call(const std::string &name, ARGS... args) { bool hasreturned = false; + std::mutex m; + std::condition_variable cv; + R result; - asyncCall<R>(name, [&result,&hasreturned](const R &r) { + asyncCall<R>(name, [&](const R &r) { + std::unique_lock<std::mutex> lk(m); hasreturned = true; result = r; + lk.unlock(); + cv.notify_one(); }, std::forward<ARGS>(args)...); - // Loop the network - int limit = 10; - while (limit > 0 && !hasreturned) { - limit--; - // TODO REPLACE ftl::net::wait(); + { // Block thread until async callback notifies us + std::unique_lock<std::mutex> lk(m); + cv.wait_for(lk, std::chrono::seconds(1), [&hasreturned]{return hasreturned;}); + } + + if (!hasreturned) { + // TODO(nick) remove callback + throw 1; } return result; @@ -255,13 +278,12 @@ void Peer::asyncCall( LOG(INFO) << "RPC " << name << "() -> " << uri_; - std::stringstream buf; - msgpack::pack(buf, call_obj); + msgpack::pack(send_buf_, call_obj); // Register the CB callbacks_[rpcid] = std::make_unique<caller<T>>(cb); - send("__rpc__", buf.str()); + _send(); } }; diff --git a/net/cpp/include/ftl/net/universe.hpp b/net/cpp/include/ftl/net/universe.hpp index 5e2584f94dcec4559a181411d11da2efbdbf979e..d34b842fc4499231ac1e74ccc30a92e9b4b0c140 100644 --- a/net/cpp/include/ftl/net/universe.hpp +++ b/net/cpp/include/ftl/net/universe.hpp @@ -51,6 +51,8 @@ class Universe { */ bool connect(const std::string &addr); + int numberOfPeers() const { return peers_.size(); } + /** * Bind a function to an RPC or service call name. This will implicitely * be called by any peer making the request. diff --git a/net/cpp/src/dispatcher.cpp b/net/cpp/src/dispatcher.cpp index 394307e1949297f6b5fd549b80307d64d975bf46..1e71b9b9d548cadc4596f90593ca043ff552d14b 100644 --- a/net/cpp/src/dispatcher.cpp +++ b/net/cpp/src/dispatcher.cpp @@ -8,6 +8,7 @@ using ftl::net::Peer; using ftl::net::Dispatcher; using std::vector; using std::string; +using std::optional; /*static std::string hexStr(const std::string &s) { @@ -48,43 +49,69 @@ void ftl::net::Dispatcher::dispatch(Peer &s, const msgpack::object &msg) { void ftl::net::Dispatcher::dispatch_call(Peer &s, const msgpack::object &msg) { call_t the_call; - msg.convert(the_call); + + try { + msg.convert(the_call); + } catch(...) { + LOG(ERROR) << "Bad message format"; + return; + } // TODO: proper validation of protocol (and responding to it) - // auto &&type = std::get<0>(the_call); - // assert(type == 0); - + auto &&type = std::get<0>(the_call); auto &&id = std::get<1>(the_call); - auto &&name = std::get<2>(the_call); - auto &&args = std::get<3>(the_call); + auto &&name = std::get<2>(the_call); + auto &&args = std::get<3>(the_call); + // assert(type == 0); - LOG(INFO) << "RPC " << name << "() <- " << s.getURI(); - - auto it_func = funcs_.find(name); + if (type == 1) { + LOG(INFO) << "RPC return for " << id; + s._dispatchResponse(id, args); + } else if (type == 0) { + LOG(INFO) << "RPC " << name << "() <- " << s.getURI(); + + auto it_func = funcs_.find(name); + + if (it_func != end(funcs_)) { + try { + auto result = (it_func->second)(args); //->get(); + s._sendResponse(id, result->get()); + /*response_t res_obj = std::make_tuple(1,id,msgpack::object(),result->get()); + std::stringstream buf; + msgpack::pack(buf, res_obj); + s.send("__return__", buf.str());*/ + } catch (const std::exception &e) { + //throw; + //LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")"; + /*response_t res_obj = std::make_tuple(1,id,msgpack::object(e.what()),msgpack::object()); + std::stringstream buf; + msgpack::pack(buf, res_obj); + s.send("__return__", buf.str());*/ + } catch (int e) { + //throw; + //LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")"; + /*response_t res_obj = std::make_tuple(1,id,msgpack::object(e),msgpack::object()); + std::stringstream buf; + msgpack::pack(buf, res_obj); + s.send("__return__", buf.str());*/ + } + } + } else { + // TODO(nick) Some error + } +} - if (it_func != end(funcs_)) { - try { - auto result = (it_func->second)(args); //->get(); - response_t res_obj = std::make_tuple(1,id,msgpack::object(),result->get()); - std::stringstream buf; - msgpack::pack(buf, res_obj); - s.send("__return__", buf.str()); - } catch (const std::exception &e) { - //throw; - //LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")"; - response_t res_obj = std::make_tuple(1,id,msgpack::object(e.what()),msgpack::object()); - std::stringstream buf; - msgpack::pack(buf, res_obj); - s.send("__return__", buf.str()); - } catch (int e) { - //throw; - //LOG(ERROR) << "Exception when attempting to call RPC (" << e << ")"; - response_t res_obj = std::make_tuple(1,id,msgpack::object(e),msgpack::object()); - std::stringstream buf; - msgpack::pack(buf, res_obj); - s.send("__return__", buf.str()); +optional<Dispatcher::adaptor_type> ftl::net::Dispatcher::_locateHandler(const std::string &name) const { + auto it_func = funcs_.find(name); + if (it_func == end(funcs_)) { + if (parent_ != nullptr) { + return parent_->_locateHandler(name); + } else { + return {}; } - } + } else { + return it_func->second; + } } void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const &msg) { @@ -100,11 +127,11 @@ void ftl::net::Dispatcher::dispatch_notification(Peer &s, msgpack::object const LOG(INFO) << "NOTIFICATION " << name << "() <- " << s.getURI(); - auto it_func = funcs_.find(name); + auto binding = _locateHandler(name); - if (it_func != end(funcs_)) { + if (binding) { try { - auto result = (it_func->second)(args); + auto result = (*binding)(args); } catch (int e) { throw e; } @@ -124,6 +151,7 @@ void ftl::net::Dispatcher::enforce_arg_count(std::string const &func, std::size_ void ftl::net::Dispatcher::enforce_unique_name(std::string const &func) { auto pos = funcs_.find(func); if (pos != end(funcs_)) { + LOG(ERROR) << "RPC non unique binding for " << func; throw -1; } } diff --git a/net/cpp/src/peer.cpp b/net/cpp/src/peer.cpp index 868aa168d270d5f80420e396fad8fccca56f4ea5..bc969f91bc85afef40c4f0f7cf4c673ab7fe0149 100644 --- a/net/cpp/src/peer.cpp +++ b/net/cpp/src/peer.cpp @@ -1,5 +1,6 @@ #define GLOG_NO_ABBREVIATED_SEVERITIES #include <glog/logging.h> +#include <ctpl_stl.h> #include <fcntl.h> #ifdef WIN32 @@ -50,6 +51,8 @@ using ftl::net::Dispatcher; int Peer::rpcid__ = 0; +static ctpl::thread_pool pool(5); + // TODO(nick) Move to tcp_internal.cpp static int tcpConnect(URI &uri) { int rc; @@ -127,18 +130,15 @@ Peer::Peer(int s, Dispatcher *d) : sock_(s) { status_ = (s == INVALID_SOCKET) ? kInvalid : kConnecting; _updateURI(); - if (d != nullptr) { - disp_ = d; - destroy_disp_ = false; - } else { - disp_ = new Dispatcher(); - destroy_disp_ = true; - } + disp_ = new Dispatcher(d); + + is_waiting_ = true; // Send the initiating handshake if valid if (status_ == kConnecting) { // Install return handshake handler. bind("__handshake__", [this](uint64_t magic, uint32_t version, UUID pid) { + LOG(INFO) << "Handshake 2 received"; if (magic != ftl::net::kMagic) { close(); LOG(ERROR) << "Invalid magic during handshake"; @@ -160,13 +160,7 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { status_ = kInvalid; sock_ = INVALID_SOCKET; - if (d != nullptr) { - disp_ = d; - destroy_disp_ = false; - } else { - disp_ = new Dispatcher(); - destroy_disp_ = true; - } + disp_ = new Dispatcher(d); scheme_ = uri.getProtocol(); if (uri.getProtocol() == URI::SCHEME_TCP) { @@ -204,9 +198,12 @@ Peer::Peer(const char *pUri, Dispatcher *d) : uri_(pUri) { LOG(ERROR) << "Unrecognised connection protocol: " << pUri; } + is_waiting_ = true; + if (status_ == kConnecting) { // Install return handshake handler. bind("__handshake__", [this](uint64_t magic, uint32_t version, UUID pid) { + LOG(INFO) << "Handshake 1 received"; if (magic != ftl::net::kMagic) { close(); LOG(ERROR) << "Invalid magic during handshake"; @@ -298,9 +295,25 @@ void Peer::error(int e) { } -bool Peer::data() { +void Peer::data() { + //if (!is_waiting_) return; + is_waiting_ = false; + pool.push([](int id, Peer *p) { + p->_data(); + p->is_waiting_ = true; + }, this); +} + +bool Peer::_data() { + //std::unique_lock<std::mutex> lk(recv_mtx_); + recv_buf_.reserve_buffer(kMaxMessage); - size_t rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0); + int rc = ftl::net::internal::recv(sock_, recv_buf_.buffer(), kMaxMessage, 0); + + if (rc < 0) { + return false; + } + recv_buf_.buffer_consumed(rc); msgpack::object_handle msg; @@ -308,12 +321,18 @@ bool Peer::data() { msgpack::object obj = msg.get(); if (status_ != kConnected) { // First message must be a handshake - tuple<uint32_t, std::string, msgpack::object> hs; - obj.convert(hs); - - if (get<1>(hs) != "__handshake__") { + try { + tuple<uint32_t, std::string, msgpack::object> hs; + obj.convert(hs); + + if (get<1>(hs) != "__handshake__") { + close(); + LOG(ERROR) << "Missing handshake"; + return false; + } + } catch(...) { close(); - LOG(ERROR) << "Missing handshake"; + LOG(ERROR) << "Bad first message format"; return false; } } @@ -432,21 +451,7 @@ void Socket::handshake2() { _connected(); }*/ -void Peer::_dispatchReturn(const std::string &d) { - auto unpacked = msgpack::unpack(d.data(), d.size()); - Dispatcher::response_t the_result; - unpacked.get().convert(the_result); - - // Msgpack stipulates that 1 means return message - if (std::get<0>(the_result) != 1) { - LOG(ERROR) << "Bad RPC return message"; - return; - } - - auto &&id = std::get<1>(the_result); - //auto &&err = std::get<2>(the_result); - auto &&res = std::get<3>(the_result); - +void Peer::_dispatchResponse(uint32_t id, msgpack::object &res) { // TODO Handle error reporting... if (callbacks_.count(id) > 0) { @@ -460,6 +465,12 @@ void Peer::_dispatchReturn(const std::string &d) { } } +void Peer::_sendResponse(uint32_t id, const msgpack::object &res) { + Dispatcher::response_t res_obj = std::make_tuple(1,id,msgpack::object(),res); + msgpack::pack(send_buf_, res_obj); + _send(); +} + void Peer::onConnect(std::function<void()> &f) { if (status_ == kConnected) { f(); @@ -516,9 +527,7 @@ int Peer::_send() { Peer::~Peer() { close(); - - if (destroy_disp_) { - delete disp_; - } + + delete disp_; } diff --git a/net/cpp/src/protocol.cpp b/net/cpp/src/protocol.cpp deleted file mode 100644 index 27f1146d6977035603ecf309bf1a2a2584fc8f24..0000000000000000000000000000000000000000 --- a/net/cpp/src/protocol.cpp +++ /dev/null @@ -1,47 +0,0 @@ -#define GLOG_NO_ABBREVIATED_SEVERITIES -#include <glog/logging.h> -#include <ftl/net/socket.hpp> -#include <ftl/net/protocol.hpp> -#include <functional> -#include <iostream> - -using ftl::net::Socket; -using ftl::net::Protocol; - -std::map<std::string,Protocol*> Protocol::protocols__; - -Protocol *Protocol::find(const std::string &id) { - if (protocols__.count(id) > 0) return protocols__[id]; - else return NULL; -} - -Protocol::Protocol(const std::string &id) : id_(id) { - protocols__[id] = this; -} - -Protocol::~Protocol() { - protocols__.erase(id_); - // TODO Make sure all dependent sockets are closed! -} - -void Protocol::bind(int service, std::function<void(uint32_t,Socket&)> func) { - if (handlers_.count(service) == 0) { - handlers_[service] = func; - } else { - LOG(ERROR) << "Message service " << service << " already bound"; - } -} - -void Protocol::dispatchRPC(Socket &s, const std::string &d) { - disp_.dispatch(s,d); -} - -void Protocol::dispatchRaw(uint32_t service, Socket &s) { - // Lookup raw message handler - if (handlers_.count(service) > 0) { - handlers_[service](service, s); - } else { - LOG(ERROR) << "Unrecognised service request (" << service << ") from " << s.getURI(); - } -} - diff --git a/net/cpp/src/universe.cpp b/net/cpp/src/universe.cpp index ea8f3159fe836ddeeec0811252b98f6dd9f0bac1..ffe195fc8ed9e7b0edd77b6972fc6f1ba1b9f311 100644 --- a/net/cpp/src/universe.cpp +++ b/net/cpp/src/universe.cpp @@ -77,7 +77,9 @@ int Universe::_setDescriptors() { n = s->_socket(); } - FD_SET(s->_socket(), &sfdread_); + if (s->isWaiting()) { + FD_SET(s->_socket(), &sfdread_); + } FD_SET(s->_socket(), &sfderror_); } } @@ -105,8 +107,8 @@ void Universe::_run() { int selres = 1; //Wait for a network event or timeout in 3 seconds - block.tv_sec = 3; - block.tv_usec = 0; + block.tv_sec = 0; + block.tv_usec = 10000; selres = select(n+1, &sfdread_, 0, &sfderror_, &block); //Some kind of error occured, it is usually possible to recover from this. @@ -151,7 +153,12 @@ void Universe::_run() { if (s != NULL && s->isValid()) { //If message received from this client then deal with it if (FD_ISSET(s->_socket(), &sfdread_)) { - s->data(); + //s->data(); + //std::cout << "QUEUE DATA PROC" << std::endl; + //p.push([](int id, Peer *s) { + // std::cout << "Processing in thread " << std::to_string(id) << std::endl; + s->data(); + //}, s); } if (FD_ISSET(s->_socket(), &sfderror_)) { s->socketError(); diff --git a/net/cpp/test/CMakeLists.txt b/net/cpp/test/CMakeLists.txt index 71b2610d50239f007d2bddad0506742532b23ed0..c50a4d6dfe31deab2cfc5501729846c1e94e657d 100644 --- a/net/cpp/test/CMakeLists.txt +++ b/net/cpp/test/CMakeLists.txt @@ -9,6 +9,7 @@ add_executable(peer_unit target_link_libraries(peer_unit ${URIPARSER_LIBRARIES} glog::glog + Threads::Threads ${UUID_LIBRARIES}) ### URI ######################################################################## @@ -32,14 +33,17 @@ target_link_libraries(uri_unit # ${UUID_LIBRARIES}) ### Net Integration ############################################################ -#add_executable(net_integration -# ./tests.cpp -# ./net_integration.cpp) -#add_dependencies(net_integration ftlnet) -#target_link_libraries(net_integration -# ftlnet -# ${URIPARSER_LIBRARIES} -# glog::glog) +add_executable(net_integration + ./tests.cpp + ../../../common/cpp/src/config.cpp + ./net_integration.cpp) +add_dependencies(net_integration ftlnet) +target_link_libraries(net_integration + ftlnet + ${URIPARSER_LIBRARIES} + glog::glog + Threads::Threads + ${UUID_LIBRARIES}) @@ -48,7 +52,7 @@ target_link_libraries(uri_unit add_test(URIUnitTest uri_unit) #add_test(ProtocolUnitTest protocol_unit) add_test(PeerUnitTest peer_unit) -#add_test(NetIntegrationTest net_integration) +add_test(NetIntegrationTest net_integration) add_custom_target(tests) add_dependencies(tests peer_unit uri_unit) diff --git a/net/cpp/test/net_integration.cpp b/net/cpp/test/net_integration.cpp index 38712c5c0e169b4ab393f553709979fda04930dd..cc2d581d01baba343f170df30562ad8f4027c66a 100644 --- a/net/cpp/test/net_integration.cpp +++ b/net/cpp/test/net_integration.cpp @@ -1,134 +1,51 @@ #include "catch.hpp" #include <ftl/net.hpp> -#include <ftl/net/socket.hpp> -#include <ftl/net/listener.hpp> -#include <memory> -#include <iostream> +#include <thread> +#include <chrono> -using ftl::net::Socket; -using ftl::net::Protocol; -using std::shared_ptr; +using ftl::net::Universe; +using std::this_thread::sleep_for; +using std::chrono::milliseconds; // --- Support ----------------------------------------------------------------- -#ifndef WIN32 -#include <unistd.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <netdb.h> -#include <arpa/inet.h> -#define INVALID_SOCKET -1 -#define SOCKET_ERROR -1 -#endif - -#ifdef WIN32 -#include <winsock2.h> -#include <Ws2tcpip.h> -#include <windows.h> - -#pragma comment(lib, "Ws2_32.lib") -#endif - -static int ssock = INVALID_SOCKET; -static sockaddr_in slocalAddr; - -void fin_server() { - //int t = 1; - //setsockopt(ssock,SOL_SOCKET,SO_REUSEADDR,&t,sizeof(int)); - - #ifndef WIN32 - if (ssock != INVALID_SOCKET) close(ssock); - #else - if (ssock != INVALID_SOCKET) closesocket(ssock); - #endif - - ssock = INVALID_SOCKET; -} - -void init_server() { - //fin_server(); - int port = 7077; - - #ifdef WIN32 - WSAData wsaData; - //If Win32 then load winsock - if (WSAStartup(MAKEWORD(1,1), &wsaData) != 0) { - std::cerr << "Socket error\n"; - return; - } - #endif - - ssock = socket(AF_INET, SOCK_STREAM, 0); - if (ssock == INVALID_SOCKET) { - std::cerr << "Socket error 1\n"; - return; - } - - int enable = 1; - if (setsockopt(ssock, SOL_SOCKET, SO_REUSEADDR, (char*)&enable, sizeof(int)) < 0) - std::cerr << "setsockopt(SO_REUSEADDR) failed" << std::endl; - - //Specify listen port and address - //memset(&s_localAddr, 0, sizeof(s_localAddr)); - slocalAddr.sin_family = AF_INET; - slocalAddr.sin_addr.s_addr = htonl(INADDR_ANY); - slocalAddr.sin_port = htons(port); - - int rc = bind(ssock, (struct sockaddr*)&slocalAddr, sizeof(slocalAddr)); - - if (rc == SOCKET_ERROR) { - std::cerr << "Socket error 2\n"; - - #ifndef WIN32 - close(ssock); - #else - closesocket(ssock); - #endif - ssock = INVALID_SOCKET; - return; - } - - //Attempt to start listening for connection requests. - rc = ::listen(ssock, 1); - - if (rc == SOCKET_ERROR) { - std::cerr << "Socket error 3\n"; - - #ifndef WIN32 - close(ssock); - #else - closesocket(ssock); - #endif - ssock = INVALID_SOCKET; - return; - } -} - // --- Tests ------------------------------------------------------------------- -TEST_CASE("net::connect()", "[net]") { - init_server(); - shared_ptr<Socket> sock = nullptr; +TEST_CASE("Universe::connect()", "[net]") { + Universe a("ftl://utu.fi"); + Universe b("ftl://utu.fi"); + + a.listen("tcp://localhost:7077"); SECTION("valid tcp connection using ipv4") { - sock = ftl::net::connect("tcp://127.0.0.1:7077"); - REQUIRE(sock != nullptr); - REQUIRE(sock->isValid()); + REQUIRE( b.connect("tcp://127.0.0.1:7077") ); + + sleep_for(milliseconds(100)); + + REQUIRE( a.numberOfPeers() == 1 ); + REQUIRE( b.numberOfPeers() == 1 ); } SECTION("valid tcp connection using hostname") { - sock = ftl::net::connect("tcp://localhost:7077"); - REQUIRE(sock->isValid()); + REQUIRE( b.connect("tcp://localhost:7077") ); + + sleep_for(milliseconds(100)); + + REQUIRE( a.numberOfPeers() == 1 ); + REQUIRE( b.numberOfPeers() == 1 ); } SECTION("invalid protocol") { - sock = ftl::net::connect("http://127.0.0.1:7077"); - REQUIRE(!sock->isValid()); + REQUIRE( !b.connect("http://127.0.0.1:7077") ); + + sleep_for(milliseconds(100)); + + REQUIRE( a.numberOfPeers() == 0 ); + REQUIRE( b.numberOfPeers() == 0 ); } - SECTION("empty uri") { + /*SECTION("empty uri") { sock = ftl::net::connect(""); REQUIRE(!sock->isValid()); } @@ -136,7 +53,7 @@ TEST_CASE("net::connect()", "[net]") { SECTION("null uri") { sock = ftl::net::connect(NULL); REQUIRE(!sock->isValid()); - } + }*/ // Disabled due to long timeout /*SECTION("incorrect ipv4 address") { @@ -152,10 +69,85 @@ TEST_CASE("net::connect()", "[net]") { REQUIRE(!sock->isValid()); }*/ - fin_server(); + //fin_server(); +} + +TEST_CASE("Universe::broadcast()", "[net]") { + Universe a("ftl://utu.fi"); + Universe b("ftl://utu.fi"); + + a.listen("tcp://localhost:7077"); + + SECTION("no arguments to no peers") { + bool done = false; + a.bind("hello", [&done]() { + done = true; + }); + + b.broadcast("done"); + + sleep_for(milliseconds(100)); + } + + SECTION("no arguments to one peer") { + b.connect("tcp://localhost:7077"); + while (a.numberOfPeers() == 0) sleep_for(milliseconds(20)); + + bool done = false; + a.bind("hello", [&done]() { + done = true; + }); + + b.broadcast("hello"); + + sleep_for(milliseconds(100)); + + REQUIRE( done ); + } + + SECTION("one argument to one peer") { + b.connect("tcp://localhost:7077"); + while (a.numberOfPeers() == 0) sleep_for(milliseconds(20)); + + int done = 0; + a.bind("hello", [&done](int v) { + done = v; + }); + + b.broadcast("hello", 676); + + sleep_for(milliseconds(100)); + + REQUIRE( done == 676 ); + } + + SECTION("one argument to two peers") { + Universe c("ftl://utu.fi"); + + b.connect("tcp://localhost:7077"); + c.connect("tcp://localhost:7077"); + while (a.numberOfPeers() < 2) sleep_for(milliseconds(20)); + + int done1 = 0; + b.bind("hello", [&done1](int v) { + done1 = v; + }); + + int done2 = 0; + c.bind("hello", [&done2](int v) { + done2 = v; + }); + + a.broadcast("hello", 676); + + sleep_for(milliseconds(100)); + + REQUIRE( done1 == 676 ); + REQUIRE( done2 == 676 ); + } } -TEST_CASE("net::listen()", "[net]") { +/*TEST_CASE("net::listen()", "[net]") { SECTION("tcp any interface") { REQUIRE( ftl::net::listen("tcp://localhost:9001")->isListening() ); @@ -229,5 +221,5 @@ TEST_CASE("Net Integration", "[integrate]") { // TODO s2->wait(100); REQUIRE( data == "hello world" ); -} +}*/ diff --git a/net/cpp/test/peer_unit.cpp b/net/cpp/test/peer_unit.cpp index 32dddd01e5b34a077f03a1fc961999707be87118..35a581657d89efe10b7d0ab19a29c3e1a3e8f41f 100644 --- a/net/cpp/test/peer_unit.cpp +++ b/net/cpp/test/peer_unit.cpp @@ -116,6 +116,14 @@ tuple<std::string, T> readResponse(int s) { return std::make_tuple(get<1>(req), get<2>(req)); } +template <typename T> +tuple<uint32_t, T> readRPC(int s) { + msgpack::object_handle msg = msgpack::unpack(fakedata[s].data(), fakedata[s].size()); + tuple<uint8_t, uint32_t, std::string, T> req; + msg.get().convert(req); + return std::make_tuple(get<1>(req), get<3>(req)); +} + // --- Files to test ----------------------------------------------------------- #include "../src/peer.cpp" @@ -171,52 +179,64 @@ TEST_CASE("Peer(int)", "[]") { } } -/*TEST_CASE("Peer::call()", "[rpc]") { +TEST_CASE("Peer::call()", "[rpc]") { MockPeer s; + send_handshake(s); + s.mock_data(); - SECTION("no argument call") { - waithandler = [&]() { - // Read fakedata sent - // TODO Validate data + SECTION("one argument call") { + REQUIRE( s.isConnected() ); + + fakedata[0] = ""; + + // Thread to provide response to otherwise blocking call + std::thread thr([&s]() { + while (fakedata[0].size() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20)); - // Do a fake send - auto res_obj = std::make_tuple(1,0,msgpack::object(),66); + auto [id,value] = readRPC<tuple<int>>(0); + auto res_obj = std::make_tuple(1,id,"__return__",get<0>(value)+22); std::stringstream buf; msgpack::pack(buf, res_obj); - - fake_send(0, FTL_PROTOCOL_RPCRETURN, buf.str()); + fakedata[0] = buf.str(); s.mock_data(); - }; + }); - int res = s.call<int>("test1"); + int res = s.call<int>("test1", 44); + + thr.join(); REQUIRE( (res == 66) ); } - SECTION("one argument call") { - waithandler = [&]() { - // Read fakedata sent - // TODO Validate data + SECTION("no argument call") { + REQUIRE( s.isConnected() ); + + fakedata[0] = ""; + + // Thread to provide response to otherwise blocking call + std::thread thr([&s]() { + while (fakedata[0].size() == 0) std::this_thread::sleep_for(std::chrono::milliseconds(20)); - // Do a fake send - auto res_obj = std::make_tuple(1,1,msgpack::object(),43); + auto [id,value] = readRPC<tuple<>>(0); + auto res_obj = std::make_tuple(1,id,"__return__",77); std::stringstream buf; msgpack::pack(buf, res_obj); - - fake_send(0, FTL_PROTOCOL_RPCRETURN, buf.str()); + fakedata[0] = buf.str(); s.mock_data(); - }; + }); - int res = s.call<int>("test1", 78); + int res = s.call<int>("test1"); - REQUIRE( (res == 43) ); + thr.join(); + + REQUIRE( (res == 77) ); } - - waithandler = nullptr; -}*/ +} TEST_CASE("Peer::bind()", "[rpc]") { MockPeer s; + send_handshake(s); + s.mock_data(); SECTION("no argument call") { bool done = false; @@ -225,8 +245,6 @@ TEST_CASE("Peer::bind()", "[rpc]") { done = true; }); - send_handshake(s); - s.mock_data(); s.send("hello"); s.mock_data(); // Force it to read the fake send... @@ -240,8 +258,6 @@ TEST_CASE("Peer::bind()", "[rpc]") { done = a; }); - send_handshake(s); - s.mock_data(); s.send("hello", 55); s.mock_data(); // Force it to read the fake send... @@ -254,9 +270,7 @@ TEST_CASE("Peer::bind()", "[rpc]") { s.bind("hello", [&](int a, std::string b) { done = b; }); - - send_handshake(s); - s.mock_data(); + s.send("hello", 55, "world"); s.mock_data(); // Force it to read the fake send... @@ -264,26 +278,6 @@ TEST_CASE("Peer::bind()", "[rpc]") { } } -/*TEST_CASE("Socket::operator>>()", "[io]") { - MockPeer s; - - SECTION("stream ints") { - int i[2]; - i[0] = 99; - i[1] = 101; - fake_send(0, 100, std::string((char*)&i,2*sizeof(int))); - - i[0] = 0; - i[1] = 0; - s.mock_data(); // Force a message read, but no protocol... - - REQUIRE( (s.size() == 2*sizeof(int)) ); - s >> i; - REQUIRE( (i[0] == 99) ); - REQUIRE( (i[1] == 101) ); - } -}*/ - TEST_CASE("Socket::send()", "[io]") { MockPeer s; @@ -297,133 +291,53 @@ TEST_CASE("Socket::send()", "[io]") { REQUIRE( (get<0>(value) == 607) ); } - /*SECTION("send a string") { + SECTION("send a string") { std::string str("hello world"); - s.send(100,str); + s.send("dummy",str); - REQUIRE( (get_service(0) == 100) ); - REQUIRE( (get_size(0) == str.size()) ); - REQUIRE( (get_value<std::string>(0) == "hello world") ); + auto [name, value] = readResponse<tuple<std::string>>(0); + + REQUIRE( (name == "dummy") ); + REQUIRE( (get<0>(value) == "hello world") ); } SECTION("send const char* string") { - s.send(100,"hello world"); + s.send("dummy","hello world"); - REQUIRE( (get_service(0) == 100) ); - REQUIRE( (get_size(0) == 11) ); - REQUIRE( (get_value<std::string>(0) == "hello world") ); + auto [name, value] = readResponse<tuple<std::string>>(0); + + REQUIRE( (name == "dummy") ); + REQUIRE( (get<0>(value) == "hello world") ); } - SECTION("send const char* array") { + /*SECTION("send const char* array") { s.send(100,ftl::net::array{"hello world",10}); REQUIRE( (get_service(0) == 100) ); REQUIRE( (get_size(0) == 10) ); REQUIRE( (get_value<std::string>(0) == "hello worl") ); - } + }*/ SECTION("send a tuple") { auto tup = std::make_tuple(55,66,true,6.7); - s.send(100,tup); + s.send("dummy",tup); - REQUIRE( (get_service(0) == 100) ); - REQUIRE( (get_size(0) == sizeof(tup)) ); - REQUIRE( (get_value<decltype(tup)>(0) == tup) ); + auto [name, value] = readResponse<tuple<decltype(tup)>>(0); + + REQUIRE( (name == "dummy") ); + REQUIRE( (get<1>(get<0>(value)) == 66) ); } SECTION("send multiple strings") { std::string str("hello "); std::string str2("world"); - s.send(100,str,str2); - - REQUIRE( (get_service(0) == 100) ); - REQUIRE( (get_size(0) == str.size()+str2.size()) ); - REQUIRE( (get_value<std::string>(0) == "hello world") ); - }*/ -} - -/*TEST_CASE("Socket::read()", "[io]") { - MockSocket s; - - SECTION("read an int") { - int i = 99; - fake_send(0, 100, std::string((char*)&i,4)); - - i = 0; - s.mock_data(); // Force a message read, but no protocol... - - REQUIRE( (s.size() == sizeof(int)) ); - REQUIRE( (s.read(i) == sizeof(int)) ); - REQUIRE( (i == 99) ); - } - - SECTION("read two ints") { - int i[2]; - i[0] = 99; - i[1] = 101; - fake_send(0, 100, std::string((char*)&i,2*sizeof(int))); - - i[0] = 0; - i[1] = 0; - s.mock_data(); // Force a message read, but no protocol... - - REQUIRE( (s.size() == 2*sizeof(int)) ); - REQUIRE( (s.read(&i,2) == 2*sizeof(int)) ); - REQUIRE( (i[0] == 99) ); - REQUIRE( (i[1] == 101) ); - } - - SECTION("multiple reads") { - int i[2]; - i[0] = 99; - i[1] = 101; - fake_send(0, 100, std::string((char*)&i,2*sizeof(int))); - - i[0] = 0; - i[1] = 0; - s.mock_data(); // Force a message read, but no protocol... - - REQUIRE( (s.read(&i[0],1) == sizeof(int)) ); - REQUIRE( (i[0] == 99) ); - REQUIRE( (s.read(&i[1],1) == sizeof(int)) ); - REQUIRE( (i[1] == 101) ); - } - - SECTION("read a string") { - std::string str; - fake_send(0, 100, std::string("hello world")); + s.send("dummy2",str,str2); - s.mock_data(); // Force a message read, but no protocol... + auto [name, value] = readResponse<tuple<std::string,std::string>>(0); - REQUIRE( (s.size() == 11) ); - REQUIRE( (s.read(str) == 11) ); - REQUIRE( (str == "hello world") ); + REQUIRE( (name == "dummy2") ); + REQUIRE( (get<0>(value) == "hello ") ); + REQUIRE( (get<1>(value) == "world") ); } - - SECTION("read into existing string") { - std::string str; - str.reserve(11); - void *ptr = str.data(); - fake_send(0, 100, std::string("hello world")); - - s.mock_data(); // Force a message read, but no protocol... - - REQUIRE( (s.size() == 11) ); - REQUIRE( (s.read(str) == 11) ); - REQUIRE( (str == "hello world") ); - REQUIRE( (str.data() == ptr) ); - } - - SECTION("read too much data") { - int i = 99; - fake_send(0, 100, std::string((char*)&i,4)); - - i = 0; - s.mock_data(); // Force a message read, but no protocol... - - REQUIRE( (s.size() == sizeof(int)) ); - REQUIRE( (s.read(&i,2) == sizeof(int)) ); - REQUIRE( (i == 99) ); - } -}*/ +}