From 6f4d775d683f785441fcea5814838e55ae78c81d Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nicolas.pope@utu.fi>
Date: Mon, 26 Sep 2022 17:09:30 +0100
Subject: [PATCH] Fix packet buffer overflow

---
 src/streams/netstream.cpp     | 17 ++++++++++-----
 src/streams/packetmanager.cpp | 13 ++++++-----
 src/streams/packetmanager.hpp |  1 -
 test/packetmanager_unit.cpp   | 41 +++++++++++++++++++++++++++++++++++
 4 files changed, 60 insertions(+), 12 deletions(-)

diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp
index c411bbe..7f779f5 100644
--- a/src/streams/netstream.cpp
+++ b/src/streams/netstream.cpp
@@ -262,14 +262,19 @@ void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket
         _processRequest(p, &spkt, pkt);
     }
 
-    pair.second = std::move(pkt);
-    mgr_.submit(pair, [this, now, ttimeoff, p](const ftl::protocol::PacketPair &pair) { 
-        const StreamPacket &spkt = pair.first;
-        const DataPacket &pkt = pair.second;
-
+    if (!host_) {
+        pair.second = std::move(pkt);
+        mgr_.submit(pair, [this, now, ttimeoff, p](const ftl::protocol::PacketPair &pair) { 
+            const StreamPacket &spkt = pair.first;
+            const DataPacket &pkt = pair.second;
+
+            trigger(spkt, pkt);
+            if (pkt.data.size() > 0) _checkRXRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp);
+        });
+    } else {
         trigger(spkt, pkt);
         if (pkt.data.size() > 0) _checkRXRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp);
-    });
+    }
 }
 
 void Net::inject(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) {
diff --git a/src/streams/packetmanager.cpp b/src/streams/packetmanager.cpp
index e252f5b..f964263 100644
--- a/src/streams/packetmanager.cpp
+++ b/src/streams/packetmanager.cpp
@@ -61,10 +61,10 @@ void PacketManager::submit(PacketPair &packets, const std::function<void(const P
                     lk.unlock();
                     // Loop over the buffer, checking for anything that can be processed
                     for (size_t i = start; i < stop; ++i) {
-                        if (state.buffer[i].first.channel == Channel::kEndFrame) {
+                        if (state.buffer[i % StreamState::kMaxBuffer].first.channel == Channel::kEndFrame) {
                             --state.bufferedEndFrames;
                         }
-                        submit(state.buffer[i], cb, true);
+                        submit(state.buffer[i % StreamState::kMaxBuffer], cb, true);
                     }
                 } else {
                     state.timestamp = -1;
@@ -74,6 +74,9 @@ void PacketManager::submit(PacketPair &packets, const std::function<void(const P
         }
     } else if (state.timestamp > packets.first.timestamp) {
         LOG(WARNING) << "Old packet received";
+        // Note: not ideal but still better than discarding
+        cb(packets);
+        return;
     } else {
         DLOG(WARNING) << "Buffer packets: " << packets.first.timestamp;
         // Change the current frame
@@ -123,12 +126,12 @@ void PacketManager::submit(PacketPair &packets, const std::function<void(const P
                 lk.unlock();
                 // Loop over the buffer, checking for anything that can be processed
                 for (size_t i = start; i < stop; ++i) {
-                    if (state.buffer[i].first.channel == Channel::kEndFrame) {
+                    if (state.buffer[i % StreamState::kMaxBuffer].first.channel == Channel::kEndFrame) {
                         --state.bufferedEndFrames;
                     }
-                    submit(state.buffer[i], cb, true);
+                    submit(state.buffer[i % StreamState::kMaxBuffer], cb, true);
                     std::vector<uint8_t> temp;
-                    state.buffer[i].second.data.swap(temp);
+                    state.buffer[i % StreamState::kMaxBuffer].second.data.swap(temp);
                 }
             }
         }
diff --git a/src/streams/packetmanager.hpp b/src/streams/packetmanager.hpp
index dbaf9db..659ca82 100644
--- a/src/streams/packetmanager.hpp
+++ b/src/streams/packetmanager.hpp
@@ -26,7 +26,6 @@ struct StreamState {
     std::array<ftl::protocol::PacketPair, kMaxBuffer> buffer;
 
     int64_t timestamp = -1;
-    int64_t minTimestamp = -1;
     int expected = -1;
     std::atomic_int processed = 0;
     size_t readPos = 0;
diff --git a/test/packetmanager_unit.cpp b/test/packetmanager_unit.cpp
index d073c75..bd402eb 100644
--- a/test/packetmanager_unit.cpp
+++ b/test/packetmanager_unit.cpp
@@ -180,3 +180,44 @@ TEST_CASE( "Incomplete frames" ) {
 
     REQUIRE(count == 7);
 }
+
+TEST_CASE( "Overflow the buffer" ) {
+	PacketManager mgr;
+
+    int count = 0;
+
+    PacketPair p;
+
+    p = makePair(400, Channel::kEndFrame);
+    p.second.packet_count = 2;
+    mgr.submit(p, [&count](const PacketPair &pp) {
+        ++count;
+    });
+
+    p = makePair(401, Channel::kColour);
+    for (int i = 0; i<95; ++i) {
+        mgr.submit(p, [&count](const PacketPair &pp) {
+            ++count;
+        });
+    }
+
+    p = makePair(400, Channel::kColour);
+    mgr.submit(p, [&count](const PacketPair &pp) {
+        ++count;
+    });
+
+    p = makePair(402, Channel::kColour);
+    for (int i = 0; i<95; ++i) {
+        mgr.submit(p, [&count](const PacketPair &pp) {
+            ++count;
+        });
+    }
+
+    p = makePair(401, Channel::kEndFrame);
+    p.second.packet_count = 96;
+    mgr.submit(p, [&count](const PacketPair &pp) {
+        ++count;
+    });
+
+    REQUIRE(count == 96 + 95 + 2);
+}
-- 
GitLab