From ac9227c993872d4ef7a4724e096dad0ad0c7183d Mon Sep 17 00:00:00 2001
From: Nicolas Pope <nicolas.pope@utu.fi>
Date: Wed, 8 Jun 2022 08:58:35 +0000
Subject: [PATCH] #23 Fix netstream tx counting

---
 include/ftl/protocol/streams.hpp |   5 +
 src/streams/netstream.cpp        | 136 +++++++++++---------
 src/streams/netstream.hpp        |   9 +-
 test/netstream_unit.cpp          |  72 +++++------
 test/stream_integration.cpp      | 205 +++++++++++++++++++------------
 5 files changed, 246 insertions(+), 181 deletions(-)

diff --git a/include/ftl/protocol/streams.hpp b/include/ftl/protocol/streams.hpp
index f8af208..86cb4cd 100644
--- a/include/ftl/protocol/streams.hpp
+++ b/include/ftl/protocol/streams.hpp
@@ -31,6 +31,11 @@ struct Request {
     ftl::protocol::Codec codec;
 };
 
+/**
+ * The maximum number of frames a client can request in a single request.
+ */
+static const int kMaxFrames = 100;
+
 using RequestCallback = std::function<bool(const ftl::protocol::Request&)>;
 
 using StreamCallback = std::function<bool(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &)>;
diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp
index a0442ae..7a7403f 100644
--- a/src/streams/netstream.cpp
+++ b/src/streams/netstream.cpp
@@ -6,6 +6,7 @@
 
 #include <list>
 #include <string>
+#include <algorithm>
 #include "netstream.hpp"
 #include <ftl/time.hpp>
 #include "../uuidMSGPACK.hpp"
@@ -29,8 +30,6 @@ using ftl::protocol::Channel;
 using ftl::protocol::Codec;
 using ftl::protocol::FrameID;
 using ftl::protocol::Error;
-using ftl::protocol::kAllFrames;
-using ftl::protocol::kAllFramesets;
 using ftl::protocol::StreamProperty;
 using std::string;
 using std::optional;
