From 30a99e5bfb0a5db503e28ac4de2c3a5ce66d8a7f Mon Sep 17 00:00:00 2001
From: Sebastian Hahta <joseha@utu.fi>
Date: Wed, 7 Jun 2023 14:37:40 +0300
Subject: [PATCH] apply default socket buffer sizes

---
 src/peer.cpp              | 21 ++++++++++++
 src/streams/netstream.cpp | 72 ++++++++++++++++++++++++++-------------
 src/streams/netstream.hpp |  2 ++
 src/universe.cpp          | 10 +++---
 4 files changed, 78 insertions(+), 27 deletions(-)

diff --git a/src/peer.cpp b/src/peer.cpp
index 5dca8e1..5285ceb 100644
--- a/src/peer.cpp
+++ b/src/peer.cpp
@@ -136,6 +136,17 @@ void Peer::_bind_rpc() {
     });
 }
 
+void init_profiler() {
+    // call once if profiler is enabled to configure plots
+    #ifdef TRACY_ENABLE
+    [[maybe_unused]] static bool init = [](){
+        TracyPlotConfig("rx", tracy::PlotFormatType::Memory, false, true, 0xff0000);
+        TracyPlotConfig("tx", tracy::PlotFormatType::Memory, false, true, 0xff0000);
+        return true;
+    }();
+    #endif
+}
+
 Peer::Peer(std::unique_ptr<internal::SocketConnection> s, Universe* u, Dispatcher* d) :
         outgoing_(false),
         local_id_(0),
@@ -152,6 +163,7 @@ Peer::Peer(std::unique_ptr<internal::SocketConnection> s, Universe* u, Dispatche
     _updateURI();
     _bind_rpc();
     ++net_->peer_instances_;
+    init_profiler();
 }
 
 Peer::Peer(const ftl::URI& uri, Universe *u, Dispatcher *d) :
@@ -161,12 +173,14 @@ Peer::Peer(const ftl::URI& uri, Universe *u, Dispatcher *d) :
         status_(NodeStatus::kInvalid),
         can_reconnect_(true),
         net_(u),
+        sock_(nullptr),
         disp_(std::make_unique<Dispatcher>(d)) {
     /* Outgoing connection constructor */
 
     _bind_rpc();
     _connect();
     ++net_->peer_instances_;
+    init_profiler();
 }
 
 void Peer::start() {
@@ -339,6 +353,9 @@ void Peer::data() {
     }
 
     net_->rxBytes_ += rc;
+    #ifdef TRACY_ENABLE
+    TracyPlot("rx", double(rc));
+    #endif
 
     // May possibly need locking
     recv_buf_.buffer_consumed(rc);
@@ -579,6 +596,10 @@ int Peer::_send() {
     }
 
     net_->txBytes_ += c;
+    #ifdef TRACY_ENABLE
+    TracyPlot("tx", double(c));
+    #endif
+
     return c;
 }
 
diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp
index 36e4470..e95a773 100644
--- a/src/streams/netstream.cpp
+++ b/src/streams/netstream.cpp
@@ -17,6 +17,8 @@
 #include "../uuidMSGPACK.hpp"
 #include "packetMsgpack.hpp"
 
+#include <ftl/profiler.hpp>
+
 #define LOGURU_REPLACE_GLOG 1
 #include <ftl/lib/loguru.hpp>
 
@@ -102,6 +104,8 @@ Net::Net(const std::string &uri, ftl::net::Universe *net, bool host) :
     }
     base_uri_ = u.getBaseURI();
 
