diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp index c6f596e5171297a1e857699e139a113be84364fe..8956d670862efaace0f0bd71cbc0cf5cee5b48fb 100644 --- a/src/streams/netstream.cpp +++ b/src/streams/netstream.cpp @@ -309,12 +309,16 @@ void Net::_run() { int64_t cts = now - state->base_local_ts_; bool hasNext = false; + bool hasDone = false; // If there are any packets that should be dispatched // Then create a thread for each and do it. - if (state->active == 0) { + //if (state->active == 0) { + { + SHARED_LOCK(state->mtx, lk2); auto current = state->buffer.begin(); while (current != state->buffer.end()) { + lk2.unlock(); if (!current->done) { if (state->base_local_ts_ == 0) { state->base_local_ts_ = now; @@ -325,7 +329,7 @@ void Net::_run() { int64_t pts = current->packets.first.timestamp - state->base_pkt_ts_ + buffering_; // Should the packet be dispatched yet - if (pts <= cts) { + if (state->active == 0 && pts <= cts) { cts = pts; // Prevent multi frames. StreamPacket *spkt; @@ -346,13 +350,17 @@ void Net::_run() { hasNext = true; break; } + } else { + hasDone = true; } + lk2.lock(); ++current; } - } else { - LOG(WARNING) << "Already active"; } + /*} else { + LOG(WARNING) << "Already active"; + }*/ if (!hasNext) { nextTs = std::min(nextTs, now + 10); @@ -360,8 +368,17 @@ void Net::_run() { } // Remove consumed packets. - UNIQUE_LOCK(state->mtx, lk2); - state->buffer.remove_if([](const PacketBuffer &i) { return static_cast<bool>(i.done); }); + if (hasDone) { + UNIQUE_LOCK(state->mtx, lk); + auto it = state->buffer.begin(); + while (it != state->buffer.end()) { + if (it->done) { + it = state->buffer.erase(it); + } else { + break; + } + } + } } lk.unlock(); diff --git a/src/streams/netstream.hpp b/src/streams/netstream.hpp index 95a4db10f74b973f850cc4737e032671b1bc9133..4f5fd835f02ec2b6d48110db2ca6d063bf1c919c 100644 --- a/src/streams/netstream.hpp +++ b/src/streams/netstream.hpp @@ -119,7 +119,7 @@ class Net : public Stream { struct FrameState { ftl::protocol::FrameID id; std::atomic_int active = 0; - MUTEX mtx; + SHARED_MUTEX mtx; std::list<PacketBuffer> buffer; int64_t base_pkt_ts_ = 0; int64_t base_local_ts_ = 0;