Skip to content
Snippets Groups Projects
Commit f9f26406 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Change to use ordered set of timestamps

parent 0e631ed1
Branches
Tags
No related merge requests found
...@@ -313,65 +313,46 @@ void Net::_run() { ...@@ -313,65 +313,46 @@ void Net::_run() {
continue; continue;
} }
//if (state->base_local_ts_ == 0) continue;
int64_t cts = now - state->base_local_ts_; int64_t cts = now - state->base_local_ts_;
bool hasLocalNext = false;
{
UNIQUE_LOCK(state->mtx, lk2);
auto it = state->buffer.begin();
while (it != state->buffer.end()) {
if (it->done) {
it = state->buffer.erase(it);
} else {
break;
}
}
}
// If there are any packets that should be dispatched // If there are any packets that should be dispatched
// Then create a thread for each and do it. // Then create a thread for each and do it.
//if (state->active == 0) { //if (state->active == 0) {
{ {
SHARED_LOCK(state->mtx, lk2); SHARED_LOCK(state->mtx, lk2);
int64_t ats = 0; int64_t ats = -1;
if (state->timestamps.size() > 0) {
size_t size = state->buffer.size(); int64_t raw_ats = *state->timestamps.begin();
lk2.unlock(); // state->timestamps.erase(state->timestamps.begin());
if (size > 0) {
// LOG(INFO) << "Buffer size = " << state->buffer.size();
auto &front = state->buffer.front();
if (state->base_local_ts_ == 0) { if (state->base_local_ts_ == 0) {
state->base_local_ts_ = now; state->base_local_ts_ = now;
state->base_pkt_ts_ = front.packets.first.timestamp; state->base_pkt_ts_ = raw_ats;
cts = 0; cts = 0;
} }
ats = raw_ats - state->base_pkt_ts_ + buffering_;
int64_t pts = front.packets.first.timestamp - state->base_pkt_ts_ + buffering_;
if (pts <= cts) {
LOG(INFO) << "Presentation error = " << (cts - pts);
ats = pts;
} else { } else {
LOG(INFO) << "Next presentation in: " << (pts - cts); LOG(WARNING) << "No packets to present: " << cts;
} continue;
} }
if (size == 0) { size_t size = state->buffer.size();
LOG(WARNING) << "No packets to present: " << cts; lk2.unlock();
// Not ready to display this one yet.
if (ats > cts) {
LOG(INFO) << "Next presentation in: " << (ats - cts);
nextTs = std::min(nextTs, ats + state->base_local_ts_);
hasNext = true;
continue; continue;
} else {
LOG(INFO) << "Presentation error = " << (cts - ats);
} }
auto current = state->buffer.begin(); auto current = state->buffer.begin();
bool seenEnd = false;
for (size_t i = 0; i < size; ++i) {
// lk2.unlock();
for (size_t i = 0; i < size; ++i) {
int64_t pts = current->packets.first.timestamp - state->base_pkt_ts_ + buffering_; int64_t pts = current->packets.first.timestamp - state->base_pkt_ts_ + buffering_;
// Should the packet be dispatched yet // Should the packet be dispatched yet
...@@ -382,36 +363,44 @@ void Net::_run() { ...@@ -382,36 +363,44 @@ void Net::_run() {
spkt = &current->packets.first; spkt = &current->packets.first;
pkt = &current->packets.second; pkt = &current->packets.second;
if (spkt->channel == Channel::kEndFrame) {
seenEnd = true;
}
++state->active; ++state->active;
ftl::pool.push([this, buf = &*current, spkt, pkt, state](int ix) { ftl::pool.push([this, buf = &*current, spkt, pkt, state](int ix) {
_processPacket(buf->peer, 0, *spkt, *pkt); _processPacket(buf->peer, 0, *spkt, *pkt);
buf->done = true; buf->done = true;
--state->active; --state->active;
}); });
} else {
int64_t next = pts + state->base_local_ts_; if (spkt->channel == Channel::kEndFrame) {
nextTs = std::min(nextTs, next);
hasLocalNext = true;
hasNext = true;
if (seenEnd) {
break; break;
} }
} }
// lk2.lock();
++current; ++current;
} }
} }
if (!hasLocalNext) { {
// nextTs = std::min(nextTs, now + 10); UNIQUE_LOCK(state->mtx, lk2);
// TODO: Also, increase buffering state->timestamps.erase(state->timestamps.begin());
if (state->timestamps.size() > 0) {
int64_t nts = *state->timestamps.begin();
nts = nts - state->base_pkt_ts_ + buffering_ + state->base_local_ts_;
nextTs = std::min(nextTs, nts);
hasNext = true;
} else {
LOG(WARNING) << "Buffer underun " << now; LOG(WARNING) << "Buffer underun " << now;
} }
auto it = state->buffer.begin();
while (it != state->buffer.end()) {
if (it->done) {
it = state->buffer.erase(it);
} else {
break;
}
}
}
} }
lk.unlock(); lk.unlock();
...@@ -455,6 +444,7 @@ bool Net::begin() { ...@@ -455,6 +444,7 @@ bool Net::begin() {
auto *state = _getFrameState(FrameID(spkt_raw.streamID, spkt_raw.frame_number)); auto *state = _getFrameState(FrameID(spkt_raw.streamID, spkt_raw.frame_number));
if (!host_) { if (!host_) {
UNIQUE_LOCK(state->mtx, lk); UNIQUE_LOCK(state->mtx, lk);
state->timestamps.insert(spkt_raw.timestamp);
// TODO(Nick): This buffer could be faster? // TODO(Nick): This buffer could be faster?
auto &buf = state->buffer.emplace_back(); auto &buf = state->buffer.emplace_back();
buf.packets.first = spkt_raw; buf.packets.first = spkt_raw;
......
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include <list> #include <list>
#include <atomic> #include <atomic>
#include <unordered_map> #include <unordered_map>
#include <set>
#include "../universe.hpp" #include "../universe.hpp"
#include <ftl/threads.hpp> #include <ftl/threads.hpp>
#include <ftl/protocol/packet.hpp> #include <ftl/protocol/packet.hpp>
...@@ -121,6 +122,7 @@ class Net : public Stream { ...@@ -121,6 +122,7 @@ class Net : public Stream {
std::atomic_int active = 0; std::atomic_int active = 0;
SHARED_MUTEX mtx; SHARED_MUTEX mtx;
std::list<PacketBuffer> buffer; std::list<PacketBuffer> buffer;
std::set<int64_t> timestamps;
int64_t base_pkt_ts_ = 0; int64_t base_pkt_ts_ = 0;
int64_t base_local_ts_ = 0; int64_t base_local_ts_ = 0;
}; };
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment