Skip to content
Snippets Groups Projects
Commit 391c2cee authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Change net stream buffer mechanism

parent dcb0b320
No related branches found
No related tags found
No related merge requests found
......@@ -309,12 +309,16 @@ void Net::_run() {
int64_t cts = now - state->base_local_ts_;
bool hasNext = false;
bool hasDone = false;
// If there are any packets that should be dispatched
// Then create a thread for each and do it.
if (state->active == 0) {
//if (state->active == 0) {
{
SHARED_LOCK(state->mtx, lk2);
auto current = state->buffer.begin();
while (current != state->buffer.end()) {
lk2.unlock();
if (!current->done) {
if (state->base_local_ts_ == 0) {
state->base_local_ts_ = now;
......@@ -325,7 +329,7 @@ void Net::_run() {
int64_t pts = current->packets.first.timestamp - state->base_pkt_ts_ + buffering_;
// Should the packet be dispatched yet
if (pts <= cts) {
if (state->active == 0 && pts <= cts) {
cts = pts; // Prevent multi frames.
StreamPacket *spkt;
......@@ -346,13 +350,17 @@ void Net::_run() {
hasNext = true;
break;
}
} else {
hasDone = true;
}
lk2.lock();
++current;
}
} else {
LOG(WARNING) << "Already active";
}
/*} else {
LOG(WARNING) << "Already active";
}*/
if (!hasNext) {
nextTs = std::min(nextTs, now + 10);
......@@ -360,8 +368,17 @@ void Net::_run() {
}
// Remove consumed packets.
UNIQUE_LOCK(state->mtx, lk2);
state->buffer.remove_if([](const PacketBuffer &i) { return static_cast<bool>(i.done); });
if (hasDone) {
UNIQUE_LOCK(state->mtx, lk);
auto it = state->buffer.begin();
while (it != state->buffer.end()) {
if (it->done) {
it = state->buffer.erase(it);
} else {
break;
}
}
}
}
lk.unlock();
......
......@@ -119,7 +119,7 @@ class Net : public Stream {
struct FrameState {
ftl::protocol::FrameID id;
std::atomic_int active = 0;
MUTEX mtx;
SHARED_MUTEX mtx;
std::list<PacketBuffer> buffer;
int64_t base_pkt_ts_ = 0;
int64_t base_local_ts_ = 0;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment