From 95cfc32743d3e3930e0d3faa2995aad9d209d5b6 Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nicolas.pope@utu.fi>
Date: Fri, 28 Oct 2022 06:01:59 +0000
Subject: [PATCH] Netstream request tally improvements

---
 .vscode/settings.json            |  3 +-
 include/ftl/protocol/streams.hpp | 10 +++-
 src/streams/netstream.cpp        | 62 +++++++++++-------------
 src/streams/netstream.hpp        | 14 +-----
 src/streams/streams.cpp          | 12 +++++
 test/netstream_unit.cpp          | 83 ++++++++++++++++++++++++++++++++
 6 files changed, 136 insertions(+), 48 deletions(-)

diff --git a/.vscode/settings.json b/.vscode/settings.json
index c6f7c1d..713f547 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -65,7 +65,8 @@
         "typeinfo": "cpp",
         "valarray": "cpp",
         "variant": "cpp",
-        "any": "cpp"
+        "any": "cpp",
+        "complex": "cpp"
     },
     "cmake.cmakePath": "cmake"
 }
\ No newline at end of file
diff --git a/include/ftl/protocol/streams.hpp b/include/ftl/protocol/streams.hpp
index a258a35..fe8ce02 100644
--- a/include/ftl/protocol/streams.hpp
+++ b/include/ftl/protocol/streams.hpp
@@ -62,7 +62,8 @@ enum struct StreamProperty {
     kName,
     kDescription,
     kTags,
-    kUser
+    kUser,
+    kRequestSize
 };
 
 /**
@@ -233,6 +234,13 @@ class Stream {
      */
     std::unordered_set<FrameID> enabled() const;
 