+    // callbacks for processing bound in begin()
+
     if (host_) {
         // Automatically set name
         name_.resize(1024);
@@ -285,6 +289,13 @@ Net::FrameState *Net::_getFrameState(FrameID id) {
     return p;
 }
 
+// TODO: Net::_run should be split to smaller functions and this wait code added at the end of the loop.
+void waitUntilNext(bool hasNext, int64_t nextTs) {
+    auto used = ftl::time::get_time();
+    int64_t spare = (hasNext) ? nextTs - used : 10;
+    sleep_for(milliseconds(std::max(int64_t(1), spare)));
+}
+
 void Net::_run() {
     thread_ = std::thread([this]() {
 #ifdef WIN32
@@ -292,28 +303,27 @@ void Net::_run() {
 #endif
         while (active_) {
             auto now = ftl::time::get_time();
-            int64_t nextTs = now + 200;
-            int activeStates = 0;
+            int64_t nextTs = now + 200; // FIXME: hardcoded value
             bool hasNext = false;
 
-            // For every state
+            // For every state (frame in framset)
             SHARED_LOCK(statesMtx_, lk);
             for (auto &s : frameStates_) {
                 auto *state = s.second.get();
-                ++activeStates;
 
                 if (state->active > 0) {
+                    // Previous task in thread pool still running; not a problem depending on cause
                     LOG(WARNING) << "Previous frame still processing: " << nextTs;
                     nextTs = now + 1;
-                    hasNext = true;
-                    continue;
+
+                    waitUntilNext(true, nextTs);
+                    continue; // busy loop? bug?
                 }
 
+                // Current timestamp: offset from first packet
                 int64_t cts = now - state->base_local_ts_;
 
-                // If there are any packets that should be dispatched
-                // Then create a thread for each and do it.
-                //if (state->active == 0) {
+                // Dispatch pending packets to worker thread
                 {
                     SHARED_LOCK(state->mtx, lk2);
 
@@ -322,27 +332,31 @@ void Net::_run() {
                         int64_t raw_ats = *state->timestamps.begin();
                         // state->timestamps.erase(state->timestamps.begin());
 
+                        // First frame: save local and packet timestmaps and update current ts
                         if (state->base_local_ts_ == 0) {
                             state->base_local_ts_ = now;
                             state->base_pkt_ts_ = raw_ats;
                             cts = 0;
                         }
+                        // Time from first packet + buffer
                         ats = raw_ats - state->base_pkt_ts_ + buffering_; 
                     } else {
                         // LOG(WARNING) << "No packets to present: " << cts;
                         continue;
                     }
 
-                    size_t size = state->buffer.size();
+                    size_t buffer_size = state->buffer.size();
                     lk2.unlock();
 
-                    // Not ready to display this one yet.
+                    // Not ready to display this one yet (timestamp within buffer)
                     if (ats > cts) {
+                         // ??? isn't this always true when buffering is large enough?
                         if (ats - cts > 100) {
                             ++drops_;
                         }
                         nextTs = std::min(nextTs, ats + state->base_local_ts_);
-                        hasNext = true;
+
+                        waitUntilNext(true, nextTs);
                         continue;
                     }
 
@@ -350,10 +364,11 @@ void Net::_run() {
 
                     std::list<PacketBuffer*> framePackets;
 
-                    for (size_t i = 0; i < size; ++i) {
+                    for (size_t i = 0; i < buffer_size; ++i) {
+                        // Relative packet timestamp + buffering
                         int64_t pts = current->packets.first.timestamp - state->base_pkt_ts_ + buffering_;
 
-                        // Should the packet be dispatched yet
+                        // Should the packet be dispatched yet (collect packets for next unprocessed frame)
                         if (pts == ats && !current->done) {
                             framePackets.push_back(&(*current));
 
@@ -365,11 +380,14 @@ void Net::_run() {
                         ++current;
                     }
 
+                    // Process all packets for this frame in a different thread
                     ftl::pool.push([
-                            this,
-                            c = std::move(ftl::Counter(&state->active)),
-                            c2 = std::move(ftl::Counter(&jobs_)),
-                            framePackets](int ix) {
+                                this,
+                                c = std::move(ftl::Counter(&state->active)),
+                                c2 = std::move(ftl::Counter(&jobs_)),
+                                framePackets](int ix) {
+
+                        FTL_PROFILE_SCOPE("Frame Received");
                         for (auto buf : framePackets) {
                             StreamPacket *spkt;
                             DataPacket *pkt;
@@ -382,25 +400,30 @@ void Net::_run() {
                             } catch (const std::exception &e) {
                                 LOG(ERROR) << "Packet processing error: " << e.what();
                             }
+                            // Mark for removal
                             buf->done = true;
                         }
                     });
                 }
 
                 {
+                    // Upgrade to write lock
                     UNIQUE_LOCK(state->mtx, lk2);
                     state->timestamps.erase(state->timestamps.begin());
 
                     if (state->timestamps.size() > 0) {
                         int64_t nts = *state->timestamps.begin();
+                        // Next timestap in local clock time
                         nts = nts - state->base_pkt_ts_ + buffering_ + state->base_local_ts_;
                         nextTs = std::min(nextTs, nts);
                         hasNext = true;
                     } else {
+                        // No pending packets remain in the input buffer
                         LOG(WARNING) << "Buffer underun " << now;
                         ++underuns_;
                     }
 
+                    // Remove already processed packets.
                     auto it = state->buffer.begin();
                     while (it != state->buffer.end()) {
                         if (it->done) {
@@ -413,9 +436,7 @@ void Net::_run() {
             }
             lk.unlock();
 
-            auto used = ftl::time::get_time();
-            int64_t spare = (hasNext) ? nextTs - used : 10;
-            sleep_for(milliseconds(std::max(int64_t(1), spare)));
+            waitUntilNext(hasNext, nextTs);
         }
 #ifdef WIN32
         timeEndPeriod(5);
@@ -442,7 +463,7 @@ bool Net::begin() {
 
     // FIXME: Potential race between above check and new binding
 
-    // Add the RPC handler for the URI
+    // Add the RPC handler for the URI (called by Peer::_data())
     net_->bind(base_uri_, [this](
             ftl::net::Peer &p,
             int16_t ttimeoff,
@@ -453,16 +474,21 @@ bool Net::begin() {
         _earlyProcessPacket(&p, ttimeoff, spkt_raw, pkt);
 
         if (!host_) {
+            // not hosted: buffer packets (processed in separate thread Net::_run())
             UNIQUE_LOCK(state->mtx, lk);
             state->timestamps.insert(spkt_raw.timestamp);
+
             // TODO(Nick): This buffer could be faster?
             auto &buf = state->buffer.emplace_back();
             buf.packets.first = spkt_raw;
             buf.packets.first.hint_peerid = p.localID();
             buf.packets.second = std::move(pkt);
+            
             buf.peer = nullptr;
             buf.done = false;
+
         } else {
+            // process immediately
             _processPacket(&p, ttimeoff, spkt_raw, pkt);
         }
     });
@@ -474,7 +500,7 @@ bool Net::begin() {
             // Add to list of available streams
             UNIQUE_LOCK(stream_mutex, lk);
             net_streams.push_back(uri_);
-        }
+        }  
 
         active_ = true;
         net_->broadcast("add_stream", uri_);
diff --git a/src/streams/netstream.hpp b/src/streams/netstream.hpp
index fb2db0e..0811ebc 100644
--- a/src/streams/netstream.hpp
+++ b/src/streams/netstream.hpp
@@ -151,6 +151,8 @@ class Net : public Stream {
     void _cleanUp();
     void _processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, DataPacket &pkt);
     void _earlyProcessPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, DataPacket &pkt);
+
+    // processing loop for non-hosted netstreams (runs in dedicated thread)
     void _run();
 };
 
diff --git a/src/universe.cpp b/src/universe.cpp
index 57af06a..70004a1 100644
--- a/src/universe.cpp
+++ b/src/universe.cpp
@@ -15,6 +15,7 @@
 
 #define LOGURU_REPLACE_GLOG 1
 #include <ftl/lib/loguru.hpp>
+#include <ftl/profiler.hpp>
 
 #include <ftl/time.hpp>
 
@@ -131,14 +132,15 @@ size_t Universe::getRecvBufferSize(ftl::URI::scheme_t s) {
 }
 
 void Universe::setSendBufferSize(ftl::URI::scheme_t s, size_t size) {
+    if (s == 0) return;
     switch (s) {
         case ftl::URI::scheme_t::SCHEME_WS:
         case ftl::URI::scheme_t::SCHEME_WSS:
-            ws_send_buffer_ = size;
+            ws_send_buffer_ = (size > 0) ? size : WS_SEND_BUFFER_SIZE;;
             break;
 
         default:
-            tcp_send_buffer_ = size;
+            tcp_send_buffer_ = (size > 0) ? size : TCP_SEND_BUFFER_SIZE;;
     }
 }
 
@@ -146,10 +148,10 @@ void Universe::setRecvBufferSize(ftl::URI::scheme_t s, size_t size) {
     switch (s) {
         case ftl::URI::scheme_t::SCHEME_WS:
         case ftl::URI::scheme_t::SCHEME_WSS:
-            ws_recv_buffer_ = size;
+            ws_recv_buffer_ = (size > 0) ? size : WS_RECEIVE_BUFFER_SIZE;
             break;
         default:
-            tcp_recv_buffer_ = size;
+            tcp_recv_buffer_ = (size > 0) ? size : TCP_RECEIVE_BUFFER_SIZE;
     }
 }
 
-- 
GitLab