Skip to content
Snippets Groups Projects
Commit 3c349fa1 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'bug/netstream-dbg5' into 'main'

Change to use ordered set of timestamps

See merge request beyondaka/beyond-protocol!66
parents 0e631ed1 f9f26406
No related branches found
No related tags found
No related merge requests found
...@@ -313,65 +313,46 @@ void Net::_run() { ...@@ -313,65 +313,46 @@ void Net::_run() {
continue; continue;
} }
//if (state->base_local_ts_ == 0) continue;
int64_t cts = now - state->base_local_ts_; int64_t cts = now - state->base_local_ts_;
bool hasLocalNext = false;
{
UNIQUE_LOCK(state->mtx, lk2);
auto it = state->buffer.begin();
while (it != state->buffer.end()) {
if (it->done) {
it = state->buffer.erase(it);
} else {
break;
}
}
}
// If there are any packets that should be dispatched // If there are any packets that should be dispatched
// Then create a thread for each and do it. // Then create a thread for each and do it.
//if (state->active == 0) { //if (state->active == 0) {
{ {
SHARED_LOCK(state->mtx, lk2); SHARED_LOCK(state->mtx, lk2);
int64_t ats = 0; int64_t ats = -1;
if (state->timestamps.size() > 0) {
size_t size = state->buffer.size(); int64_t raw_ats = *state->timestamps.begin();
lk2.unlock(); // state->timestamps.erase(state->timestamps.begin());
if (size > 0) {
// LOG(INFO) << "Buffer size = " << state->buffer.size();
auto &front = state->buffer.front();
if (state->base_local_ts_ == 0) { if (state->base_local_ts_ == 0) {
state->base_local_ts_ = now; state->base_local_ts_ = now;
state->base_pkt_ts_ = front.packets.first.timestamp; state->base_pkt_ts_ = raw_ats;
cts = 0; cts = 0;
} }
ats = raw_ats - state->base_pkt_ts_ + buffering_;
int64_t pts = front.packets.first.timestamp - state->base_pkt_ts_ + buffering_; } else {
LOG(WARNING) << "No packets to present: " << cts;
if (pts <= cts) { continue;
LOG(INFO) << "Presentation error = " << (cts - pts);
ats = pts;
} else {
LOG(INFO) << "Next presentation in: " << (pts - cts);
}
} }
if (size == 0) { size_t size = state->buffer.size();
LOG(WARNING) << "No packets to present: " << cts; lk2.unlock();
// Not ready to display this one yet.
if (ats > cts) {
LOG(INFO) << "Next presentation in: " << (ats - cts);
nextTs = std::min(nextTs, ats + state->base_local_ts_);
hasNext = true;
continue; continue;
} else {
LOG(INFO) << "Presentation error = " << (cts - ats);
} }
auto current = state->buffer.begin(); auto current = state->buffer.begin();
bool seenEnd = false;
for (size_t i = 0; i < size; ++i) { for (size_t i = 0; i < size; ++i) {
// lk2.unlock();
int64_t pts = current->packets.first.timestamp - state->base_pkt_ts_ + buffering_; int64_t pts = current->packets.first.timestamp - state->base_pkt_ts_ + buffering_;
// Should the packet be dispatched yet // Should the packet be dispatched yet
...@@ -382,35 +363,43 @@ void Net::_run() { ...@@ -382,35 +363,43 @@ void Net::_run() {
spkt = &current->packets.first; spkt = &current->packets.first;
pkt = &current->packets.second; pkt = &current->packets.second;
if (spkt->channel == Channel::kEndFrame) {
seenEnd = true;
}
++state->active; ++state->active;
ftl::pool.push([this, buf = &*current, spkt, pkt, state](int ix) { ftl::pool.push([this, buf = &*current, spkt, pkt, state](int ix) {
_processPacket(buf->peer, 0, *spkt, *pkt); _processPacket(buf->peer, 0, *spkt, *pkt);
buf->done = true; buf->done = true;
--state->active; --state->active;
}); });
} else {
int64_t next = pts + state->base_local_ts_; if (spkt->channel == Channel::kEndFrame) {
nextTs = std::min(nextTs, next);
hasLocalNext = true;
hasNext = true;
if (seenEnd) {
break; break;
} }
} }
// lk2.lock();
++current; ++current;
} }
} }
if (!hasLocalNext) { {
// nextTs = std::min(nextTs, now + 10); UNIQUE_LOCK(state->mtx, lk2);
// TODO: Also, increase buffering state->timestamps.erase(state->timestamps.begin());
LOG(WARNING) << "Buffer underun " << now;
if (state->timestamps.size() > 0) {
int64_t nts = *state->timestamps.begin();
nts = nts - state->base_pkt_ts_ + buffering_ + state->base_local_ts_;
nextTs = std::min(nextTs, nts);
hasNext = true;
} else {
LOG(WARNING) << "Buffer underun " << now;
}
auto it = state->buffer.begin();
while (it != state->buffer.end()) {
if (it->done) {
it = state->buffer.erase(it);
} else {
break;
}
}
} }
} }
lk.unlock(); lk.unlock();
...@@ -455,6 +444,7 @@ bool Net::begin() { ...@@ -455,6 +444,7 @@ bool Net::begin() {
auto *state = _getFrameState(FrameID(spkt_raw.streamID, spkt_raw.frame_number)); auto *state = _getFrameState(FrameID(spkt_raw.streamID, spkt_raw.frame_number));
if (!host_) { if (!host_) {
UNIQUE_LOCK(state->mtx, lk); UNIQUE_LOCK(state->mtx, lk);
state->timestamps.insert(spkt_raw.timestamp);
// TODO(Nick): This buffer could be faster? // TODO(Nick): This buffer could be faster?
auto &buf = state->buffer.emplace_back(); auto &buf = state->buffer.emplace_back();
buf.packets.first = spkt_raw; buf.packets.first = spkt_raw;
......
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include <list> #include <list>
#include <atomic> #include <atomic>
#include <unordered_map> #include <unordered_map>
#include <set>
#include "../universe.hpp" #include "../universe.hpp"
#include <ftl/threads.hpp> #include <ftl/threads.hpp>
#include <ftl/protocol/packet.hpp> #include <ftl/protocol/packet.hpp>
...@@ -121,6 +122,7 @@ class Net : public Stream { ...@@ -121,6 +122,7 @@ class Net : public Stream {
std::atomic_int active = 0; std::atomic_int active = 0;
SHARED_MUTEX mtx; SHARED_MUTEX mtx;
std::list<PacketBuffer> buffer; std::list<PacketBuffer> buffer;
std::set<int64_t> timestamps;
int64_t base_pkt_ts_ = 0; int64_t base_pkt_ts_ = 0;
int64_t base_local_ts_ = 0; int64_t base_local_ts_ = 0;
}; };
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment