diff --git a/.gitignore b/.gitignore index 18f64c8b2db5668b6f68761678daf19e5ab45d72..fc8082d8b494800577d5951d3d104a872ac17a43 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,4 @@ docs/html /build* /.vs /CMakeSettings.json -.vscode/settings.json +.vscode/* diff --git a/.vscode/launch.json b/.vscode/launch.json index c86bce66099f93f2a10bf28bb18c8a0b3be623df..da7ac6001e96c9e5fb6c91597bc3db3c5eb0abcd 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -5,12 +5,16 @@ "version": "0.2.0", "configurations": [ { - "name": "g++ - Debug Node", + "name": "(gdb) Launch", "type": "cppdbg", "request": "launch", + "program": "${command:cmake.launchTargetPath}", "args": [], "stopAtEntry": false, "cwd": "${workspaceFolder}/build", + "environment": [ + {"name" : "ASAN_OPTIONS", "value" : "abort_on_error=1,protect_shadow_gap=0"} + ], "externalConsole": false, "MIMode": "gdb", "setupCommands": [ @@ -18,15 +22,44 @@ "description": "Enable pretty-printing for gdb", "text": "-enable-pretty-printing", "ignoreFailures": true + }, + { + "description": "Set Disassembly Flavor to Intel", + "text": "-gdb-set disassembly-flavor intel", + "ignoreFailures": true } ], - "miDebuggerPath": "/usr/bin/gdb", "sourceFileMap": { "${workspaceFolder}": { "editorPath": "${workspaceFolder}", "useForBreakpoints": "true" } - } + } + }, + { + "name": "(gdb) Attach", + "type": "cppdbg", + "request": "attach", + "program": "${command:cmake.launchTargetPath}", + "MIMode": "gdb", + "setupCommands": [ + { + "description": "Enable pretty-printing for gdb", + "text": "-enable-pretty-printing", + "ignoreFailures": true + }, + { + "description": "Set Disassembly Flavor to Intel", + "text": "-gdb-set disassembly-flavor intel", + "ignoreFailures": true + } + ], + "sourceFileMap": { + "${workspaceFolder}": { + "editorPath": "${workspaceFolder}", + "useForBreakpoints": "true" + } + } } ] } \ No newline at end of file diff --git a/src/peer.cpp b/src/peer.cpp index 89612f1aea3216bd666774157969841e05abb1fb..5dca8e14638e22f077dd6f1c18a65d7df2fecc14 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -17,8 +17,6 @@ #include <ftl/lib/ctpl_stl.hpp> #include <ftl/counter.hpp> -#include <ftl/profiler.hpp> - #include "common.hpp" #include <ftl/uri.hpp> @@ -149,13 +147,6 @@ Peer::Peer(std::unique_ptr<internal::SocketConnection> s, Universe* u, Dispatche disp_(std::make_unique<Dispatcher>(d)) { /* Incoming connection constructor */ -#ifdef TRACY - TracyPlotConfig("SendBuf", tracy::PlotFormatType::Memory, false, true, tracy::Color::Red4); -#endif - - // TODO: use dedicated thread instead of stealing one from the pool - ftl::pool.push([this](int){ send_(); }); - CHECK(sock_) << "incoming SocketConnection pointer null"; _set_socket_options(); _updateURI(); @@ -173,12 +164,6 @@ Peer::Peer(const ftl::URI& uri, Universe *u, Dispatcher *d) : disp_(std::make_unique<Dispatcher>(d)) { /* Outgoing connection constructor */ -#ifdef TRACY - TracyPlotConfig("SendBuf", tracy::PlotFormatType::Memory, false, true, tracy::Color::Red); -#endif - // TODO: use dedicated thread instead of stealing one from the pool - ftl::pool.push([this](int){ send_(); }); - _bind_rpc(); _connect(); ++net_->peer_instances_; @@ -386,8 +371,6 @@ bool Peer::_data() { // UNIQUE_LOCK(recv_mtx_, lk); // msgpack::object is valid as long as handle is - FTL_PROFILE_SCOPE("Peer::_data [recv]"); - msgpack::object_handle msg_handle; try { @@ -513,14 +496,14 @@ void Peer::_sendResponse(uint32_t id, const msgpack::object &res) { Dispatcher::response_t res_obj = std::make_tuple(1, id, msgpack::object(), res); UNIQUE_LOCK(send_mtx_, lk); msgpack::pack(send_buf_, res_obj); - queue_send_(); + _send(); } void Peer::_sendErrorResponse(uint32_t id, const msgpack::object &res) { Dispatcher::response_t res_obj = std::make_tuple(1, id, res, msgpack::object()); UNIQUE_LOCK(send_mtx_, lk); msgpack::pack(send_buf_, res_obj); - queue_send_(); + _send(); } void Peer::_waitCall(int id, std::condition_variable &cv, bool &hasreturned, const std::string &name) { @@ -567,77 +550,36 @@ bool Peer::waitConnection(int s) { return status_ == NodeStatus::kConnected; } -void Peer::queue_send_() { - UNIQUE_LOCK(send_mtx_, lk); -#ifdef TRACY - TracyPlot("SendBuf", send_buf_.size()); -#endif - send_cv_.notify_all(); -} - -void Peer::send_() { - while (true) - { - if (status_ == NodeStatus::kDisconnected) { - break; - } - - T_UNIQUE_LOCK(send_working_mtx_) send_lock(send_working_mtx_, std::defer_lock); - - { - UNIQUE_LOCK(send_mtx_, lk); - - if (send_buf_.size() == 0) { - send_cv_.wait(lk); - } - - if (status_ == NodeStatus::kDisconnected) { - break; - } +int Peer::_send() { + if (!sock_->is_valid()) return -1; - if (!sock_->is_valid()) { continue; } - if (send_buf_.size() == 0) { continue; } + ssize_t c = 0; - if (send_lock.try_lock()) { - send_buf_working_.swap(send_buf_); - } - else { continue; } + try { + c = sock_->writev(send_buf_.vector(), send_buf_.vector_size()); + if (c <= 0) { + // writev() should probably throw exception which is reported here + // at the moment, error message is (should be) printed by writev() + net_->_notifyError(this, ftl::protocol::Error::kSocketError, "writev() failed"); + return c; } - CHECK(send_lock.owns_lock()); - - try { - FTL_PROFILE_SCOPE("send_"); - - ssize_t sent = 0; - auto sz = send_buf_working_.size(); - - /* msgpack::vrefbuffer version - - c = sock_->writev(send_buf_working_.vector(), send_buf_working_.vector_size()); - ssize_t sz = 0; for (size_t i = 0; i < send_buf_working_.vector_size(); i++) { - sz += send_buf_working_.vector()[i].iov_len; - } - */ - - iovec vec { (void*)send_buf_working_.data(), (size_t)send_buf_working_.size() }; - sent = sock_->writev(&vec, 1); - - if (sent != sz) { - net_->_notifyError(this, ftl::protocol::Error::kSocketError, "writev(): incomplete send"); - _close(reconnect_on_socket_error_); - } - - send_buf_working_.clear(); - net_->txBytes_ += sent; - - } catch (std::exception& ex) { - net_->_notifyError(this, ftl::protocol::Error::kSocketError, ex.what()); + ssize_t sz = 0; for (size_t i = 0; i < send_buf_.vector_size(); i++) { + sz += send_buf_.vector()[i].iov_len; + } + if (c != sz) { + net_->_notifyError(this, ftl::protocol::Error::kSocketError, "writev(): incomplete send"); _close(reconnect_on_socket_error_); } - send_lock.release(); + send_buf_.clear(); + } catch (std::exception& ex) { + net_->_notifyError(this, ftl::protocol::Error::kSocketError, ex.what()); + _close(reconnect_on_socket_error_); } + + net_->txBytes_ += c; + return c; } Peer::~Peer() { diff --git a/src/peer.hpp b/src/peer.hpp index 96c560525bb9ef4726a21e1219730785694f9de8..4e7d742e5b794a2e3493cda57a084d35ab3198e2 100644 --- a/src/peer.hpp +++ b/src/peer.hpp @@ -44,41 +44,6 @@ extern int setDescriptors(); namespace ftl { namespace net { -/** trivial vector-backed buffer for msgpack */ -struct vectorbuffer -{ - vectorbuffer(int initial_size=4096) : buffer_(initial_size), head_(0) {} - - void clear() { - head_ = 0; - } - - const char* data() { return buffer_.data(); } - - const int64_t size() { return head_; } - - void write(const char* buf, size_t len) { - if ((head_ + len) > buffer_.size()) { - // just double buffer size - buffer_.resize((head_ + len)*2); - } - memcpy(buffer_.data() + head_, buf, len); - head_ += len; - } - - void swap(vectorbuffer& other) { - std::swap(buffer_, other.buffer_); - std::swap(head_, other.head_); - } - - vectorbuffer(const vectorbuffer&) = delete; - vectorbuffer& operator=(const vectorbuffer&) = delete; - -private: - std::vector<char> buffer_; - int64_t head_; -}; - class Universe; /** @@ -273,6 +238,7 @@ class Peer { void _bind_rpc(); void _connect(); + int _send(); void _createJob(); @@ -290,17 +256,11 @@ class Peer { msgpack::unpacker recv_buf_; size_t recv_buf_max_ = kDefaultMessage; - DECLARE_MUTEX(recv_mtx_); + MUTEX recv_mtx_; - void queue_send_(); - void send_(); // Send buffers - vectorbuffer send_buf_; + msgpack::vrefbuffer send_buf_; DECLARE_RECURSIVE_MUTEX(send_mtx_); - vectorbuffer send_buf_working_; - DECLARE_RECURSIVE_MUTEX(send_working_mtx_); - std::condition_variable_any send_cv_; - DECLARE_RECURSIVE_MUTEX(cb_mtx_); const bool outgoing_; @@ -337,13 +297,9 @@ int Peer::send(const std::string &s, ARGS&&... args) { UNIQUE_LOCK(send_mtx_, lk); auto args_obj = std::make_tuple(args...); auto call_obj = std::make_tuple(0, s, args_obj); - - auto size_old = send_buf_.size(); msgpack::pack(send_buf_, call_obj); - auto send_size = send_buf_.size() - size_old; - - queue_send_(); - return send_size; + int rc = _send(); + return rc; } template <typename F> @@ -391,7 +347,7 @@ std::future<T> Peer::asyncCall(const std::string &name, ARGS... args) { UNIQUE_LOCK(send_mtx_, lk); msgpack::pack(send_buf_, call_obj); - queue_send_(); + _send(); return future; }