Skip to content
Snippets Groups Projects
Commit 30a99e5b authored by Sebastian Hahta's avatar Sebastian Hahta
Browse files

apply default socket buffer sizes

parent 5eeef5a7
No related branches found
No related tags found
No related merge requests found
......@@ -136,6 +136,17 @@ void Peer::_bind_rpc() {
});
}
void init_profiler() {
// call once if profiler is enabled to configure plots
#ifdef TRACY_ENABLE
[[maybe_unused]] static bool init = [](){
TracyPlotConfig("rx", tracy::PlotFormatType::Memory, false, true, 0xff0000);
TracyPlotConfig("tx", tracy::PlotFormatType::Memory, false, true, 0xff0000);
return true;
}();
#endif
}
Peer::Peer(std::unique_ptr<internal::SocketConnection> s, Universe* u, Dispatcher* d) :
outgoing_(false),
local_id_(0),
......@@ -152,6 +163,7 @@ Peer::Peer(std::unique_ptr<internal::SocketConnection> s, Universe* u, Dispatche
_updateURI();
_bind_rpc();
++net_->peer_instances_;
init_profiler();
}
Peer::Peer(const ftl::URI& uri, Universe *u, Dispatcher *d) :
......@@ -161,12 +173,14 @@ Peer::Peer(const ftl::URI& uri, Universe *u, Dispatcher *d) :
status_(NodeStatus::kInvalid),
can_reconnect_(true),
net_(u),
sock_(nullptr),
disp_(std::make_unique<Dispatcher>(d)) {
/* Outgoing connection constructor */
_bind_rpc();
_connect();
++net_->peer_instances_;
init_profiler();
}
void Peer::start() {
......@@ -339,6 +353,9 @@ void Peer::data() {
}
net_->rxBytes_ += rc;
#ifdef TRACY_ENABLE
TracyPlot("rx", double(rc));
#endif
// May possibly need locking
recv_buf_.buffer_consumed(rc);
......@@ -579,6 +596,10 @@ int Peer::_send() {
}
net_->txBytes_ += c;
#ifdef TRACY_ENABLE
TracyPlot("tx", double(c));
#endif
return c;
}
......
......@@ -17,6 +17,8 @@
#include "../uuidMSGPACK.hpp"
#include "packetMsgpack.hpp"
#include <ftl/profiler.hpp>
#define LOGURU_REPLACE_GLOG 1
#include <ftl/lib/loguru.hpp>
......@@ -102,6 +104,8 @@ Net::Net(const std::string &uri, ftl::net::Universe *net, bool host) :
}
base_uri_ = u.getBaseURI();
// callbacks for processing bound in begin()
if (host_) {
// Automatically set name
name_.resize(1024);
......@@ -285,6 +289,13 @@ Net::FrameState *Net::_getFrameState(FrameID id) {
return p;
}
// TODO: Net::_run should be split to smaller functions and this wait code added at the end of the loop.
void waitUntilNext(bool hasNext, int64_t nextTs) {
auto used = ftl::time::get_time();
int64_t spare = (hasNext) ? nextTs - used : 10;
sleep_for(milliseconds(std::max(int64_t(1), spare)));
}
void Net::_run() {
thread_ = std::thread([this]() {
#ifdef WIN32
......@@ -292,28 +303,27 @@ void Net::_run() {
#endif
while (active_) {
auto now = ftl::time::get_time();
int64_t nextTs = now + 200;
int activeStates = 0;
int64_t nextTs = now + 200; // FIXME: hardcoded value
bool hasNext = false;
// For every state
// For every state (frame in framset)
SHARED_LOCK(statesMtx_, lk);
for (auto &s : frameStates_) {
auto *state = s.second.get();
++activeStates;
if (state->active > 0) {
// Previous task in thread pool still running; not a problem depending on cause
LOG(WARNING) << "Previous frame still processing: " << nextTs;
nextTs = now + 1;
hasNext = true;
continue;
waitUntilNext(true, nextTs);
continue; // busy loop? bug?
}
// Current timestamp: offset from first packet
int64_t cts = now - state->base_local_ts_;
// If there are any packets that should be dispatched
// Then create a thread for each and do it.
//if (state->active == 0) {
// Dispatch pending packets to worker thread
{
SHARED_LOCK(state->mtx, lk2);
......@@ -322,27 +332,31 @@ void Net::_run() {
int64_t raw_ats = *state->timestamps.begin();
// state->timestamps.erase(state->timestamps.begin());
// First frame: save local and packet timestmaps and update current ts
if (state->base_local_ts_ == 0) {
state->base_local_ts_ = now;
state->base_pkt_ts_ = raw_ats;
cts = 0;
}
// Time from first packet + buffer
ats = raw_ats - state->base_pkt_ts_ + buffering_;
} else {
// LOG(WARNING) << "No packets to present: " << cts;
continue;
}
size_t size = state->buffer.size();
size_t buffer_size = state->buffer.size();
lk2.unlock();
// Not ready to display this one yet.
// Not ready to display this one yet (timestamp within buffer)
if (ats > cts) {
// ??? isn't this always true when buffering is large enough?
if (ats - cts > 100) {
++drops_;
}
nextTs = std::min(nextTs, ats + state->base_local_ts_);
hasNext = true;
waitUntilNext(true, nextTs);
continue;
}
......@@ -350,10 +364,11 @@ void Net::_run() {
std::list<PacketBuffer*> framePackets;
for (size_t i = 0; i < size; ++i) {
for (size_t i = 0; i < buffer_size; ++i) {
// Relative packet timestamp + buffering
int64_t pts = current->packets.first.timestamp - state->base_pkt_ts_ + buffering_;
// Should the packet be dispatched yet
// Should the packet be dispatched yet (collect packets for next unprocessed frame)
if (pts == ats && !current->done) {
framePackets.push_back(&(*current));
......@@ -365,11 +380,14 @@ void Net::_run() {
++current;
}
// Process all packets for this frame in a different thread
ftl::pool.push([
this,
c = std::move(ftl::Counter(&state->active)),
c2 = std::move(ftl::Counter(&jobs_)),
framePackets](int ix) {
FTL_PROFILE_SCOPE("Frame Received");
for (auto buf : framePackets) {
StreamPacket *spkt;
DataPacket *pkt;
......@@ -382,25 +400,30 @@ void Net::_run() {
} catch (const std::exception &e) {
LOG(ERROR) << "Packet processing error: " << e.what();
}
// Mark for removal
buf->done = true;
}
});
}
{
// Upgrade to write lock
UNIQUE_LOCK(state->mtx, lk2);
state->timestamps.erase(state->timestamps.begin());
if (state->timestamps.size() > 0) {
int64_t nts = *state->timestamps.begin();
// Next timestap in local clock time
nts = nts - state->base_pkt_ts_ + buffering_ + state->base_local_ts_;
nextTs = std::min(nextTs, nts);
hasNext = true;
} else {
// No pending packets remain in the input buffer
LOG(WARNING) << "Buffer underun " << now;
++underuns_;
}
// Remove already processed packets.
auto it = state->buffer.begin();
while (it != state->buffer.end()) {
if (it->done) {
......@@ -413,9 +436,7 @@ void Net::_run() {
}
lk.unlock();
auto used = ftl::time::get_time();
int64_t spare = (hasNext) ? nextTs - used : 10;
sleep_for(milliseconds(std::max(int64_t(1), spare)));
waitUntilNext(hasNext, nextTs);
}
#ifdef WIN32
timeEndPeriod(5);
......@@ -442,7 +463,7 @@ bool Net::begin() {
// FIXME: Potential race between above check and new binding
// Add the RPC handler for the URI
// Add the RPC handler for the URI (called by Peer::_data())
net_->bind(base_uri_, [this](
ftl::net::Peer &p,
int16_t ttimeoff,
......@@ -453,16 +474,21 @@ bool Net::begin() {
_earlyProcessPacket(&p, ttimeoff, spkt_raw, pkt);
if (!host_) {
// not hosted: buffer packets (processed in separate thread Net::_run())
UNIQUE_LOCK(state->mtx, lk);
state->timestamps.insert(spkt_raw.timestamp);
// TODO(Nick): This buffer could be faster?
auto &buf = state->buffer.emplace_back();
buf.packets.first = spkt_raw;
buf.packets.first.hint_peerid = p.localID();
buf.packets.second = std::move(pkt);
buf.peer = nullptr;
buf.done = false;
} else {
// process immediately
_processPacket(&p, ttimeoff, spkt_raw, pkt);
}
});
......
......@@ -151,6 +151,8 @@ class Net : public Stream {
void _cleanUp();
void _processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, DataPacket &pkt);
void _earlyProcessPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, DataPacket &pkt);
// processing loop for non-hosted netstreams (runs in dedicated thread)
void _run();
};
......
......@@ -15,6 +15,7 @@
#define LOGURU_REPLACE_GLOG 1
#include <ftl/lib/loguru.hpp>
#include <ftl/profiler.hpp>
#include <ftl/time.hpp>
......@@ -131,14 +132,15 @@ size_t Universe::getRecvBufferSize(ftl::URI::scheme_t s) {
}
void Universe::setSendBufferSize(ftl::URI::scheme_t s, size_t size) {
if (s == 0) return;
switch (s) {
case ftl::URI::scheme_t::SCHEME_WS:
case ftl::URI::scheme_t::SCHEME_WSS:
ws_send_buffer_ = size;
ws_send_buffer_ = (size > 0) ? size : WS_SEND_BUFFER_SIZE;;
break;
default:
tcp_send_buffer_ = size;
tcp_send_buffer_ = (size > 0) ? size : TCP_SEND_BUFFER_SIZE;;
}
}
......@@ -146,10 +148,10 @@ void Universe::setRecvBufferSize(ftl::URI::scheme_t s, size_t size) {
switch (s) {
case ftl::URI::scheme_t::SCHEME_WS:
case ftl::URI::scheme_t::SCHEME_WSS:
ws_recv_buffer_ = size;
ws_recv_buffer_ = (size > 0) ? size : WS_RECEIVE_BUFFER_SIZE;
break;
default:
tcp_recv_buffer_ = size;
tcp_recv_buffer_ = (size > 0) ? size : TCP_RECEIVE_BUFFER_SIZE;
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment