Skip to content
Snippets Groups Projects
Commit a78ab7e8 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Add debugging output to stream buffering

parent e68febf7
Branches
Tags
No related merge requests found
...@@ -298,39 +298,72 @@ void Net::_run() { ...@@ -298,39 +298,72 @@ void Net::_run() {
while (active_) { while (active_) {
auto now = ftl::time::get_time(); auto now = ftl::time::get_time();
int64_t nextTs = now + 20; int64_t nextTs = now + 20;
int activeStates = 0;
// For every state // For every state
SHARED_LOCK(statesMtx_, lk); SHARED_LOCK(statesMtx_, lk);
for (auto &s : frameStates_) { for (auto &s : frameStates_) {
auto *state = s.second.get(); auto *state = s.second.get();
++activeStates;
if (state->active > 0) {
LOG(WARNING) << "Previous frame still processing: " << nextTs;
nextTs = now + 1;
continue;
}
//if (state->base_local_ts_ == 0) continue; //if (state->base_local_ts_ == 0) continue;
int64_t cts = now - state->base_local_ts_; int64_t cts = now - state->base_local_ts_;
bool hasNext = false; bool hasNext = false;
bool hasDone = false;
{
UNIQUE_LOCK(state->mtx, lk);
auto it = state->buffer.begin();
while (it != state->buffer.end()) {
if (it->done) {
it = state->buffer.erase(it);
} else {
break;
}
}
}
// If there are any packets that should be dispatched // If there are any packets that should be dispatched
// Then create a thread for each and do it. // Then create a thread for each and do it.
//if (state->active == 0) { //if (state->active == 0) {
{ {
SHARED_LOCK(state->mtx, lk2); SHARED_LOCK(state->mtx, lk2);
auto current = state->buffer.begin();
while (current != state->buffer.end()) { int64_t ats = 0;
lk2.unlock();
if (!current->done) { if (state->buffer.size() > 0) {
// LOG(INFO) << "Buffer size = " << state->buffer.size();
auto &front = state->buffer.front();
if (state->base_local_ts_ == 0) { if (state->base_local_ts_ == 0) {
state->base_local_ts_ = now; state->base_local_ts_ = now;
state->base_pkt_ts_ = current->packets.first.timestamp; state->base_pkt_ts_ = front.packets.first.timestamp;
cts = 0; cts = 0;
} }
int64_t pts = front.packets.first.timestamp - state->base_pkt_ts_ + buffering_;
if (pts <= cts) {
LOG(INFO) << "Presentation error = " << (cts - pts);
ats = pts;
}
}
auto current = state->buffer.begin();
while (current != state->buffer.end()) {
lk2.unlock();
int64_t pts = current->packets.first.timestamp - state->base_pkt_ts_ + buffering_; int64_t pts = current->packets.first.timestamp - state->base_pkt_ts_ + buffering_;
// Should the packet be dispatched yet // Should the packet be dispatched yet
if (state->active == 0 && pts <= cts) { if (pts == ats) {
cts = pts; // Prevent multi frames.
StreamPacket *spkt; StreamPacket *spkt;
DataPacket *pkt; DataPacket *pkt;
...@@ -350,40 +383,23 @@ void Net::_run() { ...@@ -350,40 +383,23 @@ void Net::_run() {
hasNext = true; hasNext = true;
break; break;
} }
} else {
hasDone = true;
}
lk2.lock(); lk2.lock();
++current; ++current;
} }
} }
/*} else {
LOG(WARNING) << "Already active";
}*/
if (!hasNext) { if (!hasNext) {
nextTs = std::min(nextTs, now + 10); nextTs = std::min(nextTs, now + 10);
// TODO: Also, increase buffering // TODO: Also, increase buffering
} LOG(WARNING) << "Buffer underun " << nextTs;
// Remove consumed packets.
if (hasDone) {
UNIQUE_LOCK(state->mtx, lk);
auto it = state->buffer.begin();
while (it != state->buffer.end()) {
if (it->done) {
it = state->buffer.erase(it);
} else {
break;
}
}
} }
} }
lk.unlock(); lk.unlock();
auto used = ftl::time::get_time(); auto used = ftl::time::get_time();
int64_t spare = nextTs - used; int64_t spare = nextTs - used;
if (activeStates > 0) LOG(INFO) << "Sleeping for " << spare;
sleep_for(milliseconds(std::max(int64_t(1), spare))); sleep_for(milliseconds(std::max(int64_t(1), spare)));
} }
#ifdef WIN32 #ifdef WIN32
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment