diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp index 8956d670862efaace0f0bd71cbc0cf5cee5b48fb..2accc9174e2c3aca223f82b41e959d476c2bb576 100644 --- a/src/streams/netstream.cpp +++ b/src/streams/netstream.cpp @@ -298,92 +298,108 @@ void Net::_run() { while (active_) { auto now = ftl::time::get_time(); int64_t nextTs = now + 20; + int activeStates = 0; // For every state SHARED_LOCK(statesMtx_, lk); for (auto &s : frameStates_) { auto *state = s.second.get(); + ++activeStates; + + if (state->active > 0) { + LOG(WARNING) << "Previous frame still processing: " << nextTs; + nextTs = now + 1; + continue; + } //if (state->base_local_ts_ == 0) continue; int64_t cts = now - state->base_local_ts_; bool hasNext = false; - bool hasDone = false; + + { + UNIQUE_LOCK(state->mtx, lk); + auto it = state->buffer.begin(); + while (it != state->buffer.end()) { + if (it->done) { + it = state->buffer.erase(it); + } else { + break; + } + } + } // If there are any packets that should be dispatched // Then create a thread for each and do it. //if (state->active == 0) { { SHARED_LOCK(state->mtx, lk2); + + int64_t ats = 0; + + if (state->buffer.size() > 0) { + // LOG(INFO) << "Buffer size = " << state->buffer.size(); + auto &front = state->buffer.front(); + + if (state->base_local_ts_ == 0) { + state->base_local_ts_ = now; + state->base_pkt_ts_ = front.packets.first.timestamp; + cts = 0; + } + + int64_t pts = front.packets.first.timestamp - state->base_pkt_ts_ + buffering_; + + if (pts <= cts) { + LOG(INFO) << "Presentation error = " << (cts - pts); + ats = pts; + } + } + auto current = state->buffer.begin(); while (current != state->buffer.end()) { lk2.unlock(); - if (!current->done) { - if (state->base_local_ts_ == 0) { - state->base_local_ts_ = now; - state->base_pkt_ts_ = current->packets.first.timestamp; - cts = 0; - } - - int64_t pts = current->packets.first.timestamp - state->base_pkt_ts_ + buffering_; - - // Should the packet be dispatched yet - if (state->active == 0 && pts <= cts) { - cts = pts; // Prevent multi frames. - - StreamPacket *spkt; - DataPacket *pkt; - - spkt = ¤t->packets.first; - pkt = ¤t->packets.second; - - ++state->active; - ftl::pool.push([this, buf = &*current, spkt, pkt, state](int ix) { - _processPacket(buf->peer, 0, *spkt, *pkt); - buf->done = true; - --state->active; - }); - } else { - int64_t next = pts + state->base_local_ts_; - nextTs = std::min(nextTs, next); - hasNext = true; - break; - } + + int64_t pts = current->packets.first.timestamp - state->base_pkt_ts_ + buffering_; + + // Should the packet be dispatched yet + if (pts == ats) { + + StreamPacket *spkt; + DataPacket *pkt; + + spkt = ¤t->packets.first; + pkt = ¤t->packets.second; + + ++state->active; + ftl::pool.push([this, buf = &*current, spkt, pkt, state](int ix) { + _processPacket(buf->peer, 0, *spkt, *pkt); + buf->done = true; + --state->active; + }); } else { - hasDone = true; + int64_t next = pts + state->base_local_ts_; + nextTs = std::min(nextTs, next); + hasNext = true; + break; } lk2.lock(); ++current; } } - /*} else { - LOG(WARNING) << "Already active"; - }*/ if (!hasNext) { nextTs = std::min(nextTs, now + 10); // TODO: Also, increase buffering - } - - // Remove consumed packets. - if (hasDone) { - UNIQUE_LOCK(state->mtx, lk); - auto it = state->buffer.begin(); - while (it != state->buffer.end()) { - if (it->done) { - it = state->buffer.erase(it); - } else { - break; - } - } + LOG(WARNING) << "Buffer underun " << nextTs; } } lk.unlock(); auto used = ftl::time::get_time(); int64_t spare = nextTs - used; + if (activeStates > 0) LOG(INFO) << "Sleeping for " << spare; sleep_for(milliseconds(std::max(int64_t(1), spare))); } #ifdef WIN32