diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp index 7a1cb03556814767a9d1cd42f291f5784c7c3df9..4c809d8a7aa430b9c4607a9310281f20ce424852 100644 --- a/src/streams/netstream.cpp +++ b/src/streams/netstream.cpp @@ -313,65 +313,46 @@ void Net::_run() { continue; } - //if (state->base_local_ts_ == 0) continue; - int64_t cts = now - state->base_local_ts_; - bool hasLocalNext = false; - - { - UNIQUE_LOCK(state->mtx, lk2); - auto it = state->buffer.begin(); - while (it != state->buffer.end()) { - if (it->done) { - it = state->buffer.erase(it); - } else { - break; - } - } - } - // If there are any packets that should be dispatched // Then create a thread for each and do it. //if (state->active == 0) { { SHARED_LOCK(state->mtx, lk2); - int64_t ats = 0; - - size_t size = state->buffer.size(); - lk2.unlock(); - - if (size > 0) { - // LOG(INFO) << "Buffer size = " << state->buffer.size(); - auto &front = state->buffer.front(); + int64_t ats = -1; + if (state->timestamps.size() > 0) { + int64_t raw_ats = *state->timestamps.begin(); + // state->timestamps.erase(state->timestamps.begin()); if (state->base_local_ts_ == 0) { state->base_local_ts_ = now; - state->base_pkt_ts_ = front.packets.first.timestamp; + state->base_pkt_ts_ = raw_ats; cts = 0; } - - int64_t pts = front.packets.first.timestamp - state->base_pkt_ts_ + buffering_; - - if (pts <= cts) { - LOG(INFO) << "Presentation error = " << (cts - pts); - ats = pts; - } else { - LOG(INFO) << "Next presentation in: " << (pts - cts); - } + ats = raw_ats - state->base_pkt_ts_ + buffering_; + } else { + LOG(WARNING) << "No packets to present: " << cts; + continue; } - if (size == 0) { - LOG(WARNING) << "No packets to present: " << cts; + size_t size = state->buffer.size(); + lk2.unlock(); + + // Not ready to display this one yet. + if (ats > cts) { + LOG(INFO) << "Next presentation in: " << (ats - cts); + nextTs = std::min(nextTs, ats + state->base_local_ts_); + hasNext = true; continue; + } else { + LOG(INFO) << "Presentation error = " << (cts - ats); } auto current = state->buffer.begin(); - bool seenEnd = false; + for (size_t i = 0; i < size; ++i) { - // lk2.unlock(); - int64_t pts = current->packets.first.timestamp - state->base_pkt_ts_ + buffering_; // Should the packet be dispatched yet @@ -382,35 +363,43 @@ void Net::_run() { spkt = ¤t->packets.first; pkt = ¤t->packets.second; - if (spkt->channel == Channel::kEndFrame) { - seenEnd = true; - } - ++state->active; ftl::pool.push([this, buf = &*current, spkt, pkt, state](int ix) { _processPacket(buf->peer, 0, *spkt, *pkt); buf->done = true; --state->active; }); - } else { - int64_t next = pts + state->base_local_ts_; - nextTs = std::min(nextTs, next); - hasLocalNext = true; - hasNext = true; - if (seenEnd) { + + if (spkt->channel == Channel::kEndFrame) { break; } } - // lk2.lock(); ++current; } } - if (!hasLocalNext) { - // nextTs = std::min(nextTs, now + 10); - // TODO: Also, increase buffering - LOG(WARNING) << "Buffer underun " << now; + { + UNIQUE_LOCK(state->mtx, lk2); + state->timestamps.erase(state->timestamps.begin()); + + if (state->timestamps.size() > 0) { + int64_t nts = *state->timestamps.begin(); + nts = nts - state->base_pkt_ts_ + buffering_ + state->base_local_ts_; + nextTs = std::min(nextTs, nts); + hasNext = true; + } else { + LOG(WARNING) << "Buffer underun " << now; + } + + auto it = state->buffer.begin(); + while (it != state->buffer.end()) { + if (it->done) { + it = state->buffer.erase(it); + } else { + break; + } + } } } lk.unlock(); @@ -455,6 +444,7 @@ bool Net::begin() { auto *state = _getFrameState(FrameID(spkt_raw.streamID, spkt_raw.frame_number)); if (!host_) { UNIQUE_LOCK(state->mtx, lk); + state->timestamps.insert(spkt_raw.timestamp); // TODO(Nick): This buffer could be faster? auto &buf = state->buffer.emplace_back(); buf.packets.first = spkt_raw; diff --git a/src/streams/netstream.hpp b/src/streams/netstream.hpp index 4f5fd835f02ec2b6d48110db2ca6d063bf1c919c..a0064388bcc52b7e664c285d37f66a39f95b0377 100644 --- a/src/streams/netstream.hpp +++ b/src/streams/netstream.hpp @@ -10,6 +10,7 @@ #include <list> #include <atomic> #include <unordered_map> +#include <set> #include "../universe.hpp" #include <ftl/threads.hpp> #include <ftl/protocol/packet.hpp> @@ -121,6 +122,7 @@ class Net : public Stream { std::atomic_int active = 0; SHARED_MUTEX mtx; std::list<PacketBuffer> buffer; + std::set<int64_t> timestamps; int64_t base_pkt_ts_ = 0; int64_t base_local_ts_ = 0; };