+    /**
+     * @brief Get all enabled frames in a frameset.
+     * 
+     * @return Set of frame IDs
+     */
+    std::unordered_set<FrameID> enabled(unsigned int) const;
+
     /**
      * @brief Check if a frame is enabled.
      * 
diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp
index a9b5c15..8b630a9 100644
--- a/src/streams/netstream.cpp
+++ b/src/streams/netstream.cpp
@@ -87,7 +87,7 @@ void Net::installRPC(ftl::net::Universe *net) {
 }
 
 Net::Net(const std::string &uri, ftl::net::Universe *net, bool host) :
-        net_(net), time_peer_(ftl::UUID(0)), uri_(uri), host_(host) {
+        net_(net), uri_(uri), host_(host) {
     ftl::URI u(uri_);
     if (!u.isValid() || !(u.getScheme() == ftl::URI::SCHEME_FTL)) {
         error(Error::kBadURI, uri_);
@@ -226,36 +226,26 @@ void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket
     if (paused_) return;
 
     // Manage recuring requests
-    if (!host_ && last_frame_ != spkt.timestamp) {
-        UNIQUE_LOCK(mutex_, lk);
-        if (last_frame_ != spkt.timestamp) {
-            // int tc = now - last_completion_;          // Milliseconds since last frame completed
-            frame_time_ = spkt.timestamp - last_frame_;  // Milliseconds per frame
-            last_completion_ = now;
-            bytes_received_ = 0;
-            last_frame_ = spkt.timestamp;
-
-            lk.unlock();
-
-            // Are we close to reaching the end of our frames request?
-            if (tally_ <= 5) {
-                // Yes, so send new requests
-                // FIXME: Do this for all frames, or use tally be frame
-                // for (size_t i = 0; i < size(); ++i) {
-                    const auto &sel = enabledChannels(localFrame);
-                    for (auto c : sel) {
-                        _sendRequest(c, localFrame.frameset(), localFrame.source(), frames_to_request_, 255);
-                    }
-                //}
-                tally_ = frames_to_request_;
-            } else {
-                --tally_;
+    if (!host_ && spkt.channel == Channel::kEndFrame && localFrame.frameset() < tally_.size()) {
+        frame_time_ = spkt.timestamp - last_frame_;  // Milliseconds per frame
+        last_frame_ = spkt.timestamp;
+
+        // Are we close to reaching the end of our frames request?
+        if (tally_[localFrame.frameset()] <= 5) {
+            // Yes, so send new requests
+            for (const auto f : enabled(localFrame.frameset())) {
+                const auto &sel = enabledChannels(f);
+                for (auto c : sel) {
+                    _sendRequest(c, f.frameset(), f.source(), frames_to_request_, 255);
+                }
             }
+            tally_[localFrame.frameset()] = frames_to_request_;
+        } else {
+            --tally_[localFrame.frameset()];
         }
     }
 
     bytes_received_ += pkt.data.size();
-    // time_at_last_ = now;
 
     // If hosting and no data then it is a request for data
     // Note: a non host can receive empty data, meaning data is available
@@ -362,7 +352,7 @@ bool Net::begin() {
         net_->broadcast("add_stream", uri_);
 
     } else {
-        tally_ = frames_to_request_;
+        for (size_t i = 0; i < tally_.size(); ++i) tally_[i] = frames_to_request_;
         active_ = true;
     }
 
@@ -381,7 +371,8 @@ void Net::refresh() {
             _sendRequest(c, i.frameset(), i.source(), frames_to_request_, 255, true);
         }
     }
-    tally_ = frames_to_request_;
+
+    for (size_t i = 0; i < tally_.size(); ++i) tally_[i] = frames_to_request_;
 }
 
 void Net::reset() {
@@ -390,6 +381,7 @@ void Net::reset() {
 
 bool Net::_enable(FrameID id) {
     if (host_) { return false; }
+    if (peer_) return true;
     if (enabled(id)) return true;
 
     // not hosting, try to find peer now
@@ -418,7 +410,7 @@ bool Net::enable(FrameID id) {
     if (host_) { return false; }
     if (!_enable(id)) return false;
     if (!Stream::enable(id)) return false;
-    _sendRequest(Channel::kColour, id.frameset(), id.source(), kFramesToRequest, 255, true);
+    _sendRequest(Channel::kColour, id.frameset(), id.source(), frames_to_request_, 255, true);
 
     return true;
 }
@@ -427,7 +419,7 @@ bool Net::enable(FrameID id, Channel c) {
     if (host_) { return false; }
     if (!_enable(id)) return false;
     if (!Stream::enable(id, c)) return false;
-    _sendRequest(c, id.frameset(), id.source(), kFramesToRequest, 255, true);
+    _sendRequest(c, id.frameset(), id.source(), frames_to_request_, 255, true);
     return true;
 }
 
@@ -436,7 +428,7 @@ bool Net::enable(FrameID id, const ChannelSet &channels) {
     if (!_enable(id)) return false;
     if (!Stream::enable(id, channels)) return false;
     for (auto c : channels) {
-        _sendRequest(c, id.frameset(), id.source(), kFramesToRequest, 255, true);
+        _sendRequest(c, id.frameset(), id.source(), frames_to_request_, 255, true);
     }
     return true;
 }
@@ -479,9 +471,6 @@ void Net::_cleanUp() {
         for (auto j = clients.begin(); j != clients.end();) {
             auto &client = *j;
             if (client.txcount <= 0) {
-                if (client.peerid == time_peer_) {
-                    time_peer_ = ftl::UUID(0);
-                }
                 DLOG(INFO) << "Remove peer: " << client.peerid.to_string();
                 j = clients.erase(j);
             } else {
@@ -507,6 +496,8 @@ bool Net::_processRequest(ftl::net::Peer *p, const StreamPacket *spkt, DataPacke
         // Generate a batch of requests
         ftl::protocol::StreamPacket spkt2 = *spkt;
         for (const auto &i : frames()) {
+            if (spkt->streamID != 255 && i.frameset() != spkt->streamID) continue;
+            if (spkt->frame_number != 255 && i.source() != spkt->frame_number) continue;
             spkt2.streamID = i.frameset();
             spkt2.frame_number = i.source();
             _processRequest(p, &spkt2, pkt);
@@ -630,6 +621,7 @@ void Net::setProperty(ftl::protocol::StreamProperty opt, std::any value) {
     case StreamProperty::kMaxBitrate    :  bitrate_ = std::any_cast<int>(value); break;
     case StreamProperty::kPaused        :  paused_ = std::any_cast<bool>(value); break;
     case StreamProperty::kName          :  name_ = std::any_cast<std::string>(value); break;
+    case StreamProperty::kRequestSize   :  frames_to_request_ = std::any_cast<int>(value); break;
     case StreamProperty::kObservers     :
     case StreamProperty::kBytesSent     :
     case StreamProperty::kBytesReceived :
@@ -652,6 +644,7 @@ std::any Net::getProperty(ftl::protocol::StreamProperty opt) {
     case StreamProperty::kFrameRate     :  return (frame_time_ > 0) ? 1000 / frame_time_ : 0;
     case StreamProperty::kLatency       :  return 0;
     case StreamProperty::kName          :  return name_;
+    case StreamProperty::kRequestSize   :  return frames_to_request_;
     default                             :  throw FTL_Error("Unsupported property");
     }
 }
@@ -667,6 +660,7 @@ bool Net::supportsProperty(ftl::protocol::StreamProperty opt) {
     case StreamProperty::kLatency       :
     case StreamProperty::kFrameRate     :
     case StreamProperty::kName          :
+    case StreamProperty::kRequestSize   :
     case StreamProperty::kURI           :  return true;
     default                             :  return false;
     }
diff --git a/src/streams/netstream.hpp b/src/streams/netstream.hpp
index baabbf1..f803be3 100644
--- a/src/streams/netstream.hpp
+++ b/src/streams/netstream.hpp
@@ -8,6 +8,7 @@
 
 #include <string>
 #include <list>
+#include <atomic>
 #include <unordered_map>
 #include "../universe.hpp"
 #include <ftl/threads.hpp>
@@ -88,32 +89,21 @@ class Net : public Stream {
     SHARED_MUTEX mutex_;
     bool active_ = false;
     ftl::net::Universe *net_;
-    int64_t clock_adjust_ = 0;
-    ftl::UUID time_peer_;
     std::optional<ftl::UUID> peer_;
     int64_t last_frame_ = 0;
-    int64_t last_ping_ = 0;
     int64_t frame_time_ = 0;
     std::string uri_;
     std::string base_uri_;
     const bool host_;
-    int tally_ = 0;
-    std::array<std::atomic<int>, 32> reqtally_ = {0};
-    ftl::protocol::ChannelSet last_selected_;
+    std::array<std::atomic_int, 5> tally_ = {};
     uint8_t bitrate_ = 255;
     std::atomic_int64_t bytes_received_ = 0;
-    int64_t last_completion_ = 0;
-    int64_t time_at_last_ = 0;
-    float required_bps_ = 0.0f;
-    float actual_bps_ = 0.0f;
     bool paused_ = false;
     int frames_to_request_ = kFramesToRequest;
     std::string name_;
     ftl::PacketManager mgr_;
     ftl::Handler<ftl::net::Peer*> connect_cb_;
 
-    uint32_t local_fsid_ = 0;
-
     static std::atomic_size_t req_bitrate__;
     static std::atomic_size_t tx_bitrate__;
     static std::atomic_size_t rx_sample_count__;
diff --git a/src/streams/streams.cpp b/src/streams/streams.cpp
index ba699f0..f64496b 100644
--- a/src/streams/streams.cpp
+++ b/src/streams/streams.cpp
@@ -87,6 +87,18 @@ std::unordered_set<FrameID> Stream::enabled() const {
     return result;
 }
 
+std::unordered_set<FrameID> Stream::enabled(unsigned int fs) const {
+    SHARED_LOCK(mtx_, lk);
+    std::unordered_set<FrameID> result;
+    for (const auto &s : state_) {
+        if (!s.second) continue;
+        if (s.second->enabled && FrameID(s.first).frameset() == fs) {
+            result.emplace(s.first);
+        }
+    }
+    return result;
+}
+
 bool Stream::enabled(FrameID id) const {
     auto state = _getState(id);
     if (!state) return false;
diff --git a/test/netstream_unit.cpp b/test/netstream_unit.cpp
index 51d7e70..76ff572 100644
--- a/test/netstream_unit.cpp
+++ b/test/netstream_unit.cpp
@@ -120,6 +120,89 @@ TEST_CASE("Net stream sending requests") {
         REQUIRE( (s1->lastSpkt.flags & ftl::protocol::kFlagRequest) > 0 );
     }
 
+    SECTION("sends repeat requests - single frame") {
+        auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false);
+        
+        // Thread to provide response to otherwise blocking call
+        std::thread thr([&p]() {
+            auto z = std::make_unique<msgpack::zone>();
+            provideResponses(p, 0, {
+                {false, "find_stream", packResponse(*z, ftl::UUIDMSGPACK(p->id()))},
+                {true, "enable_stream", {}},
+            });
+        });
+
+        s1->setProperty(StreamProperty::kRequestSize, 10);
+
+        REQUIRE( s1->begin() );
+
+        s1->forceSeen(FrameID(0, 0), Channel::kColour);
+        REQUIRE( s1->enable(FrameID(0, 0), Channel::kColour));
+
+        ftl::protocol::StreamPacketMSGPACK spkt;
+        ftl::protocol::PacketMSGPACK pkt;
+        spkt.streamID = 0;
+        spkt.frame_number = 0;
+        spkt.channel = Channel::kEndFrame;
+
+        thr.join();
+
+        s1->lastSpkt.channel = Channel::kNone;
+
+        for (int i=0; i<20; ++i) {
+            spkt.timestamp = i;
+            writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt));
+            p->data();
+            sleep_for(milliseconds(50));
+        }
+
+        REQUIRE( s1->lastSpkt.channel == Channel::kColour );
+        REQUIRE( (s1->lastSpkt.flags & ftl::protocol::kFlagRequest) > 0 );
+    }
+
+    SECTION("sends repeat requests - multi frame") {
+        auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false);
+        
+        // Thread to provide response to otherwise blocking call
+        std::thread thr([&p]() {
+            auto z = std::make_unique<msgpack::zone>();
+            provideResponses(p, 0, {
+                {false, "find_stream", packResponse(*z, ftl::UUIDMSGPACK(p->id()))},
+                {true, "enable_stream", {}},
+            });
+        });
+
+        s1->setProperty(StreamProperty::kRequestSize, 10);
+
+        REQUIRE( s1->begin() );
+
+        s1->forceSeen(FrameID(0, 0), Channel::kColour);
+        s1->forceSeen(FrameID(0, 1), Channel::kColour);
+        REQUIRE( s1->enable(FrameID(0, 0), Channel::kColour));
+        REQUIRE( s1->enable(FrameID(0, 1), Channel::kColour));
+
+        ftl::protocol::StreamPacketMSGPACK spkt;
+        ftl::protocol::PacketMSGPACK pkt;
+        spkt.streamID = 0;
+        spkt.frame_number = 0;
+        spkt.channel = Channel::kEndFrame;
+
+        thr.join();
+
+        s1->lastSpkt.channel = Channel::kNone;
+
+        for (int i=0; i<30; ++i) {
+            spkt.frame_number = i & 0x1;
+            spkt.timestamp = i >> 1;
+            writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt));
+            p->data();
+            sleep_for(milliseconds(50));
+        }
+
+        REQUIRE( s1->lastSpkt.channel == Channel::kColour );
+        REQUIRE( (s1->lastSpkt.flags & ftl::protocol::kFlagRequest) > 0 );
+    }
+
     SECTION("responds to requests") {
         auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true);
         
-- 
GitLab