@@ -133,40 +132,47 @@ bool Net::post(const StreamPacket &spkt, const DataPacket &pkt) {
 
     if (host_) {
         SHARED_LOCK(mutex_, lk);
-        for (auto &client : clients_) {
-            // Strip packet data if channel is not wanted by client
-            const bool strip =
-                static_cast<int>(spkt.channel) < 32 && pkt.data.size() > 0
-                && ((1 << static_cast<int>(spkt.channel)) & client.channels) == 0;
-
-            try {
-                int16_t pre_transmit_latency = int16_t(ftl::time::get_time() - spkt.localTimestamp);
-
-                // TODO(Nick): msgpack only once and broadcast.
-                // TODO(Nick): send in parallel and then wait on all futures?
-                // Or send non-blocking and wait
-                if (!net_->send(client.peerid,
-                        base_uri_,
-                        pre_transmit_latency,  // Time since timestamp for tx
-                        spkt_net,
-                        (strip) ? pkt_strip : reinterpret_cast<const PacketMSGPACK&>(pkt))) {
-                    // Send failed so mark as client stream completed
-                    client.txcount = client.txmax;
-                } else {
-                    if (!strip && pkt.data.size() > 0) _checkTXRate(pkt.data.size(), 0, spkt.timestamp);
-
-                    // Count frame as completed only if last block and channel is 0
-                    // FIXME: This is unreliable, colour might not exist etc.
-                    if (spkt_net.streamID == 0 && spkt.frame_number == 0 && spkt.channel == Channel::kColour) {
-                        ++client.txcount;
+
+        const FrameID frameId(spkt.streamID, spkt.frame_number);
+
+        // If this particular frame has clients then loop over them
+        if (clients_.count(frameId) > 0) {
+            auto &clients = clients_.at(frameId);
+
+            for (auto &client : clients) {
+                // Strip packet data if channel is not wanted by client
+                const bool strip =
+                    static_cast<int>(spkt.channel) < 32 && pkt.data.size() > 0
+                    && ((1 << static_cast<int>(spkt.channel)) & client.channels) == 0;
+
+                try {
+                    int16_t pre_transmit_latency = int16_t(ftl::time::get_time() - spkt.localTimestamp);
+
+                    // TODO(Nick): msgpack only once and broadcast.
+                    // TODO(Nick): send in parallel and then wait on all futures?
+                    // Or send non-blocking and wait
+                    if (!net_->send(client.peerid,
+                            base_uri_,
+                            pre_transmit_latency,  // Time since timestamp for tx
+                            spkt_net,
+                            (strip) ? pkt_strip : reinterpret_cast<const PacketMSGPACK&>(pkt))) {
+                        // Send failed so mark as client stream completed
+                        client.txcount = 0;
+                    } else {
+                        if (!strip && pkt.data.size() > 0) _checkTXRate(pkt.data.size(), 0, spkt.timestamp);
+
+                        // Count every frame sent
+                        if (spkt.channel == Channel::kEndFrame) {
+                            --client.txcount;
+                        }
                     }
+                } catch(...) {
+                    client.txcount = 0;
                 }
-            } catch(...) {
-                client.txcount = client.txmax;
-            }
 
-            if (client.txcount >= client.txmax) {
-                hasStale = true;
+                if (client.txcount <= 0) {
+                    hasStale = true;
+                }
             }
         }
     } else {
@@ -230,7 +236,7 @@ void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket
                 // for (size_t i = 0; i < size(); ++i) {
                     const auto &sel = enabledChannels(localFrame);
                     for (auto c : sel) {
-                        _sendRequest(c, localFrame.frameset(), kAllFrames, frames_to_request_, 255);
+                        _sendRequest(c, localFrame.frameset(), localFrame.source(), frames_to_request_, 255);
                     }
                 //}
                 tally_ = frames_to_request_;
@@ -307,7 +313,7 @@ void Net::refresh() {
         auto sel = enabledChannels(i);
 
         for (auto c : sel) {
-            _sendRequest(c, i.frameset(), kAllFrames, frames_to_request_, 255, true);
+            _sendRequest(c, i.frameset(), i.source(), frames_to_request_, 255, true);
         }
     }
     tally_ = frames_to_request_;
@@ -347,7 +353,7 @@ bool Net::enable(FrameID id) {
     if (host_) { return false; }
     if (!_enable(id)) return false;
     if (!Stream::enable(id)) return false;
-    _sendRequest(Channel::kColour, id.frameset(), kAllFrames, kFramesToRequest, 255, true);
+    _sendRequest(Channel::kColour, id.frameset(), id.source(), kFramesToRequest, 255, true);
 
     return true;
 }
@@ -356,7 +362,7 @@ bool Net::enable(FrameID id, Channel c) {
     if (host_) { return false; }
     if (!_enable(id)) return false;
     if (!Stream::enable(id, c)) return false;
-    _sendRequest(c, id.frameset(), kAllFrames, kFramesToRequest, 255, true);
+    _sendRequest(c, id.frameset(), id.source(), kFramesToRequest, 255, true);
     return true;
 }
 
@@ -365,7 +371,7 @@ bool Net::enable(FrameID id, const ChannelSet &channels) {
     if (!_enable(id)) return false;
     if (!Stream::enable(id, channels)) return false;
     for (auto c : channels) {
-        _sendRequest(c, id.frameset(), kAllFrames, kFramesToRequest, 255, true);
+        _sendRequest(c, id.frameset(), id.source(), kFramesToRequest, 255, true);
     }
     return true;
 }
@@ -403,14 +409,24 @@ bool Net::_sendRequest(Channel c, uint8_t frameset, uint8_t frames, uint8_t coun
 
 void Net::_cleanUp() {
     UNIQUE_LOCK(mutex_, lk);
-    for (auto i = clients_.begin(); i != clients_.end(); ++i) {
-        auto &client = *i;
-        if (client.txcount >= client.txmax) {
-            if (client.peerid == time_peer_) {
-                time_peer_ = ftl::UUID(0);
+    for (auto i = clients_.begin(); i != clients_.end();) {
+        auto &clients = i->second;
+        for (auto j = clients.begin(); j != clients.end();) {
+            auto &client = *j;
+            if (client.txcount <= 0) {
+                if (client.peerid == time_peer_) {
+                    time_peer_ = ftl::UUID(0);
+                }
+                DLOG(INFO) << "Remove peer: " << client.peerid.to_string();
+                j = clients.erase(j);
+            } else {
+                ++j;
             }
-            DLOG(INFO) << "Remove peer: " << client.peerid.to_string();
+        }
+        if (clients.size() == 0) {
             i = clients_.erase(i);
+        } else {
+            ++i;
         }
     }
 }
@@ -423,19 +439,25 @@ bool Net::_processRequest(ftl::net::Peer *p, StreamPacket *spkt, const DataPacke
     bool found = false;
     DLOG(INFO) << "processing request: " << int(spkt->streamID) << ", " << int(spkt->channel);
 
+    const FrameID frameId(spkt->streamID, spkt->frame_number);
+
     if (p) {
         SHARED_LOCK(mutex_, lk);
-        // Does the client already exist
-        for (auto &c : clients_) {
-            if (c.peerid == p->id()) {
-                // Yes, so reset internal request counters
-                c.txcount = 0;
-                c.txmax = static_cast<int>(pkt.frame_count);
-                if (static_cast<int>(spkt->channel) < 32) {
-                    c.channels |= 1 << static_cast<int>(spkt->channel);
+
+        if (clients_.count(frameId) > 0) {
+            auto &clients = clients_.at(frameId);
+
+            // Does the client already exist
+            for (auto &c : clients) {
+                if (c.peerid == p->id()) {
+                    // Yes, so reset internal request counters
+                    c.txcount = std::max(static_cast<int>(c.txcount), static_cast<int>(pkt.frame_count));
+                    if (static_cast<int>(spkt->channel) < 32) {
+                        c.channels |= 1 << static_cast<int>(spkt->channel);
+                    }
+                    found = true;
+                    // break;
                 }
-                found = true;
-                // break;
             }
         }
     }
@@ -445,11 +467,11 @@ bool Net::_processRequest(ftl::net::Peer *p, StreamPacket *spkt, const DataPacke
         {
             UNIQUE_LOCK(mutex_, lk);
 
-            auto &client = clients_.emplace_back();
+            auto &clients = clients_[frameId];
+            auto &client = clients.emplace_back();
             client.peerid = p->id();
             client.quality = 255;  // TODO(Nick): Use quality given in packet
-            client.txcount = 0;
-            client.txmax = static_cast<int>(pkt.frame_count);
+            client.txcount = std::max(static_cast<int>(client.txcount), static_cast<int>(pkt.frame_count));
             if (static_cast<int>(spkt->channel) < 32) {
                 client.channels |= 1 << static_cast<int>(spkt->channel);
             }
diff --git a/src/streams/netstream.hpp b/src/streams/netstream.hpp
index dde225e..565b942 100644
--- a/src/streams/netstream.hpp
+++ b/src/streams/netstream.hpp
@@ -8,6 +8,7 @@
 
 #include <string>
 #include <list>
+#include <unordered_map>
 #include "../universe.hpp"
 #include <ftl/threads.hpp>
 #include <ftl/protocol/packet.hpp>
@@ -21,17 +22,11 @@ namespace detail {
 struct StreamClient {
     ftl::UUID peerid;
     std::atomic<int> txcount;           // Frames sent since last request
-    int txmax;                          // Frames to send in request
     std::atomic<uint32_t> channels;     // A channel mask, those that have been requested
     uint8_t quality;
 };
 }
 
-/**
- * The maximum number of frames a client can request in a single request.
- */
-static const int kMaxFrames = 100;
-
 struct NetStats {
     float rxRate;
     float txRate;
@@ -124,7 +119,7 @@ class Net : public Stream {
     static int64_t last_msg__;
     static MUTEX msg_mtx__;
 
-    std::list<detail::StreamClient> clients_;
+    std::unordered_map<ftl::protocol::FrameID, std::list<detail::StreamClient>> clients_;
 
     bool _enable(FrameID id);
     bool _processRequest(ftl::net::Peer *p, ftl::protocol::StreamPacket *spkt, const ftl::protocol::DataPacket &pkt);
diff --git a/test/netstream_unit.cpp b/test/netstream_unit.cpp
index 71701b0..d2b1527 100644
--- a/test/netstream_unit.cpp
+++ b/test/netstream_unit.cpp
@@ -41,23 +41,23 @@ class MockNetStream : public ftl::protocol::Net {
 // --- Tests -------------------------------------------------------------------
 
 TEST_CASE("Net stream options") {
-	SECTION("can get correct URI") {
-		auto s1 = ftl::createStream("ftl://mystream?opt=none");
-		REQUIRE( s1 );
-		REQUIRE( s1->begin() );
+    SECTION("can get correct URI") {
+        auto s1 = ftl::createStream("ftl://mystream?opt=none");
+        REQUIRE( s1 );
+        REQUIRE( s1->begin() );
 
-		REQUIRE( std::any_cast<std::string>(s1->getProperty(StreamProperty::kURI)) == "ftl://mystream" );
-	}
+        REQUIRE( std::any_cast<std::string>(s1->getProperty(StreamProperty::kURI)) == "ftl://mystream" );
+    }
 
     SECTION("can get a name") {
-		auto s1 = ftl::createStream("ftl://mystream?opt=none");
-		REQUIRE( s1 );
-		REQUIRE( std::any_cast<std::string>(s1->getProperty(StreamProperty::kName)).size() > 0 );
-	}
+        auto s1 = ftl::createStream("ftl://mystream?opt=none");
+        REQUIRE( s1 );
+        REQUIRE( std::any_cast<std::string>(s1->getProperty(StreamProperty::kName)).size() > 0 );
+    }
 
-	SECTION("can pause the stream") {
-		auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true);
-		REQUIRE( s1->begin() );
+    SECTION("can pause the stream") {
+        auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true);
+        REQUIRE( s1->begin() );
 
         StreamPacket spkt;
         spkt.timestamp = 100;
@@ -77,34 +77,34 @@ TEST_CASE("Net stream options") {
         spkt.timestamp = 200;
         REQUIRE( s1->post(spkt, pkt) );
         REQUIRE( s1->lastSpkt.timestamp == 100 );
-		REQUIRE( std::any_cast<bool>(s1->getProperty(StreamProperty::kPaused)) );
-	}
+        REQUIRE( std::any_cast<bool>(s1->getProperty(StreamProperty::kPaused)) );
+    }
 }
 
 TEST_CASE("Net stream sending requests") {
     auto p = createMockPeer(0);
     fakedata[0] = "";
     send_handshake(*p.get());
-	p->data();
-	sleep_for(milliseconds(50));
+    p->data();
+    sleep_for(milliseconds(50));
 
-	SECTION("cannot enable if not seen") {
-		auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false);
-		REQUIRE( s1->begin() );
+    SECTION("cannot enable if not seen") {
+        auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false);
+        REQUIRE( s1->begin() );
         REQUIRE( !s1->enable(FrameID(1, 1), Channel::kDepth));
-	}
+    }
 
     SECTION("sends request on enable") {
-		auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false);
-		
-		// Thread to provide response to otherwise blocking call
-		std::thread thr([&p]() {
+        auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false);
+        
+        // Thread to provide response to otherwise blocking call
+        std::thread thr([&p]() {
             auto z = std::make_unique<msgpack::zone>();
             provideResponses(p, 0, {
                 {false, "find_stream", packResponse(*z, ftl::UUIDMSGPACK(p->id()))},
                 {true, "enable_stream", {}},
             });
-		});
+        });
 
         REQUIRE( s1->begin() );
 
@@ -115,14 +115,14 @@ TEST_CASE("Net stream sending requests") {
         thr.join();
 
         REQUIRE( s1->lastSpkt.streamID == 1 );
-        REQUIRE( int(s1->lastSpkt.frame_number) == 255 );  // TODO: update when this is fixed
+        REQUIRE( int(s1->lastSpkt.frame_number) == 1 );  // TODO: update when this is fixed
         REQUIRE( s1->lastSpkt.channel == Channel::kDepth );
         REQUIRE( (s1->lastSpkt.flags & ftl::protocol::kFlagRequest) > 0 );
-	}
+    }
 
     SECTION("responds to requests") {
-		auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true);
-		
+        auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true);
+        
         REQUIRE( s1->begin() );
 
         bool seenReq = false;
@@ -143,7 +143,7 @@ TEST_CASE("Net stream sending requests") {
 
         sleep_for(milliseconds(50));
         REQUIRE( seenReq );
-	}
+    }
 
     p.reset();
     ftl::protocol::reset();
@@ -153,12 +153,12 @@ TEST_CASE("Net stream can see received data") {
     auto p = createMockPeer(0);
     fakedata[0] = "";
     send_handshake(*p.get());
-	p->data();
-	sleep_for(milliseconds(50));
+    p->data();
+    sleep_for(milliseconds(50));
 
     SECTION("available if packet is seen") {
-		auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true);
-		
+        auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true);
+        
         REQUIRE( s1->begin() );
 
         bool seenReq = false;
@@ -179,7 +179,7 @@ TEST_CASE("Net stream can see received data") {
         sleep_for(milliseconds(50));
         REQUIRE( seenReq );
         REQUIRE( s1->available(FrameID(1, 1), Channel::kColour) );
-	}
+    }
 
     p.reset();
     ftl::protocol::reset();
diff --git a/test/stream_integration.cpp b/test/stream_integration.cpp
index 9eeb565..fd98ade 100644
--- a/test/stream_integration.cpp
+++ b/test/stream_integration.cpp
@@ -16,85 +16,128 @@ using ftl::protocol::StreamProperty;
 // --- Tests -------------------------------------------------------------------
 
 TEST_CASE("TCP Stream", "[net]") {
-	std::mutex mtx;
-
-	auto self = ftl::createDummySelf();
-	self->listen(ftl::URI("tcp://localhost:0")); 
-
-	auto uri = "tcp://127.0.0.1:" + std::to_string(self->getListeningURIs().front().getPort());
-	LOG(INFO) << uri;
-	auto p = ftl::connectNode(uri);
-	p->waitConnection(5);
-
-	SECTION("fails if stream doesn't exist") {
-		auto s1 = self->getStream("ftl://mystream_bad");
-		REQUIRE( s1 );
-
-		auto seenError = ftl::protocol::Error::kNoError;
-		auto h = s1->onError([&seenError](ftl::protocol::Error err, const std::string &str) {
-			seenError = err;
-			return true;
-		});
-
-		REQUIRE( s1->begin() );
-		REQUIRE( !s1->enable(FrameID(0, 0)) );
-		REQUIRE( seenError == ftl::protocol::Error::kURIDoesNotExist );
-	}
-
-	SECTION("single enabled packet stream") {
-		std::condition_variable cv;
-		std::unique_lock<std::mutex> lk(mtx);
-
-		auto s1 = ftl::createStream("ftl://mystream");
-		REQUIRE( s1 );
-
-		auto s2 = self->getStream("ftl://mystream");
-		REQUIRE( s2 );
-
-		ftl::protocol::DataPacket rpkt;
-		rpkt.bitrate = 20;
-
-		auto h = s2->onPacket([&cv, &rpkt](const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) {
-			rpkt = pkt;
-			cv.notify_one();
-			return true;
-		});
-
-		bool seenReq = false;
-		auto h2 = s1->onRequest([&seenReq](const ftl::protocol::Request &req) {
-			seenReq = true;
-			return true;
-		});
-
-		s1->begin();
-		s2->begin();
-
-		s2->enable(FrameID(0, 0));
-
-		// TODO: Find better option
-		std::this_thread::sleep_for(std::chrono::milliseconds(10));
-
-		REQUIRE( seenReq );
-
-		ftl::protocol::StreamPacket spkt;
-		spkt.streamID = 0;
-		spkt.frame_number = 0;
-		spkt.channel = ftl::protocol::Channel::kColour;
-		ftl::protocol::DataPacket pkt;
-		pkt.bitrate = 10;
-		pkt.codec = ftl::protocol::Codec::kJPG;
-		pkt.frame_count = 1;
-		s1->post(spkt, pkt);
-
-		bool r = cv.wait_for(lk, std::chrono::seconds(5), [&rpkt](){ return rpkt.bitrate == 10; });
-		REQUIRE( r );
-		REQUIRE( rpkt.bitrate == 10 );
-		REQUIRE( rpkt.codec == ftl::protocol::Codec::kJPG );
-		REQUIRE( rpkt.frame_count == 1 );
-
-		REQUIRE( std::any_cast<size_t>(s1->getProperty(StreamProperty::kObservers)) == 1 );
-	}
-
-	p.reset();
-	ftl::protocol::reset();
+    std::mutex mtx;
+
+    auto self = ftl::createDummySelf();
+    self->listen(ftl::URI("tcp://localhost:0")); 
+
+    auto uri = "tcp://127.0.0.1:" + std::to_string(self->getListeningURIs().front().getPort());
+    LOG(INFO) << uri;
+    auto p = ftl::connectNode(uri);
+    p->waitConnection(5);
+
+    SECTION("fails if stream doesn't exist") {
+        auto s1 = self->getStream("ftl://mystream_bad");
+        REQUIRE( s1 );
+
+        auto seenError = ftl::protocol::Error::kNoError;
+        auto h = s1->onError([&seenError](ftl::protocol::Error err, const std::string &str) {
+            seenError = err;
+            return true;
+        });
+
+        REQUIRE( s1->begin() );
+        REQUIRE( !s1->enable(FrameID(0, 0)) );
+        REQUIRE( seenError == ftl::protocol::Error::kURIDoesNotExist );
+    }
+
+    SECTION("single enabled packet stream") {
+        std::condition_variable cv;
+        std::unique_lock<std::mutex> lk(mtx);
+
+        auto s1 = ftl::createStream("ftl://mystream");
+        REQUIRE( s1 );
+
+        auto s2 = self->getStream("ftl://mystream");
+        REQUIRE( s2 );
+
+        ftl::protocol::DataPacket rpkt;
+        rpkt.bitrate = 20;
+
+        auto h = s2->onPacket([&cv, &rpkt](const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) {
+            rpkt = pkt;
+            cv.notify_one();
+            return true;
+        });
+
+        bool seenReq = false;
+        auto h2 = s1->onRequest([&seenReq](const ftl::protocol::Request &req) {
+            seenReq = true;
+            return true;
+        });
+
+        s1->begin();
+        s2->begin();
+
+        s2->enable(FrameID(0, 0));
+
+        // TODO: Find better option
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+
+        REQUIRE( seenReq );
+
+        ftl::protocol::StreamPacket spkt;
+        spkt.streamID = 0;
+        spkt.frame_number = 0;
+        spkt.channel = ftl::protocol::Channel::kColour;
+        ftl::protocol::DataPacket pkt;
+        pkt.bitrate = 10;
+        pkt.codec = ftl::protocol::Codec::kJPG;
+        pkt.frame_count = 1;
+        s1->post(spkt, pkt);
+
+        bool r = cv.wait_for(lk, std::chrono::seconds(5), [&rpkt](){ return rpkt.bitrate == 10; });
+        REQUIRE( r );
+        REQUIRE( rpkt.bitrate == 10 );
+        REQUIRE( rpkt.codec == ftl::protocol::Codec::kJPG );
+        REQUIRE( rpkt.frame_count == 1 );
+
+        REQUIRE( std::any_cast<size_t>(s1->getProperty(StreamProperty::kObservers)) == 1 );
+    }
+
+    SECTION("stops sending when request expires") {
+        std::atomic_int rcount = 0;
+        auto s1 = ftl::createStream("ftl://mystream");
+        REQUIRE( s1 );
+
+        auto s2 = self->getStream("ftl://mystream");
+        REQUIRE( s2 );
+
+        auto h = s2->onPacket([&rcount](const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) {
+            ++rcount;
+            return true;
+        });
+
+        s1->begin();
+        s2->begin();
+
+        s2->enable(FrameID(0, 0));
+
+        // TODO: Find better option
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+
+        ftl::protocol::StreamPacket spkt;
+        spkt.streamID = 0;
+        spkt.frame_number = 0;
+        spkt.channel = ftl::protocol::Channel::kEndFrame;
+        ftl::protocol::DataPacket pkt;
+        pkt.bitrate = 10;
+        pkt.codec = ftl::protocol::Codec::kJPG;
+        pkt.frame_count = 1;
+
+        for (int i=0; i<30 + 20; ++i) {
+            spkt.timestamp = i;
+            s1->post(spkt, pkt);
+        }
+
+        // TODO: Find better option
+        int k = 10;
+        while (--k > 0 && rcount < 30) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(20));
+        }
+        REQUIRE( rcount == 30 );
+    }
+
+    p.reset();
+    ftl::protocol::reset();
 }
-- 
GitLab