From aa870acadb33b442521f596c8349a7ece3e09723 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nicolas.pope@utu.fi>
Date: Fri, 11 Nov 2022 12:31:21 +0000
Subject: [PATCH] Immediately process packets for tally

---
 src/streams/netstream.cpp | 56 ++++++++++++++++++++-------------------
 src/streams/netstream.hpp |  1 +
 2 files changed, 30 insertions(+), 27 deletions(-)

diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp
index e025c49..c551bfd 100644
--- a/src/streams/netstream.cpp
+++ b/src/streams/netstream.cpp
@@ -207,20 +207,9 @@ bool Net::post(const StreamPacket &spkt, const DataPacket &pkt) {
     return true;
 }
 
-void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, DataPacket &pkt) {
-    int64_t now = time_point_cast<milliseconds>(high_resolution_clock::now()).time_since_epoch().count();
-
+void Net::_earlyProcessPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt, DataPacket &pkt) {
     if (!active_) return;
 
-    ftl::protocol::PacketPair pair;
-    StreamPacket &spkt = pair.first;
-    spkt = spkt_raw;
-    spkt.localTimestamp = now - int64_t(ttimeoff);
-    spkt.hint_capability = 0;
-    spkt.hint_source_total = 0;
-    spkt.version = 4;
-    if (p) spkt.hint_peerid = p->localID();
-
     bool isRequest = host_ && pkt.data.size() == 0 && (spkt.flags & ftl::protocol::kFlagRequest);
 
     FrameID localFrame(spkt.streamID, spkt.frame_number);
@@ -247,6 +236,23 @@ void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket
             --tally_[localFrame.frameset()];
         }
     }
+}
+
+void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, DataPacket &pkt) {
+    int64_t now = time_point_cast<milliseconds>(high_resolution_clock::now()).time_since_epoch().count();
+
+    if (!active_) return;
+
+    ftl::protocol::PacketPair pair;
+    StreamPacket &spkt = pair.first;
+    spkt = spkt_raw;
+    spkt.localTimestamp = now - int64_t(ttimeoff);
+    spkt.hint_capability = 0;
+    spkt.hint_source_total = 0;
+    spkt.version = 4;
+    if (p) spkt.hint_peerid = p->localID();
+
+    bool isRequest = host_ && pkt.data.size() == 0 && (spkt.flags & ftl::protocol::kFlagRequest);
 
     bytes_received_ += pkt.data.size();
 
@@ -257,19 +263,8 @@ void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket
         _processRequest(p, &spkt, pkt);
     }
 
-    /*if (!host_) {
-        pair.second = std::move(pkt);
-        mgr_.submit(pair, [this, now, ttimeoff, p](const ftl::protocol::PacketPair &pair) { 
-            const StreamPacket &spkt = pair.first;
-            const DataPacket &pkt = pair.second;
-
-            trigger(spkt, pkt);
-            if (pkt.data.size() > 0) _checkRXRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp);
-        });
-    } else {*/
-        trigger(spkt, pkt);
-        if (pkt.data.size() > 0) _checkRXRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp);
-    //}
+    trigger(spkt, pkt);
+    if (pkt.data.size() > 0) _checkRXRate(pkt.data.size(), now-(spkt.timestamp+ttimeoff), spkt.timestamp);
 }
 
 void Net::inject(const ftl::protocol::StreamPacket &spkt, ftl::protocol::DataPacket &pkt) {
@@ -310,6 +305,7 @@ void Net::_run() {
                 if (state->active > 0) {
                     LOG(WARNING) << "Previous frame still processing: " << nextTs;
                     nextTs = now + 1;
+                    hasNext = true;
                     continue;
                 }
 
@@ -333,7 +329,7 @@ void Net::_run() {
                         }
                         ats = raw_ats - state->base_pkt_ts_ + buffering_; 
                     } else {
-                        LOG(WARNING) << "No packets to present: " << cts;
+                        // LOG(WARNING) << "No packets to present: " << cts;
                         continue;
                     }
 
@@ -365,7 +361,11 @@ void Net::_run() {
 
                             ++state->active;
                             ftl::pool.push([this, buf = &*current, spkt, pkt, state](int ix) {
-                                _processPacket(buf->peer, 0, *spkt, *pkt);
+                                try {
+                                    _processPacket(buf->peer, 0, *spkt, *pkt);
+                                } catch (const std::exception &e) {
+                                    LOG(ERROR) << "Packet processing error: " << e.what();
+                                }
                                 buf->done = true;
                                 --state->active;
                             });
@@ -442,6 +442,8 @@ bool Net::begin() {
             PacketMSGPACK &pkt) {
 
         auto *state = _getFrameState(FrameID(spkt_raw.streamID, spkt_raw.frame_number));
+        _earlyProcessPacket(&p, ttimeoff, spkt_raw, pkt);
+
         if (!host_) {
             UNIQUE_LOCK(state->mtx, lk);
             state->timestamps.insert(spkt_raw.timestamp);
diff --git a/src/streams/netstream.hpp b/src/streams/netstream.hpp
index 196323e..33c7908 100644
--- a/src/streams/netstream.hpp
+++ b/src/streams/netstream.hpp
@@ -149,6 +149,7 @@ class Net : public Stream {
         bool doreset = false);
     void _cleanUp();
     void _processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, DataPacket &pkt);
+    void _earlyProcessPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket &spkt_raw, DataPacket &pkt);
     void _run();
 };
 
-- 
GitLab