diff --git a/src/streams/packetmanager.cpp b/src/streams/packetmanager.cpp index 139cc942e09e67e6554e88c26ae463c0ea2d76bf..e252f5b0b91fb185a5f3829196126fa120fd078b 100644 --- a/src/streams/packetmanager.cpp +++ b/src/streams/packetmanager.cpp @@ -31,10 +31,6 @@ StreamState &PacketManager::getState(FrameID id) { void PacketManager::submit(PacketPair &packets, const std::function<void(const PacketPair &)> &cb, bool noLoop) { auto &state = getState(FrameID(packets.first.frameSetID(), packets.first.frameNumber())); - if (state.timestamp == -1) { - state.timestamp = packets.first.timestamp; - } - if (state.timestamp == packets.first.timestamp) { if (packets.first.channel == Channel::kEndFrame) { state.expected = packets.second.packet_count; @@ -53,13 +49,15 @@ void PacketManager::submit(PacketPair &packets, const std::function<void(const P size_t stop = state.writePos; state.readPos = stop; - state.timestamp = std::numeric_limits<int64_t>::max(); + int64_t ts = std::numeric_limits<int64_t>::max(); for (size_t i = start; i < stop; ++i) { - state.timestamp = std::min( - state.timestamp, + ts = std::min( + ts, state.buffer[i % StreamState::kMaxBuffer].first.timestamp); } + state.timestamp = ts; + lk.unlock(); // Loop over the buffer, checking for anything that can be processed for (size_t i = start; i < stop; ++i) { @@ -86,6 +84,13 @@ void PacketManager::submit(PacketPair &packets, const std::function<void(const P return; } + if (state.timestamp == -1) { + state.timestamp = packets.first.timestamp; + lk.unlock(); + submit(packets, cb); + return; + } + // Add packet to buffer; auto wpos = state.writePos++; state.buffer[wpos % StreamState::kMaxBuffer] = std::move(packets); @@ -98,29 +103,33 @@ void PacketManager::submit(PacketPair &packets, const std::function<void(const P if (state.bufferedEndFrames > 4) { LOG(WARNING) << "Discarding incomplete frame: " << state.timestamp; UNIQUE_LOCK(state.mtx, lk); - state.processed = 0; - state.expected = -1; - - size_t start = state.readPos; - size_t stop = state.writePos; - state.readPos = stop; - - state.timestamp = std::numeric_limits<int64_t>::max(); - for (size_t i = start; i < stop; ++i) { - state.timestamp = std::min( - state.timestamp, - state.buffer[i % StreamState::kMaxBuffer].first.timestamp); - } + if (state.bufferedEndFrames > 4) { + state.processed = 0; + state.expected = -1; - lk.unlock(); - // Loop over the buffer, checking for anything that can be processed - for (size_t i = start; i < stop; ++i) { - if (state.buffer[i].first.channel == Channel::kEndFrame) { - --state.bufferedEndFrames; + size_t start = state.readPos; + size_t stop = state.writePos; + state.readPos = stop; + + int64_t ts = std::numeric_limits<int64_t>::max(); + for (size_t i = start; i < stop; ++i) { + ts = std::min( + ts, + state.buffer[i % StreamState::kMaxBuffer].first.timestamp); + } + + state.timestamp = ts; + + lk.unlock(); + // Loop over the buffer, checking for anything that can be processed + for (size_t i = start; i < stop; ++i) { + if (state.buffer[i].first.channel == Channel::kEndFrame) { + --state.bufferedEndFrames; + } + submit(state.buffer[i], cb, true); + std::vector<uint8_t> temp; + state.buffer[i].second.data.swap(temp); } - submit(state.buffer[i], cb, true); - std::vector<uint8_t> temp; - state.buffer[i].second.data.swap(temp); } } } diff --git a/src/streams/packetmanager.hpp b/src/streams/packetmanager.hpp index e962bf037c42270f76800cdfa3d832801168f44e..dbaf9dbb35618b72b52b595bd0218abc49d24f4a 100644 --- a/src/streams/packetmanager.hpp +++ b/src/streams/packetmanager.hpp @@ -22,7 +22,7 @@ namespace ftl { struct StreamState { static constexpr int kMaxBuffer = 100; - MUTEX mtx; + SHARED_MUTEX mtx; std::array<ftl::protocol::PacketPair, kMaxBuffer> buffer; int64_t timestamp = -1;