diff --git a/CMakeLists.txt b/CMakeLists.txt index 9d86e4835fdb79ee34ea977f3cd23b7b632e1be1..d170d5643bf6066ebea1e4142defc48144313dd4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -117,7 +117,6 @@ if (USE_CPPCHECK) endif() include(ftl_tracy) -include(ftl_abseil) include(git_version) include(ftl_paths) @@ -213,9 +212,6 @@ target_include_directories(beyond-common PUBLIC target_link_libraries(beyond-common PRIVATE TracyClient - absl::strings - absl::flat_hash_map - absl::flat_hash_set ) ### ==================================================================================================================== diff --git a/cmake/ftl_tracy.cmake b/cmake/ftl_tracy.cmake index 8ceb2e2fdeb125238f48a0310bf771f3dbd0a06f..1ebf4de0f0905276c90e8d1e6cd34097d146b18e 100644 --- a/cmake/ftl_tracy.cmake +++ b/cmake/ftl_tracy.cmake @@ -1,13 +1,18 @@ if (ENABLE_PROFILER) - set(TRACY_CXX_OPTIONS "-DTRACY_ENABLE -DTRACY_DELAYED_INIT -DTRACY_VERBOSE -DNOMINMAX") + set(TRACY_CXX_OPTIONS "-DTRACY_ENABLE -DNOMINMAX") if (DEBUG_LOCKS) + # This doesn't seem to do anything. Log debugging/profiling should be done per-lock basis, otherwise + # profiler will quickly run out of memory. set(TRACY_CXX_OPTIONS "${TRACY_CXX_OPTIONS} -DDEBUG_LOCKS") message(STATUS "Lock profiling enabled") endif() set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${TRACY_CXX_OPTIONS}") + set(TRACY_ON_DEMAND OFF) # Problems in Unreal if ON + set(TRACY_DELAYED_INIT ON) + FetchContent_Declare(tracy GIT_REPOSITORY https://github.com/wolfpld/tracy.git GIT_TAG 897aec5b062664d2485f4f9a213715d2e527e0ca # tags/v0.9.1 diff --git a/include/ftl/profiler.hpp b/include/ftl/profiler.hpp index 367f68dba60ade5c989c8f840cc512be6a270faa..6ee76eee6fb06ab73d2494e5515801eb81789cd3 100644 --- a/include/ftl/profiler.hpp +++ b/include/ftl/profiler.hpp @@ -9,10 +9,21 @@ #include <ftl/protocol/config.h> #include <string> -inline void FTL_PROFILE_LOG(const std::string& message) { +namespace ftl +{ +// Enable/disable logging to profiler. Caller must synchronize (if necessary). +void profiler_logging_disable(); +void profiler_logging_enable(); +} + #ifdef TRACY_ENABLE - TracyMessage(message.c_str(), message.size()); +#include <tracy/Tracy.hpp> #endif + +inline void FTL_PROFILE_LOG(const std::string& message) { + #ifdef TRACY_ENABLE + TracyMessage(message.c_str(), message.size()); + #endif } namespace detail @@ -24,12 +35,10 @@ namespace detail inline const char* GetPersistentString(const std::string& String) { return GetPersistentString(String.c_str()); } } -#define PROFILER_RUNTIME_PERSISTENT_NAME(name) detail::GetPersistentString(name) +#define PROFILER_RUNTIME_PERSISTENT_NAME(name) ::detail::GetPersistentString(name) #ifdef TRACY_ENABLE -#include <tracy/Tracy.hpp> - #define FTL_PROFILE_SCOPE(LABEL) ZoneScopedN(LABEL) // NOTE: Tracy expects Label to be a pointer to same address (this should be the case @@ -38,8 +47,13 @@ namespace detail /** Mark (secondary) frame start and stop. Each FTL_PROFILE_FRAME_BEGIN MUST be matched * with FTL_PROFILE_FRAME_END */ -#define FTL_PROFILE_FRAME_BEGIN(LABEL) FrameMarkStart(#LABEL) -#define FTL_PROFILE_FRAME_END(LABEL) FrameMarkEnd(#LABEL) + +// Discontinuous frame (next one is not assumed to begin immediately after the previous completes) +//#define FTL_PROFILE_FRAME_BEGIN(LABEL) FrameMarkStart(LABEL) +//#define FTL_PROFILE_FRAME_END(LABEL) FrameMarkEnd(LABEL) + +/** Mark frame scope */ +#define FTL_PROFILE_FRAME_END(LABEL) FrameMarkNamed(LABEL) /** Mark end of primary frame (main rendering/capture loop etc, if applicable) */ #define FTL_PROFILE_PRIMARY_FRAME_END() FrameMark diff --git a/include/ftl/protocol/config.h.in b/include/ftl/protocol/config.h.in index d303191a844ef4e15be4160457ae2784f3e4f98b..c84c2ab98f3128078b1af8cda134b6e7f7ec6c8a 100644 --- a/include/ftl/protocol/config.h.in +++ b/include/ftl/protocol/config.h.in @@ -24,6 +24,7 @@ #cmakedefine HAVE_OPENSSL #cmakedefine HAVE_MSQUIC #cmakedefine ENABLE_PROFILER +#cmakedefine DEBUG_LOCKS extern const char *FTL_BRANCH; extern const char *FTL_VERSION_LONG; diff --git a/include/ftl/threads.hpp b/include/ftl/threads.hpp index 330d10a89ca7bd6f947588f80fdae3e8285a9a60..a2d2ba69bf608817c759e0d47582ab0945c1b792 100644 --- a/include/ftl/threads.hpp +++ b/include/ftl/threads.hpp @@ -20,7 +20,7 @@ /// consider using DECLARE_SHARED_MUTEX(name) which allows (optional) profiling #define SHARED_MUTEX std::shared_mutex -#if defined(TRACY_ENABLE) +#if defined(TRACY_ENABLE) && defined(DEBUG_LOCKS) #include <type_traits> #include <tracy/Tracy.hpp> @@ -41,6 +41,8 @@ // TODO: should automatic, but requires mutexes to be declared with DECLARE_..._MUTEX macros #define UNIQUE_LOCK_T(M) std::unique_lock<std::remove_reference<decltype(M)>::type> +#define UNIQUE_LOCK_MUTEX_T std::unique_lock<LockableBase(MUTEX_T)> + /// deprecated: use UNIQUE_LOCK_N instead #define UNIQUE_LOCK(M, L) std::unique_lock<std::remove_reference<decltype(M)>::type> L(M) /// deprecated: use SHARED_LOCK_N instead @@ -65,6 +67,8 @@ #define MARK_LOCK(M) {} #define UNIQUE_LOCK_T(M) std::unique_lock<std::remove_reference<decltype(M)>::type> +#define UNIQUE_LOCK_MUTEX_T std::unique_lock<MUTEX_T> + /// deprecated: use UNIQUE_LOCK_N instead #define UNIQUE_LOCK(M, L) std::unique_lock<std::remove_reference<decltype(M)>::type> L(M) /// deprecated: use SHARED_LOCK_N instead @@ -76,6 +80,8 @@ namespace ftl { +void set_thread_name(const std::string& name); + extern ctpl::thread_pool pool; /** FIFO task queue for thread pool. */ @@ -87,8 +93,10 @@ public: int queue(Task task) { auto lk = std::unique_lock(mtx_); if (stop_) { return -1; } + int size = queue_.size(); queue_.push_back(std::move(task)); start_and_unlock(lk); + return size; } /** Try to queue new task if queue is not larger than max_queue_size. @@ -147,7 +155,7 @@ private: busy_ = true; lk.unlock(); pool.push(std::bind(&TaskQueue<Task>::run, this)); - } + } else { lk.unlock(); } } void run() { diff --git a/src/common/profiler.cpp b/src/common/profiler.cpp index 03a309b6986a8393fd200576cb1fbc12b184cfa1..9a61f38f2ba5213a8bd00d8ada422348b3d96587 100644 --- a/src/common/profiler.cpp +++ b/src/common/profiler.cpp @@ -10,6 +10,41 @@ //#include <absl/container/flat_hash_map.h> #include <unordered_map> +#ifdef TRACY_ENABLE +void tracy_log_cb(void* user_data, const loguru::Message& message) +{ + size_t size = strlen(message.message); + if (message.verbosity <= loguru::NamedVerbosity::Verbosity_ERROR) + { + TracyMessageC(message.message, size, tracy::Color::Red1); + } + else if (message.verbosity == loguru::NamedVerbosity::Verbosity_WARNING) + { + TracyMessageC(message.message, size, tracy::Color::Yellow1); + } + else + { + TracyMessageC(message.message, size, tracy::Color::WhiteSmoke); + } +} +#endif +namespace ftl +{ +// Enable/disable logging to profiler. Caller must synchronize (if necessary). +void profiler_logging_disable() +{ + #ifdef TRACY_ENABLE + loguru::remove_callback("tracy_profiler"); + #endif +} +void profiler_logging_enable() +{ + #ifdef TRACY_ENABLE + loguru::add_callback("tracy_profiler", tracy_log_cb, nullptr, loguru::NamedVerbosity::Verbosity_1); + #endif +} +} + using map_t = std::unordered_map<std::string, const char*>; inline bool MapContains(map_t map, const std::string& key) diff --git a/src/ctpl_stl.cpp b/src/ctpl_stl.cpp index b82a72aabc77f43542102b01053791c54bdd5cfb..923e0ba510d4e26dd1c5d817bb588b329b3643c3 100644 --- a/src/ctpl_stl.cpp +++ b/src/ctpl_stl.cpp @@ -1,25 +1,14 @@ -#ifdef WIN32 -#include <Ws2tcpip.h> -#include <windows.h> -#include <string> -#endif - #include <ftl/lib/ctpl_stl.hpp> - -#ifdef TRACY_ENABLE -#include <tracy/Tracy.hpp> -#endif +#include <ftl/threads.hpp> +#include <string> void ctpl::thread_pool::set_thread(int i) { std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() { - #if TRACY_ENABLE { const auto thread_name = "thread_pool/" + std::to_string(i); - tracy::SetThreadName(thread_name.c_str()); + ftl::set_thread_name(thread_name); } - #endif - std::atomic<bool> & _flag = *flag; std::function<void(int id)> * _f; bool isPop = this->q.pop(_f); @@ -42,30 +31,4 @@ void ctpl::thread_pool::set_thread(int i) { } }; this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique() - - // For excess threads, ensure they only operate if needed. - /*if (i >= std::thread::hardware_concurrency()-1) { - #ifndef WIN32 - sched_param p; - p.sched_priority = sched_get_priority_min(SCHED_FIFO); - pthread_setschedparam(threads[i]->native_handle(), SCHED_FIFO, &p); - #endif - } else { - #ifndef WIN32 - sched_param p; - p.sched_priority = sched_get_priority_max(SCHED_FIFO); - pthread_setschedparam(threads[i]->native_handle(), SCHED_FIFO, &p); - #endif - }*/ - - /* - #ifdef WIN32 - SetThreadAffinityMask(this->threads[i]->native_handle(), 1 << i); - #else - cpu_set_t cpus; - CPU_ZERO(&cpus); - CPU_SET(i, &cpus); - pthread_setaffinity_np(this->threads[i]->native_handle(), sizeof(cpus), &cpus); - #endif - */ } diff --git a/src/peer_tcp.cpp b/src/peer_tcp.cpp index 07c08c3874efac366f67f685c45ce4baa218d91d..2d452c8e0e630f5fab10d0e02a6d60b4d06cda72 100644 --- a/src/peer_tcp.cpp +++ b/src/peer_tcp.cpp @@ -16,6 +16,7 @@ #include <ftl/lib/loguru.hpp> #include <ftl/lib/ctpl_stl.hpp> #include <ftl/counter.hpp> +#include <ftl/profiler.hpp> #include "common.hpp" diff --git a/src/protocol.cpp b/src/protocol.cpp index 22c1bb62327f038a08f2425c221317cb7cca42bb..db8c2ebb2ddb503df45cdd267cdaa725e68a5b89 100644 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -10,12 +10,24 @@ #include "universe.hpp" #include "rpc.hpp" +#ifdef TRACY_ENABLE +#include <tracy/Tracy.hpp> +#endif + static std::shared_ptr<ftl::net::Universe> universe; static std::mutex globalmtx; // ctpl::thread_pool ftl::pool(std::thread::hardware_concurrency()*2); ctpl::thread_pool ftl::pool(4); +void ftl::set_thread_name(const std::string& name) { + #if TRACY_ENABLE + tracy::SetThreadName(name.c_str()); + #else + loguru::set_thread_name(name.c_str()); + #endif +} + void ftl::protocol::reset() { UNIQUE_LOCK(globalmtx, lk); universe.reset(); diff --git a/src/quic/src/quic_peer.cpp b/src/quic/src/quic_peer.cpp index 1fb249ae207d435626902c57c6bee8a81cc4cb4a..e07e05b17e36bea7e2de28e972eec847111e2fe5 100644 --- a/src/quic/src/quic_peer.cpp +++ b/src/quic/src/quic_peer.cpp @@ -39,12 +39,18 @@ QuicPeerStream::QuicPeerStream(MsQuicConnection* connection, MsQuicStreamPtr str recv_buffer_.reserve_buffer(recv_buffer_default_size_); + #ifdef ENABLE_PROFILER profiler_name_ = PROFILER_RUNTIME_PERSISTENT_NAME("QuicStream[" + std::to_string(profiler_name_ctr_++) + "]"); - profiler_id_.plt_pending_buffers = PROFILER_RUNTIME_PERSISTENT_NAME(peer_->getURI() + ": " + name_ + " pending buffers"); - profiler_id_.plt_pending_bytes = PROFILER_RUNTIME_PERSISTENT_NAME(peer_->getURI() + ":" + name_ + " pending bytes"); - TracyPlotConfig(profiler_id_.plt_pending_buffers, tracy::PlotFormatType::Percentage, false, true, tracy::Color::Red1); - TracyPlotConfig(profiler_id_.plt_pending_bytes, tracy::PlotFormatType::Memory, false, true, tracy::Color::Red1); + profiler_id_.plt_pending_buffers = PROFILER_RUNTIME_PERSISTENT_NAME(getURI() + ": " + name_ + " pending buffers"); + profiler_id_.plt_pending_bytes = PROFILER_RUNTIME_PERSISTENT_NAME(getURI() + ":" + name_ + " pending bytes"); + profiler_id_.plt_recv_size = PROFILER_RUNTIME_PERSISTENT_NAME(getURI() + ": " + name_ + " recv size"); + profiler_id_.plt_recv_size_obj = PROFILER_RUNTIME_PERSISTENT_NAME(getURI() + ":" + name_ + " recv size (msgpack objects)"); + + TracyPlotConfig(profiler_id_.plt_pending_buffers, tracy::PlotFormatType::Number, true, true, tracy::Color::Red1); + TracyPlotConfig(profiler_id_.plt_pending_bytes, tracy::PlotFormatType::Memory, true, true, tracy::Color::Red1); + TracyPlotConfig(profiler_id_.plt_recv_size, tracy::PlotFormatType::Memory, true, true, tracy::Color::Green1); + TracyPlotConfig(profiler_id_.plt_recv_size_obj, tracy::PlotFormatType::Number, true, true, tracy::Color::Green1); #endif } @@ -76,13 +82,24 @@ void QuicPeerStream::set_stream(MsQuicStreamPtr stream) void QuicPeerStream::close(bool reconnect) { - std::unique_lock<MUTEX_T> lk_send(send_mtx_, std::defer_lock); - std::unique_lock<MUTEX_T> lk_recv(recv_mtx_, std::defer_lock); + UNIQUE_LOCK_T(send_mtx_) lk_send(send_mtx_, std::defer_lock); + UNIQUE_LOCK_T(recv_mtx_) lk_recv(recv_mtx_, std::defer_lock); std::lock(lk_send, lk_recv); + recv_queue_.clear(); if (stream_ && stream_->IsOpen()) { auto future = stream_->Close(); + if (recv_waiting_) + { + CHECK(recv_busy_); + + // In case ProcessRecv is waiting, notify here + recv_waiting_ = false; + recv_cv_.notify_one(); // Not ideal to notify before releasing the lock (and then waiting on same cv) + recv_cv_.wait(lk_recv, [&]() { return !recv_busy_; }); + } + lk_send.unlock(); lk_recv.unlock(); @@ -97,8 +114,8 @@ void QuicPeerStream::OnShutdown(MsQuicStream* stream) void QuicPeerStream::OnShutdownComplete(MsQuicStream* stream) { - std::unique_lock<MUTEX_T> lk_send(send_mtx_, std::defer_lock); - std::unique_lock<MUTEX_T> lk_recv(recv_mtx_, std::defer_lock); + UNIQUE_LOCK_T(send_mtx_) lk_send(send_mtx_, std::defer_lock); + UNIQUE_LOCK_T(recv_mtx_) lk_recv(recv_mtx_, std::defer_lock); std::lock(lk_send, lk_recv); // MsQuic releases stream instanca after this callback. Any use of it later is a bug. @@ -281,6 +298,10 @@ int QuicPeerStream::send_buffer_(const std::string& name, msgpack_buffer_t&& buf } } + #ifdef ENABLE_PROFILER + statistics(); + #endif + return 1; } @@ -326,6 +347,7 @@ void QuicPeerStream::OnWriteComplete(MsQuicStream* stream, void* Context, bool C size_t QuicPeerStream::ws_recv_(QUIC_BUFFER buffer_in, size_t& size_consumed) { + // Amount of bytes for websocket header consumed size_t size_ws = 0; while (buffer_in.Length > 0) @@ -374,9 +396,9 @@ size_t QuicPeerStream::ws_recv_(QUIC_BUFFER buffer_in, size_t& size_consumed) } else if (status == NOT_ENOUGH_DATA) { - //CHECK(buffer_in.Length < ws_header_.size()); + CHECK(buffer_in.Length < ws_header_.size()); memcpy(ws_header_.data() + ws_partial_header_recvd_, buffer_in.Buffer, ws_header_.size() - ws_partial_header_recvd_); - ws_partial_header_recvd_ += buffer_in.Length; + ws_partial_header_recvd_ = std::min<uint32_t>(ws_header_.size(), ws_partial_header_recvd_ + buffer_in.Length); size_ws += buffer_in.Length; buffer_in.Length -= buffer_in.Length; @@ -426,9 +448,11 @@ size_t QuicPeerStream::ws_recv_(QUIC_BUFFER buffer_in, size_t& size_consumed) void QuicPeerStream::OnData(MsQuicStream* stream, nonstd::span<const QUIC_BUFFER> data) { - size_t size_consumed = 0; - size_t size_total = 0; - size_t size_ws = 0; + FTL_PROFILE_SCOPE("OnData"); + + size_t size_consumed = 0; // bytes written to current msgpack buffer + size_t size_total = 0; // bytes already passed to msgpack + size_t size_ws = 0; // number of bytes from websocket headers for (auto& buffer_in : data) { @@ -457,34 +481,93 @@ void QuicPeerStream::OnData(MsQuicStream* stream, nonstd::span<const QUIC_BUFFER } } + stream_->Consume(size_consumed + size_total + size_ws); + size_t sz = 0; + for (auto& buffer_in : data) { sz += buffer_in.Length; } + CHECK((size_consumed + size_total + size_ws) == sz); + { UNIQUE_LOCK_N(lk, recv_mtx_); + // Try to parse received buffer (msgpack parsing is fast) + // 2024/01 Increase in execution time is less than 1 microsecond per call. recv_buffer_.buffer_consumed(size_consumed); + recv_queue_.resize(recv_queue_.size() + 1); + int offset = recv_queue_.size() - 1; + int count = 0; + for(; recv_buffer_.next(recv_queue_[count + offset]); recv_queue_.resize(++count + offset + 1)); + recv_queue_.pop_back(); // Last element always invalid (next() returns false) + + #ifdef TRACY_ENABLE + TracyPlot(profiler_id_.plt_recv_size, double(size_consumed + size_total + size_ws)); + TracyPlot(profiler_id_.plt_recv_size_obj, double(recv_queue_.size())); + #endif + notify_recv_and_unlock_(lk); + } +} - if (!recv_busy_) - { - recv_busy_ = true; - ftl::pool.push([this](int){ ProcessRecv(); }); - } +void QuicPeerStream::notify_recv_and_unlock_(UNIQUE_LOCK_MUTEX_T& lk) +{ + bool notify = false; + if (!recv_busy_) + { + recv_busy_ = true; + lk.unlock(); + ftl::pool.push([this](int){ ProcessRecv(); }); } - stream_->Consume(size_consumed + size_total + size_ws); - - size_t sz = 0; - for (auto& buffer_in : data) { sz += buffer_in.Length; } - CHECK((size_consumed + size_total + size_ws) == sz); + else + { + notify = recv_waiting_ && (recv_queue_.size() > 0); + lk.unlock(); + } + + if (notify) + { + // OK outside lock, if resumed due to timeout buffer already available + recv_cv_.notify_one(); + } + + CHECK(!lk.owns_lock()); } void QuicPeerStream::ProcessRecv() { - UNIQUE_LOCK_N(lk, recv_mtx_); - msgpack::object_handle obj; + const int t_wait_ms = 300; // Time to wait before returning + std::vector<msgpack::object_handle> objs; + objs.reserve(4); - while (recv_buffer_.next(obj)) + UNIQUE_LOCK_N(lk, recv_mtx_); + while (true) { + if (recv_queue_.size() == 0) + { + auto pred = [&]() { return recv_waiting_ && recv_queue_.size() > 0; }; + // Wait a bit before exit. The idea is to use the same thread for the same connection/stream. + // TODO: Probably not a good idea to use the shared thread pool for this. + recv_waiting_ = true; + bool has_data = recv_cv_.wait_for(lk, std::chrono::milliseconds(t_wait_ms), pred); + recv_waiting_ = false; + if (!has_data) + { + // Buffer was not updated or didn't get a complete message + break; + } + else + { + continue; + } + } + + CHECK(objs.size() == 0); + std::swap(recv_queue_, objs); + lk.unlock(); - process_message_(obj); + FTL_PROFILE_SCOPE("ProcessRecv"); + for (auto& obj : objs) { process_message_(obj); } + objs.clear(); lk.lock(); } recv_busy_ = false; + lk.unlock(); + recv_cv_.notify_all(); } diff --git a/src/quic/src/quic_peer.hpp b/src/quic/src/quic_peer.hpp index 453996d5b3f69b8130eefcc1d53b9bf0e37cc106..ae95c7ef5886d5424903793def8d5551286c6f3b 100644 --- a/src/quic/src/quic_peer.hpp +++ b/src/quic/src/quic_peer.hpp @@ -66,10 +66,15 @@ private: std::string name_; const bool ws_frame_; + void notify_recv_and_unlock_(UNIQUE_LOCK_MUTEX_T& lk); + DECLARE_MUTEX(recv_mtx_); - void ProcessRecv(); + std::condition_variable_any recv_cv_; msgpack::unpacker recv_buffer_; + std::vector<msgpack::object_handle> recv_queue_; + void ProcessRecv(); bool recv_busy_ = false; + bool recv_waiting_ = false; struct SendEvent { SendEvent(msgpack_buffer_t buffer); @@ -135,6 +140,8 @@ private: const char* stream = nullptr; const char* plt_pending_buffers = nullptr; const char* plt_pending_bytes = nullptr; + const char* plt_recv_size = nullptr; + const char* plt_recv_size_obj = nullptr; } profiler_id_; void statistics(); diff --git a/src/quic/src/quic_universe.cpp b/src/quic/src/quic_universe.cpp index 816490997091cfa6ad2d1623626ba562022e7fd3..f592810843f46eae26c91e804ac421a66e762fad 100644 --- a/src/quic/src/quic_universe.cpp +++ b/src/quic/src/quic_universe.cpp @@ -13,7 +13,7 @@ static MsQuicContext MsQuic; void QuicUniverse::Unload(bool force) { - UNIQUE_LOCK_N(Lk, MsQuicMtx_); + auto lk = std::unique_lock(MsQuicMtx_); if (MsQuic.IsValid() && (force || (MsQuicCtr_ == 0))) { LOG_IF(WARNING, MsQuicCtr_ != 0) << "[QUIC] Unloading MsQuic before all users have released their resources"; @@ -28,7 +28,7 @@ std::unique_ptr<ftl::net::QuicUniverse> QuicUniverse::Create(Universe* net) QuicUniverseImpl::QuicUniverseImpl(Universe* net) : net_(net ), IsClosing(false) { - UNIQUE_LOCK_N(Lk, MsQuicMtx_); + auto lk = std::unique_lock(MsQuicMtx_); if (MsQuicCtr_++ == 0) { MsQuicContext::Open(MsQuic, "Beyond"); @@ -62,11 +62,11 @@ QuicUniverseImpl::~QuicUniverseImpl() } { - UNIQUE_LOCK_N(Lk, ClientMtx); + auto lk = std::unique_lock(ClientMtx); Client.reset(); } { - UNIQUE_LOCK_N(Lk, MsQuicMtx_); + auto lk = std::unique_lock(MsQuicMtx_); --MsQuicCtr_; } diff --git a/src/quic/src/quic_universe.hpp b/src/quic/src/quic_universe.hpp index c032327aae47f2edca1cee02a28dc2842ca4a5a2..d0442b9d2604d1a98e6711a63bf2afe8d61b2b11 100644 --- a/src/quic/src/quic_universe.hpp +++ b/src/quic/src/quic_universe.hpp @@ -30,7 +30,6 @@ public: virtual bool Listen(const ftl::URI& uri) = 0; virtual std::vector<ftl::URI> GetListeningUris() = 0; - virtual PeerPtr Connect(const ftl::URI& uri, bool is_webservice=false) = 0; virtual void ConnectProxy(const ftl::URI& url) = 0; // Connect to QuicProxy diff --git a/src/quic/src/quic_universe_impl.hpp b/src/quic/src/quic_universe_impl.hpp index f124dc736f7df2daf430331f87ad7face1a2c40c..1026e2ddc4671d3a50afa3d4d21a8c55b7c6e78d 100644 --- a/src/quic/src/quic_universe_impl.hpp +++ b/src/quic/src/quic_universe_impl.hpp @@ -47,7 +47,7 @@ public: DECLARE_MUTEX(ConnectionMtx); std::vector<MsQuicConnectionPtr> Connections; - std::condition_variable ConnectionCv; + std::condition_variable_any ConnectionCv; std::unique_ptr<ProxyClient> Proxy; }; diff --git a/src/quic/src/quic_websocket.cpp b/src/quic/src/quic_websocket.cpp index 78f446860e4c5c3378b92ea1d44272a00bb6073e..cc7330a3c35f74eadc45e11fc7872a644fbcbdbd 100644 --- a/src/quic/src/quic_websocket.cpp +++ b/src/quic/src/quic_websocket.cpp @@ -116,6 +116,7 @@ WsParseHeaderStatus WsParseHeader(uint8_t* const Data, size_t Length, WebSocketH Header.Mask = (Data[1] & 0x80) == 0x80; auto PayloadLength = (Data[1] & 0x7F); Header.HeaderSize = 2 + (PayloadLength == 126 ? 2 : 0) + (PayloadLength == 127? 8 : 0) + (Header.Mask? 4 : 0); + CHECK(Header.HeaderSize <= 14); if (Length < Header.HeaderSize) { return NOT_ENOUGH_DATA; } // header incomplete if (Header.Rsv != 0) { return INVALID; } // must be 0 according to RFC diff --git a/src/streams/filestream.cpp b/src/streams/filestream.cpp index 6f4e32ada8a3422b1440165ba99a803dae625df4..b1a69c49788eada7dc78fce2b0c2bad051cb6ba6 100644 --- a/src/streams/filestream.cpp +++ b/src/streams/filestream.cpp @@ -456,6 +456,7 @@ bool File::_open() { bool File::run() { thread_ = std::thread([this]() { + set_thread_name("filestream"); while (active_) { auto now = ftl::time::get_time(); tick(now); diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp index 7ebb8567994e4f3fe7c86c496d5d488ce4cd8b14..25dbcb665b5f724f9069ba0cbd4cf714086e3c84 100644 --- a/src/streams/netstream.cpp +++ b/src/streams/netstream.cpp @@ -56,6 +56,30 @@ std::atomic_size_t Net::tx_sample_count__ = 0; int64_t Net::last_msg__ = 0; MUTEX Net::msg_mtx__; +#ifdef DEBUG_NETSTREAM + +void dbg_check_pkt(std::mutex& mtx, std::map<std::pair<ftl::protocol::FrameID, ftl::protocol::Channel>, int64_t>& ts, const ftl::protocol::StreamPacket& spkt, const std::string& type) +{ + auto lk = std::unique_lock(mtx); + auto channel = spkt.channel; + auto id = ftl::protocol::FrameID(spkt.frameSetID(), spkt.frameNumber()); + auto key = std::make_pair(id, channel); + auto ts_new = spkt.timestamp; + if (ts.count(key) > 0) { + auto ts_old = ts[key]; + CHECK(ts_old <= ts_new) << "Out of Order send for channel " << int(channel) << ", diff: " << (ts_new - ts_old) << ", new ts. " << ts_new; + ts[key] = ts_new; + } + else { + ts[key] = ts_new; + } +} + +#define DEBUG_CHECK_PKT(mtx, ts, spkt, name) dbg_check_pkt(mtx, ts, spkt, name) +#else +#define DEBUG_CHECK_PKT(mtx, ts, spkt, name) {} +#endif + static std::list<std::string> net_streams; static SHARED_MUTEX stream_mutex; @@ -133,6 +157,16 @@ int Net::postQueueSize(FrameID frame_id, Channel channel) const { return 0; } +bool Net::net_send_(ftl::net::PeerBase* peer, const std::string &name, int16_t ttimeoff, const ftl::protocol::StreamPacket& spkt, const ftl::protocol::DataPacket& dpkt) { + DEBUG_CHECK_PKT(dbg_mtx_send_, dbg_send_, spkt, "send"); + return peer->send(name, ttimeoff, reinterpret_cast<const StreamPacketMSGPACK&>(spkt), reinterpret_cast<const PacketMSGPACK&>(dpkt)); +} + +bool Net::net_send_(const ftl::UUID &pid, const std::string &name, int16_t ttimeoff, const ftl::protocol::StreamPacket& spkt, const ftl::protocol::DataPacket& dpkt) { + DEBUG_CHECK_PKT(dbg_mtx_send_, dbg_send_, spkt, "send"); + return net_->send(pid, name, ttimeoff, reinterpret_cast<const StreamPacketMSGPACK&>(spkt), reinterpret_cast<const PacketMSGPACK&>(dpkt)); +} + bool Net::send(const StreamPacket &spkt, const DataPacket &pkt) { if (!active_) return false; if (paused_) return true; @@ -172,7 +206,8 @@ bool Net::send(const StreamPacket &spkt, const DataPacket &pkt) { // Or send non-blocking and wait auto peer = net_->getPeer(client.peerid); - if (!peer || !peer->send( + if (!peer || !net_send_( + peer.get(), base_uri_, pre_transmit_latency, // Time since timestamp for tx spkt_net, @@ -200,7 +235,7 @@ bool Net::send(const StreamPacket &spkt, const DataPacket &pkt) { try { int16_t pre_transmit_latency = int16_t(ftl::time::get_time() - spkt.localTimestamp); - net_->send(*peer_, + net_send_(*peer_, base_uri_, pre_transmit_latency, // Time since timestamp for tx spkt_net, @@ -315,6 +350,7 @@ void waitUntilNext(bool hasNext, int64_t nextTs) { void Net::_run() { thread_ = std::thread([this]() { + set_thread_name("netstream"); #ifdef WIN32 timeBeginPeriod(5); #endif @@ -436,7 +472,7 @@ void Net::_run() { hasNext = true; } else { // No pending packets remain in the input buffer - LOG(WARNING) << "Buffer underun " << now; + //LOG(WARNING) << "Buffer underun " << now; ++underuns_; } @@ -487,9 +523,15 @@ bool Net::begin() { StreamPacketMSGPACK &spkt_raw, PacketMSGPACK &pkt) { + DEBUG_CHECK_PKT(dbg_mtx_recv_, dbg_recv_, spkt_raw, "recv"); + auto *state = _getFrameState(FrameID(spkt_raw.streamID, spkt_raw.frame_number)); _earlyProcessPacket(&p, ttimeoff, spkt_raw, pkt); + // HACK: disable network buffering here + //_processPacket(&p, ttimeoff, spkt_raw, pkt); + //return; + if (!host_ && !(spkt_raw.flags & ftl::protocol::kFlagOutOfBand)) { // not hosted: buffer packets (processed in separate thread Net::_run()) // or out of band which are passed to processing immediately @@ -632,7 +674,7 @@ bool Net::_sendRequest(Channel c, uint8_t frameset, uint8_t frames, uint8_t coun 0 }; - net_->send(*peer_, base_uri_, (int16_t)0, spkt, pkt); + net_send_(*peer_, base_uri_, (int16_t)0, spkt, pkt); hasPosted(spkt, pkt); return true; } diff --git a/src/streams/netstream.hpp b/src/streams/netstream.hpp index c110ef88d372e5e5f06afad879d5d3958990897f..783451b256ed98361b455aea2e3e918d3ed02f76 100644 --- a/src/streams/netstream.hpp +++ b/src/streams/netstream.hpp @@ -91,6 +91,17 @@ class Net : public Stream { private: bool send(const ftl::protocol::StreamPacket&, const ftl::protocol::DataPacket&); + /** Build with -DDEBUG_NETSTREAM to enable send/recv asserts on packet (timestamp) order. */ + #ifdef DEBUG_NETSTREAM + std::map<std::pair<ftl::protocol::FrameID, ftl::protocol::Channel>, int64_t> dbg_send_; + std::map<std::pair<ftl::protocol::FrameID, ftl::protocol::Channel>, int64_t> dbg_recv_; + std::mutex dbg_mtx_send_; + std::mutex dbg_mtx_recv_; + #endif + + bool net_send_(const ftl::UUID &pid, const std::string &name, int16_t ttimeoff, const ftl::protocol::StreamPacket&, const ftl::protocol::DataPacket&); + bool net_send_(ftl::net::PeerBase* peer, const std::string &name, int16_t ttimeoff, const ftl::protocol::StreamPacket&, const ftl::protocol::DataPacket&); + SHARED_MUTEX mutex_; bool active_ = false; ftl::net::Universe *net_; diff --git a/src/universe.cpp b/src/universe.cpp index ad39105f797e995cc8cda90c97f5729e8cb1b374..8b81859f4cdfd25fdc11b8412802eb0469f5aa5d 100644 --- a/src/universe.cpp +++ b/src/universe.cpp @@ -489,6 +489,9 @@ void Universe::_periodic() { txBytes_ = 0; stats_lastTS_ = now; +#ifdef HAVE_MSQUIC +#endif + auto i = reconnects_.begin(); while (i != reconnects_.end()) { std::string addr = i->peer->getURI(); @@ -552,6 +555,7 @@ void Universe::_garbage() { } void Universe::__start(Universe *u) { + set_thread_name("net/universe"); #ifndef WIN32 // TODO(Seb): move somewhere else (common initialization file?) signal(SIGPIPE, SIG_IGN);