Skip to content
Snippets Groups Projects
Commit 9ac0cd04 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Fixes for timestamp race issue

parent 7900f05f
No related branches found
No related tags found
No related merge requests found
......@@ -31,10 +31,6 @@ StreamState &PacketManager::getState(FrameID id) {
void PacketManager::submit(PacketPair &packets, const std::function<void(const PacketPair &)> &cb, bool noLoop) {
auto &state = getState(FrameID(packets.first.frameSetID(), packets.first.frameNumber()));
if (state.timestamp == -1) {
state.timestamp = packets.first.timestamp;
}
if (state.timestamp == packets.first.timestamp) {
if (packets.first.channel == Channel::kEndFrame) {
state.expected = packets.second.packet_count;
......@@ -53,13 +49,15 @@ void PacketManager::submit(PacketPair &packets, const std::function<void(const P
size_t stop = state.writePos;
state.readPos = stop;
state.timestamp = std::numeric_limits<int64_t>::max();
int64_t ts = std::numeric_limits<int64_t>::max();
for (size_t i = start; i < stop; ++i) {
state.timestamp = std::min(
state.timestamp,
ts = std::min(
ts,
state.buffer[i % StreamState::kMaxBuffer].first.timestamp);
}
state.timestamp = ts;
lk.unlock();
// Loop over the buffer, checking for anything that can be processed
for (size_t i = start; i < stop; ++i) {
......@@ -86,6 +84,13 @@ void PacketManager::submit(PacketPair &packets, const std::function<void(const P
return;
}
if (state.timestamp == -1) {
state.timestamp = packets.first.timestamp;
lk.unlock();
submit(packets, cb);
return;
}
// Add packet to buffer;
auto wpos = state.writePos++;
state.buffer[wpos % StreamState::kMaxBuffer] = std::move(packets);
......@@ -98,29 +103,33 @@ void PacketManager::submit(PacketPair &packets, const std::function<void(const P
if (state.bufferedEndFrames > 4) {
LOG(WARNING) << "Discarding incomplete frame: " << state.timestamp;
UNIQUE_LOCK(state.mtx, lk);
state.processed = 0;
state.expected = -1;
size_t start = state.readPos;
size_t stop = state.writePos;
state.readPos = stop;
state.timestamp = std::numeric_limits<int64_t>::max();
for (size_t i = start; i < stop; ++i) {
state.timestamp = std::min(
state.timestamp,
state.buffer[i % StreamState::kMaxBuffer].first.timestamp);
}
if (state.bufferedEndFrames > 4) {
state.processed = 0;
state.expected = -1;
lk.unlock();
// Loop over the buffer, checking for anything that can be processed
for (size_t i = start; i < stop; ++i) {
if (state.buffer[i].first.channel == Channel::kEndFrame) {
--state.bufferedEndFrames;
size_t start = state.readPos;
size_t stop = state.writePos;
state.readPos = stop;
int64_t ts = std::numeric_limits<int64_t>::max();
for (size_t i = start; i < stop; ++i) {
ts = std::min(
ts,
state.buffer[i % StreamState::kMaxBuffer].first.timestamp);
}
state.timestamp = ts;
lk.unlock();
// Loop over the buffer, checking for anything that can be processed
for (size_t i = start; i < stop; ++i) {
if (state.buffer[i].first.channel == Channel::kEndFrame) {
--state.bufferedEndFrames;
}
submit(state.buffer[i], cb, true);
std::vector<uint8_t> temp;
state.buffer[i].second.data.swap(temp);
}
submit(state.buffer[i], cb, true);
std::vector<uint8_t> temp;
state.buffer[i].second.data.swap(temp);
}
}
}
......
......@@ -22,7 +22,7 @@ namespace ftl {
struct StreamState {
static constexpr int kMaxBuffer = 100;
MUTEX mtx;
SHARED_MUTEX mtx;
std::array<ftl::protocol::PacketPair, kMaxBuffer> buffer;
int64_t timestamp = -1;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment