diff --git a/include/ftl/protocol/streams.hpp b/include/ftl/protocol/streams.hpp index f8af2089d1af1ae923ef3620612c2986ce0237f9..86cb4cdf37bc3e9bc51868094fedec66b2cf02fe 100644 --- a/include/ftl/protocol/streams.hpp +++ b/include/ftl/protocol/streams.hpp @@ -31,6 +31,11 @@ struct Request { ftl::protocol::Codec codec; }; +/** + * The maximum number of frames a client can request in a single request. + */ +static const int kMaxFrames = 100; + using RequestCallback = std::function<bool(const ftl::protocol::Request&)>; using StreamCallback = std::function<bool(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &)>; diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp index a0442ae993842ca654267853d45e4fab3a3734ff..7a7403f0fda20010e2ea559cb4946092fbe77e93 100644 --- a/src/streams/netstream.cpp +++ b/src/streams/netstream.cpp @@ -6,6 +6,7 @@ #include <list> #include <string> +#include <algorithm> #include "netstream.hpp" #include <ftl/time.hpp> #include "../uuidMSGPACK.hpp" @@ -29,8 +30,6 @@ using ftl::protocol::Channel; using ftl::protocol::Codec; using ftl::protocol::FrameID; using ftl::protocol::Error; -using ftl::protocol::kAllFrames; -using ftl::protocol::kAllFramesets; using ftl::protocol::StreamProperty; using std::string; using std::optional; @@ -133,40 +132,47 @@ bool Net::post(const StreamPacket &spkt, const DataPacket &pkt) { if (host_) { SHARED_LOCK(mutex_, lk); - for (auto &client : clients_) { - // Strip packet data if channel is not wanted by client - const bool strip = - static_cast<int>(spkt.channel) < 32 && pkt.data.size() > 0 - && ((1 << static_cast<int>(spkt.channel)) & client.channels) == 0; - - try { - int16_t pre_transmit_latency = int16_t(ftl::time::get_time() - spkt.localTimestamp); - - // TODO(Nick): msgpack only once and broadcast. - // TODO(Nick): send in parallel and then wait on all futures? - // Or send non-blocking and wait - if (!net_->send(client.peerid, - base_uri_, - pre_transmit_latency, // Time since timestamp for tx - spkt_net, - (strip) ? pkt_strip : reinterpret_cast<const PacketMSGPACK&>(pkt))) { - // Send failed so mark as client stream completed - client.txcount = client.txmax; - } else { - if (!strip && pkt.data.size() > 0) _checkTXRate(pkt.data.size(), 0, spkt.timestamp); - - // Count frame as completed only if last block and channel is 0 - // FIXME: This is unreliable, colour might not exist etc. - if (spkt_net.streamID == 0 && spkt.frame_number == 0 && spkt.channel == Channel::kColour) { - ++client.txcount; + + const FrameID frameId(spkt.streamID, spkt.frame_number); + + // If this particular frame has clients then loop over them + if (clients_.count(frameId) > 0) { + auto &clients = clients_.at(frameId); + + for (auto &client : clients) { + // Strip packet data if channel is not wanted by client + const bool strip = + static_cast<int>(spkt.channel) < 32 && pkt.data.size() > 0 + && ((1 << static_cast<int>(spkt.channel)) & client.channels) == 0; + + try { + int16_t pre_transmit_latency = int16_t(ftl::time::get_time() - spkt.localTimestamp); + + // TODO(Nick): msgpack only once and broadcast. + // TODO(Nick): send in parallel and then wait on all futures? + // Or send non-blocking and wait + if (!net_->send(client.peerid, + base_uri_, + pre_transmit_latency, // Time since timestamp for tx + spkt_net, + (strip) ? pkt_strip : reinterpret_cast<const PacketMSGPACK&>(pkt))) { + // Send failed so mark as client stream completed + client.txcount = 0; + } else { + if (!strip && pkt.data.size() > 0) _checkTXRate(pkt.data.size(), 0, spkt.timestamp); + + // Count every frame sent + if (spkt.channel == Channel::kEndFrame) { + --client.txcount; + } } + } catch(...) { + client.txcount = 0; } - } catch(...) { - client.txcount = client.txmax; - } - if (client.txcount >= client.txmax) { - hasStale = true; + if (client.txcount <= 0) { + hasStale = true; + } } } } else { @@ -230,7 +236,7 @@ void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket // for (size_t i = 0; i < size(); ++i) { const auto &sel = enabledChannels(localFrame); for (auto c : sel) { - _sendRequest(c, localFrame.frameset(), kAllFrames, frames_to_request_, 255); + _sendRequest(c, localFrame.frameset(), localFrame.source(), frames_to_request_, 255); } //} tally_ = frames_to_request_; @@ -307,7 +313,7 @@ void Net::refresh() { auto sel = enabledChannels(i); for (auto c : sel) { - _sendRequest(c, i.frameset(), kAllFrames, frames_to_request_, 255, true); + _sendRequest(c, i.frameset(), i.source(), frames_to_request_, 255, true); } } tally_ = frames_to_request_; @@ -347,7 +353,7 @@ bool Net::enable(FrameID id) { if (host_) { return false; } if (!_enable(id)) return false; if (!Stream::enable(id)) return false; - _sendRequest(Channel::kColour, id.frameset(), kAllFrames, kFramesToRequest, 255, true); + _sendRequest(Channel::kColour, id.frameset(), id.source(), kFramesToRequest, 255, true); return true; } @@ -356,7 +362,7 @@ bool Net::enable(FrameID id, Channel c) { if (host_) { return false; } if (!_enable(id)) return false; if (!Stream::enable(id, c)) return false; - _sendRequest(c, id.frameset(), kAllFrames, kFramesToRequest, 255, true); + _sendRequest(c, id.frameset(), id.source(), kFramesToRequest, 255, true); return true; } @@ -365,7 +371,7 @@ bool Net::enable(FrameID id, const ChannelSet &channels) { if (!_enable(id)) return false; if (!Stream::enable(id, channels)) return false; for (auto c : channels) { - _sendRequest(c, id.frameset(), kAllFrames, kFramesToRequest, 255, true); + _sendRequest(c, id.frameset(), id.source(), kFramesToRequest, 255, true); } return true; } @@ -403,14 +409,24 @@ bool Net::_sendRequest(Channel c, uint8_t frameset, uint8_t frames, uint8_t coun void Net::_cleanUp() { UNIQUE_LOCK(mutex_, lk); - for (auto i = clients_.begin(); i != clients_.end(); ++i) { - auto &client = *i; - if (client.txcount >= client.txmax) { - if (client.peerid == time_peer_) { - time_peer_ = ftl::UUID(0); + for (auto i = clients_.begin(); i != clients_.end();) { + auto &clients = i->second; + for (auto j = clients.begin(); j != clients.end();) { + auto &client = *j; + if (client.txcount <= 0) { + if (client.peerid == time_peer_) { + time_peer_ = ftl::UUID(0); + } + DLOG(INFO) << "Remove peer: " << client.peerid.to_string(); + j = clients.erase(j); + } else { + ++j; } - DLOG(INFO) << "Remove peer: " << client.peerid.to_string(); + } + if (clients.size() == 0) { i = clients_.erase(i); + } else { + ++i; } } } @@ -423,19 +439,25 @@ bool Net::_processRequest(ftl::net::Peer *p, StreamPacket *spkt, const DataPacke bool found = false; DLOG(INFO) << "processing request: " << int(spkt->streamID) << ", " << int(spkt->channel); + const FrameID frameId(spkt->streamID, spkt->frame_number); + if (p) { SHARED_LOCK(mutex_, lk); - // Does the client already exist - for (auto &c : clients_) { - if (c.peerid == p->id()) { - // Yes, so reset internal request counters - c.txcount = 0; - c.txmax = static_cast<int>(pkt.frame_count); - if (static_cast<int>(spkt->channel) < 32) { - c.channels |= 1 << static_cast<int>(spkt->channel); + + if (clients_.count(frameId) > 0) { + auto &clients = clients_.at(frameId); + + // Does the client already exist + for (auto &c : clients) { + if (c.peerid == p->id()) { + // Yes, so reset internal request counters + c.txcount = std::max(static_cast<int>(c.txcount), static_cast<int>(pkt.frame_count)); + if (static_cast<int>(spkt->channel) < 32) { + c.channels |= 1 << static_cast<int>(spkt->channel); + } + found = true; + // break; } - found = true; - // break; } } } @@ -445,11 +467,11 @@ bool Net::_processRequest(ftl::net::Peer *p, StreamPacket *spkt, const DataPacke { UNIQUE_LOCK(mutex_, lk); - auto &client = clients_.emplace_back(); + auto &clients = clients_[frameId]; + auto &client = clients.emplace_back(); client.peerid = p->id(); client.quality = 255; // TODO(Nick): Use quality given in packet - client.txcount = 0; - client.txmax = static_cast<int>(pkt.frame_count); + client.txcount = std::max(static_cast<int>(client.txcount), static_cast<int>(pkt.frame_count)); if (static_cast<int>(spkt->channel) < 32) { client.channels |= 1 << static_cast<int>(spkt->channel); } diff --git a/src/streams/netstream.hpp b/src/streams/netstream.hpp index dde225e13609aa3d12027e108c4202e371b1f503..565b94273cb7e174e118f6ec8af53eb1b019427e 100644 --- a/src/streams/netstream.hpp +++ b/src/streams/netstream.hpp @@ -8,6 +8,7 @@ #include <string> #include <list> +#include <unordered_map> #include "../universe.hpp" #include <ftl/threads.hpp> #include <ftl/protocol/packet.hpp> @@ -21,17 +22,11 @@ namespace detail { struct StreamClient { ftl::UUID peerid; std::atomic<int> txcount; // Frames sent since last request - int txmax; // Frames to send in request std::atomic<uint32_t> channels; // A channel mask, those that have been requested uint8_t quality; }; } -/** - * The maximum number of frames a client can request in a single request. - */ -static const int kMaxFrames = 100; - struct NetStats { float rxRate; float txRate; @@ -124,7 +119,7 @@ class Net : public Stream { static int64_t last_msg__; static MUTEX msg_mtx__; - std::list<detail::StreamClient> clients_; + std::unordered_map<ftl::protocol::FrameID, std::list<detail::StreamClient>> clients_; bool _enable(FrameID id); bool _processRequest(ftl::net::Peer *p, ftl::protocol::StreamPacket *spkt, const ftl::protocol::DataPacket &pkt); diff --git a/test/netstream_unit.cpp b/test/netstream_unit.cpp index 71701b084d79fe3661ecfc9da9a7049a3bbff1ae..d2b15271f2724cfb6ce447d98e3941df9eda564b 100644 --- a/test/netstream_unit.cpp +++ b/test/netstream_unit.cpp @@ -41,23 +41,23 @@ class MockNetStream : public ftl::protocol::Net { // --- Tests ------------------------------------------------------------------- TEST_CASE("Net stream options") { - SECTION("can get correct URI") { - auto s1 = ftl::createStream("ftl://mystream?opt=none"); - REQUIRE( s1 ); - REQUIRE( s1->begin() ); + SECTION("can get correct URI") { + auto s1 = ftl::createStream("ftl://mystream?opt=none"); + REQUIRE( s1 ); + REQUIRE( s1->begin() ); - REQUIRE( std::any_cast<std::string>(s1->getProperty(StreamProperty::kURI)) == "ftl://mystream" ); - } + REQUIRE( std::any_cast<std::string>(s1->getProperty(StreamProperty::kURI)) == "ftl://mystream" ); + } SECTION("can get a name") { - auto s1 = ftl::createStream("ftl://mystream?opt=none"); - REQUIRE( s1 ); - REQUIRE( std::any_cast<std::string>(s1->getProperty(StreamProperty::kName)).size() > 0 ); - } + auto s1 = ftl::createStream("ftl://mystream?opt=none"); + REQUIRE( s1 ); + REQUIRE( std::any_cast<std::string>(s1->getProperty(StreamProperty::kName)).size() > 0 ); + } - SECTION("can pause the stream") { - auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true); - REQUIRE( s1->begin() ); + SECTION("can pause the stream") { + auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true); + REQUIRE( s1->begin() ); StreamPacket spkt; spkt.timestamp = 100; @@ -77,34 +77,34 @@ TEST_CASE("Net stream options") { spkt.timestamp = 200; REQUIRE( s1->post(spkt, pkt) ); REQUIRE( s1->lastSpkt.timestamp == 100 ); - REQUIRE( std::any_cast<bool>(s1->getProperty(StreamProperty::kPaused)) ); - } + REQUIRE( std::any_cast<bool>(s1->getProperty(StreamProperty::kPaused)) ); + } } TEST_CASE("Net stream sending requests") { auto p = createMockPeer(0); fakedata[0] = ""; send_handshake(*p.get()); - p->data(); - sleep_for(milliseconds(50)); + p->data(); + sleep_for(milliseconds(50)); - SECTION("cannot enable if not seen") { - auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false); - REQUIRE( s1->begin() ); + SECTION("cannot enable if not seen") { + auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false); + REQUIRE( s1->begin() ); REQUIRE( !s1->enable(FrameID(1, 1), Channel::kDepth)); - } + } SECTION("sends request on enable") { - auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false); - - // Thread to provide response to otherwise blocking call - std::thread thr([&p]() { + auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false); + + // Thread to provide response to otherwise blocking call + std::thread thr([&p]() { auto z = std::make_unique<msgpack::zone>(); provideResponses(p, 0, { {false, "find_stream", packResponse(*z, ftl::UUIDMSGPACK(p->id()))}, {true, "enable_stream", {}}, }); - }); + }); REQUIRE( s1->begin() ); @@ -115,14 +115,14 @@ TEST_CASE("Net stream sending requests") { thr.join(); REQUIRE( s1->lastSpkt.streamID == 1 ); - REQUIRE( int(s1->lastSpkt.frame_number) == 255 ); // TODO: update when this is fixed + REQUIRE( int(s1->lastSpkt.frame_number) == 1 ); // TODO: update when this is fixed REQUIRE( s1->lastSpkt.channel == Channel::kDepth ); REQUIRE( (s1->lastSpkt.flags & ftl::protocol::kFlagRequest) > 0 ); - } + } SECTION("responds to requests") { - auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true); - + auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true); + REQUIRE( s1->begin() ); bool seenReq = false; @@ -143,7 +143,7 @@ TEST_CASE("Net stream sending requests") { sleep_for(milliseconds(50)); REQUIRE( seenReq ); - } + } p.reset(); ftl::protocol::reset(); @@ -153,12 +153,12 @@ TEST_CASE("Net stream can see received data") { auto p = createMockPeer(0); fakedata[0] = ""; send_handshake(*p.get()); - p->data(); - sleep_for(milliseconds(50)); + p->data(); + sleep_for(milliseconds(50)); SECTION("available if packet is seen") { - auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true); - + auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true); + REQUIRE( s1->begin() ); bool seenReq = false; @@ -179,7 +179,7 @@ TEST_CASE("Net stream can see received data") { sleep_for(milliseconds(50)); REQUIRE( seenReq ); REQUIRE( s1->available(FrameID(1, 1), Channel::kColour) ); - } + } p.reset(); ftl::protocol::reset(); diff --git a/test/stream_integration.cpp b/test/stream_integration.cpp index 9eeb565f6496427d349cf390035dd55ff9c91699..fd98adeb835591aadeb894e35889f276bd8e3a73 100644 --- a/test/stream_integration.cpp +++ b/test/stream_integration.cpp @@ -16,85 +16,128 @@ using ftl::protocol::StreamProperty; // --- Tests ------------------------------------------------------------------- TEST_CASE("TCP Stream", "[net]") { - std::mutex mtx; - - auto self = ftl::createDummySelf(); - self->listen(ftl::URI("tcp://localhost:0")); - - auto uri = "tcp://127.0.0.1:" + std::to_string(self->getListeningURIs().front().getPort()); - LOG(INFO) << uri; - auto p = ftl::connectNode(uri); - p->waitConnection(5); - - SECTION("fails if stream doesn't exist") { - auto s1 = self->getStream("ftl://mystream_bad"); - REQUIRE( s1 ); - - auto seenError = ftl::protocol::Error::kNoError; - auto h = s1->onError([&seenError](ftl::protocol::Error err, const std::string &str) { - seenError = err; - return true; - }); - - REQUIRE( s1->begin() ); - REQUIRE( !s1->enable(FrameID(0, 0)) ); - REQUIRE( seenError == ftl::protocol::Error::kURIDoesNotExist ); - } - - SECTION("single enabled packet stream") { - std::condition_variable cv; - std::unique_lock<std::mutex> lk(mtx); - - auto s1 = ftl::createStream("ftl://mystream"); - REQUIRE( s1 ); - - auto s2 = self->getStream("ftl://mystream"); - REQUIRE( s2 ); - - ftl::protocol::DataPacket rpkt; - rpkt.bitrate = 20; - - auto h = s2->onPacket([&cv, &rpkt](const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) { - rpkt = pkt; - cv.notify_one(); - return true; - }); - - bool seenReq = false; - auto h2 = s1->onRequest([&seenReq](const ftl::protocol::Request &req) { - seenReq = true; - return true; - }); - - s1->begin(); - s2->begin(); - - s2->enable(FrameID(0, 0)); - - // TODO: Find better option - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - - REQUIRE( seenReq ); - - ftl::protocol::StreamPacket spkt; - spkt.streamID = 0; - spkt.frame_number = 0; - spkt.channel = ftl::protocol::Channel::kColour; - ftl::protocol::DataPacket pkt; - pkt.bitrate = 10; - pkt.codec = ftl::protocol::Codec::kJPG; - pkt.frame_count = 1; - s1->post(spkt, pkt); - - bool r = cv.wait_for(lk, std::chrono::seconds(5), [&rpkt](){ return rpkt.bitrate == 10; }); - REQUIRE( r ); - REQUIRE( rpkt.bitrate == 10 ); - REQUIRE( rpkt.codec == ftl::protocol::Codec::kJPG ); - REQUIRE( rpkt.frame_count == 1 ); - - REQUIRE( std::any_cast<size_t>(s1->getProperty(StreamProperty::kObservers)) == 1 ); - } - - p.reset(); - ftl::protocol::reset(); + std::mutex mtx; + + auto self = ftl::createDummySelf(); + self->listen(ftl::URI("tcp://localhost:0")); + + auto uri = "tcp://127.0.0.1:" + std::to_string(self->getListeningURIs().front().getPort()); + LOG(INFO) << uri; + auto p = ftl::connectNode(uri); + p->waitConnection(5); + + SECTION("fails if stream doesn't exist") { + auto s1 = self->getStream("ftl://mystream_bad"); + REQUIRE( s1 ); + + auto seenError = ftl::protocol::Error::kNoError; + auto h = s1->onError([&seenError](ftl::protocol::Error err, const std::string &str) { + seenError = err; + return true; + }); + + REQUIRE( s1->begin() ); + REQUIRE( !s1->enable(FrameID(0, 0)) ); + REQUIRE( seenError == ftl::protocol::Error::kURIDoesNotExist ); + } + + SECTION("single enabled packet stream") { + std::condition_variable cv; + std::unique_lock<std::mutex> lk(mtx); + + auto s1 = ftl::createStream("ftl://mystream"); + REQUIRE( s1 ); + + auto s2 = self->getStream("ftl://mystream"); + REQUIRE( s2 ); + + ftl::protocol::DataPacket rpkt; + rpkt.bitrate = 20; + + auto h = s2->onPacket([&cv, &rpkt](const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) { + rpkt = pkt; + cv.notify_one(); + return true; + }); + + bool seenReq = false; + auto h2 = s1->onRequest([&seenReq](const ftl::protocol::Request &req) { + seenReq = true; + return true; + }); + + s1->begin(); + s2->begin(); + + s2->enable(FrameID(0, 0)); + + // TODO: Find better option + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + REQUIRE( seenReq ); + + ftl::protocol::StreamPacket spkt; + spkt.streamID = 0; + spkt.frame_number = 0; + spkt.channel = ftl::protocol::Channel::kColour; + ftl::protocol::DataPacket pkt; + pkt.bitrate = 10; + pkt.codec = ftl::protocol::Codec::kJPG; + pkt.frame_count = 1; + s1->post(spkt, pkt); + + bool r = cv.wait_for(lk, std::chrono::seconds(5), [&rpkt](){ return rpkt.bitrate == 10; }); + REQUIRE( r ); + REQUIRE( rpkt.bitrate == 10 ); + REQUIRE( rpkt.codec == ftl::protocol::Codec::kJPG ); + REQUIRE( rpkt.frame_count == 1 ); + + REQUIRE( std::any_cast<size_t>(s1->getProperty(StreamProperty::kObservers)) == 1 ); + } + + SECTION("stops sending when request expires") { + std::atomic_int rcount = 0; + auto s1 = ftl::createStream("ftl://mystream"); + REQUIRE( s1 ); + + auto s2 = self->getStream("ftl://mystream"); + REQUIRE( s2 ); + + auto h = s2->onPacket([&rcount](const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) { + ++rcount; + return true; + }); + + s1->begin(); + s2->begin(); + + s2->enable(FrameID(0, 0)); + + // TODO: Find better option + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + ftl::protocol::StreamPacket spkt; + spkt.streamID = 0; + spkt.frame_number = 0; + spkt.channel = ftl::protocol::Channel::kEndFrame; + ftl::protocol::DataPacket pkt; + pkt.bitrate = 10; + pkt.codec = ftl::protocol::Codec::kJPG; + pkt.frame_count = 1; + + for (int i=0; i<30 + 20; ++i) { + spkt.timestamp = i; + s1->post(spkt, pkt); + } + + // TODO: Find better option + int k = 10; + while (--k > 0 && rcount < 30) { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + REQUIRE( rcount == 30 ); + } + + p.reset(); + ftl::protocol::reset(); }