From 391c2cee96b2a5dd0bceb7982c4de22248bd5290 Mon Sep 17 00:00:00 2001 From: Nicolas Pope <nicolas.pope@utu.fi> Date: Tue, 8 Nov 2022 09:56:15 +0000 Subject: [PATCH] Change net stream buffer mechanism --- src/streams/netstream.cpp | 29 +++++++++++++++++++++++------ src/streams/netstream.hpp | 2 +- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp index c6f596e..8956d67 100644 --- a/src/streams/netstream.cpp +++ b/src/streams/netstream.cpp @@ -309,12 +309,16 @@ void Net::_run() { int64_t cts = now - state->base_local_ts_; bool hasNext = false; + bool hasDone = false; // If there are any packets that should be dispatched // Then create a thread for each and do it. - if (state->active == 0) { + //if (state->active == 0) { + { + SHARED_LOCK(state->mtx, lk2); auto current = state->buffer.begin(); while (current != state->buffer.end()) { + lk2.unlock(); if (!current->done) { if (state->base_local_ts_ == 0) { state->base_local_ts_ = now; @@ -325,7 +329,7 @@ void Net::_run() { int64_t pts = current->packets.first.timestamp - state->base_pkt_ts_ + buffering_; // Should the packet be dispatched yet - if (pts <= cts) { + if (state->active == 0 && pts <= cts) { cts = pts; // Prevent multi frames. StreamPacket *spkt; @@ -346,13 +350,17 @@ void Net::_run() { hasNext = true; break; } + } else { + hasDone = true; } + lk2.lock(); ++current; } - } else { - LOG(WARNING) << "Already active"; } + /*} else { + LOG(WARNING) << "Already active"; + }*/ if (!hasNext) { nextTs = std::min(nextTs, now + 10); @@ -360,8 +368,17 @@ void Net::_run() { } // Remove consumed packets. - UNIQUE_LOCK(state->mtx, lk2); - state->buffer.remove_if([](const PacketBuffer &i) { return static_cast<bool>(i.done); }); + if (hasDone) { + UNIQUE_LOCK(state->mtx, lk); + auto it = state->buffer.begin(); + while (it != state->buffer.end()) { + if (it->done) { + it = state->buffer.erase(it); + } else { + break; + } + } + } } lk.unlock(); diff --git a/src/streams/netstream.hpp b/src/streams/netstream.hpp index 95a4db1..4f5fd83 100644 --- a/src/streams/netstream.hpp +++ b/src/streams/netstream.hpp @@ -119,7 +119,7 @@ class Net : public Stream { struct FrameState { ftl::protocol::FrameID id; std::atomic_int active = 0; - MUTEX mtx; + SHARED_MUTEX mtx; std::list<PacketBuffer> buffer; int64_t base_pkt_ts_ = 0; int64_t base_local_ts_ = 0; -- GitLab