diff --git a/.vscode/settings.json b/.vscode/settings.json index c6f7c1de41fad69133ad3483300676bc5fef44d4..713f547df23addb0c74a2d6fe5b32910b176613b 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -65,7 +65,8 @@ "typeinfo": "cpp", "valarray": "cpp", "variant": "cpp", - "any": "cpp" + "any": "cpp", + "complex": "cpp" }, "cmake.cmakePath": "cmake" } \ No newline at end of file diff --git a/include/ftl/protocol/streams.hpp b/include/ftl/protocol/streams.hpp index a258a35936bdf512397b1a41ce7b72af1c3bd50d..fe8ce022fd9dbb9a70da6867987656dc948f8f91 100644 --- a/include/ftl/protocol/streams.hpp +++ b/include/ftl/protocol/streams.hpp @@ -62,7 +62,8 @@ enum struct StreamProperty { kName, kDescription, kTags, - kUser + kUser, + kRequestSize }; /** @@ -233,6 +234,13 @@ class Stream { */ std::unordered_set<FrameID> enabled() const; + /** + * @brief Get all enabled frames in a frameset. + * + * @return Set of frame IDs + */ + std::unordered_set<FrameID> enabled(unsigned int) const; + /** * @brief Check if a frame is enabled. * diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp index a9b5c1582da753a40e485c32f47c6213e2d55696..8b630a9f8ebad099645422996f738c54d74fe5ae 100644 --- a/src/streams/netstream.cpp +++ b/src/streams/netstream.cpp @@ -87,7 +87,7 @@ void Net::installRPC(ftl::net::Universe *net) { } Net::Net(const std::string &uri, ftl::net::Universe *net, bool host) : - net_(net), time_peer_(ftl::UUID(0)), uri_(uri), host_(host) { + net_(net), uri_(uri), host_(host) { ftl::URI u(uri_); if (!u.isValid() || !(u.getScheme() == ftl::URI::SCHEME_FTL)) { error(Error::kBadURI, uri_); @@ -226,36 +226,26 @@ void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket if (paused_) return; // Manage recuring requests - if (!host_ && last_frame_ != spkt.timestamp) { - UNIQUE_LOCK(mutex_, lk); - if (last_frame_ != spkt.timestamp) { - // int tc = now - last_completion_; // Milliseconds since last frame completed - frame_time_ = spkt.timestamp - last_frame_; // Milliseconds per frame - last_completion_ = now; - bytes_received_ = 0; - last_frame_ = spkt.timestamp; - - lk.unlock(); - - // Are we close to reaching the end of our frames request? - if (tally_ <= 5) { - // Yes, so send new requests - // FIXME: Do this for all frames, or use tally be frame - // for (size_t i = 0; i < size(); ++i) { - const auto &sel = enabledChannels(localFrame); - for (auto c : sel) { - _sendRequest(c, localFrame.frameset(), localFrame.source(), frames_to_request_, 255); - } - //} - tally_ = frames_to_request_; - } else { - --tally_; + if (!host_ && spkt.channel == Channel::kEndFrame && localFrame.frameset() < tally_.size()) { + frame_time_ = spkt.timestamp - last_frame_; // Milliseconds per frame + last_frame_ = spkt.timestamp; + + // Are we close to reaching the end of our frames request? + if (tally_[localFrame.frameset()] <= 5) { + // Yes, so send new requests + for (const auto f : enabled(localFrame.frameset())) { + const auto &sel = enabledChannels(f); + for (auto c : sel) { + _sendRequest(c, f.frameset(), f.source(), frames_to_request_, 255); + } } + tally_[localFrame.frameset()] = frames_to_request_; + } else { + --tally_[localFrame.frameset()]; } } bytes_received_ += pkt.data.size(); - // time_at_last_ = now; // If hosting and no data then it is a request for data // Note: a non host can receive empty data, meaning data is available @@ -362,7 +352,7 @@ bool Net::begin() { net_->broadcast("add_stream", uri_); } else { - tally_ = frames_to_request_; + for (size_t i = 0; i < tally_.size(); ++i) tally_[i] = frames_to_request_; active_ = true; } @@ -381,7 +371,8 @@ void Net::refresh() { _sendRequest(c, i.frameset(), i.source(), frames_to_request_, 255, true); } } - tally_ = frames_to_request_; + + for (size_t i = 0; i < tally_.size(); ++i) tally_[i] = frames_to_request_; } void Net::reset() { @@ -390,6 +381,7 @@ void Net::reset() { bool Net::_enable(FrameID id) { if (host_) { return false; } + if (peer_) return true; if (enabled(id)) return true; // not hosting, try to find peer now @@ -418,7 +410,7 @@ bool Net::enable(FrameID id) { if (host_) { return false; } if (!_enable(id)) return false; if (!Stream::enable(id)) return false; - _sendRequest(Channel::kColour, id.frameset(), id.source(), kFramesToRequest, 255, true); + _sendRequest(Channel::kColour, id.frameset(), id.source(), frames_to_request_, 255, true); return true; } @@ -427,7 +419,7 @@ bool Net::enable(FrameID id, Channel c) { if (host_) { return false; } if (!_enable(id)) return false; if (!Stream::enable(id, c)) return false; - _sendRequest(c, id.frameset(), id.source(), kFramesToRequest, 255, true); + _sendRequest(c, id.frameset(), id.source(), frames_to_request_, 255, true); return true; } @@ -436,7 +428,7 @@ bool Net::enable(FrameID id, const ChannelSet &channels) { if (!_enable(id)) return false; if (!Stream::enable(id, channels)) return false; for (auto c : channels) { - _sendRequest(c, id.frameset(), id.source(), kFramesToRequest, 255, true); + _sendRequest(c, id.frameset(), id.source(), frames_to_request_, 255, true); } return true; } @@ -479,9 +471,6 @@ void Net::_cleanUp() { for (auto j = clients.begin(); j != clients.end();) { auto &client = *j; if (client.txcount <= 0) { - if (client.peerid == time_peer_) { - time_peer_ = ftl::UUID(0); - } DLOG(INFO) << "Remove peer: " << client.peerid.to_string(); j = clients.erase(j); } else { @@ -507,6 +496,8 @@ bool Net::_processRequest(ftl::net::Peer *p, const StreamPacket *spkt, DataPacke // Generate a batch of requests ftl::protocol::StreamPacket spkt2 = *spkt; for (const auto &i : frames()) { + if (spkt->streamID != 255 && i.frameset() != spkt->streamID) continue; + if (spkt->frame_number != 255 && i.source() != spkt->frame_number) continue; spkt2.streamID = i.frameset(); spkt2.frame_number = i.source(); _processRequest(p, &spkt2, pkt); @@ -630,6 +621,7 @@ void Net::setProperty(ftl::protocol::StreamProperty opt, std::any value) { case StreamProperty::kMaxBitrate : bitrate_ = std::any_cast<int>(value); break; case StreamProperty::kPaused : paused_ = std::any_cast<bool>(value); break; case StreamProperty::kName : name_ = std::any_cast<std::string>(value); break; + case StreamProperty::kRequestSize : frames_to_request_ = std::any_cast<int>(value); break; case StreamProperty::kObservers : case StreamProperty::kBytesSent : case StreamProperty::kBytesReceived : @@ -652,6 +644,7 @@ std::any Net::getProperty(ftl::protocol::StreamProperty opt) { case StreamProperty::kFrameRate : return (frame_time_ > 0) ? 1000 / frame_time_ : 0; case StreamProperty::kLatency : return 0; case StreamProperty::kName : return name_; + case StreamProperty::kRequestSize : return frames_to_request_; default : throw FTL_Error("Unsupported property"); } } @@ -667,6 +660,7 @@ bool Net::supportsProperty(ftl::protocol::StreamProperty opt) { case StreamProperty::kLatency : case StreamProperty::kFrameRate : case StreamProperty::kName : + case StreamProperty::kRequestSize : case StreamProperty::kURI : return true; default : return false; } diff --git a/src/streams/netstream.hpp b/src/streams/netstream.hpp index baabbf1afa095ee303f0b69f6f3ac954ca50b000..f803be3d6ed55058d83a1eb21ce9887c93076b7b 100644 --- a/src/streams/netstream.hpp +++ b/src/streams/netstream.hpp @@ -8,6 +8,7 @@ #include <string> #include <list> +#include <atomic> #include <unordered_map> #include "../universe.hpp" #include <ftl/threads.hpp> @@ -88,32 +89,21 @@ class Net : public Stream { SHARED_MUTEX mutex_; bool active_ = false; ftl::net::Universe *net_; - int64_t clock_adjust_ = 0; - ftl::UUID time_peer_; std::optional<ftl::UUID> peer_; int64_t last_frame_ = 0; - int64_t last_ping_ = 0; int64_t frame_time_ = 0; std::string uri_; std::string base_uri_; const bool host_; - int tally_ = 0; - std::array<std::atomic<int>, 32> reqtally_ = {0}; - ftl::protocol::ChannelSet last_selected_; + std::array<std::atomic_int, 5> tally_ = {}; uint8_t bitrate_ = 255; std::atomic_int64_t bytes_received_ = 0; - int64_t last_completion_ = 0; - int64_t time_at_last_ = 0; - float required_bps_ = 0.0f; - float actual_bps_ = 0.0f; bool paused_ = false; int frames_to_request_ = kFramesToRequest; std::string name_; ftl::PacketManager mgr_; ftl::Handler<ftl::net::Peer*> connect_cb_; - uint32_t local_fsid_ = 0; - static std::atomic_size_t req_bitrate__; static std::atomic_size_t tx_bitrate__; static std::atomic_size_t rx_sample_count__; diff --git a/src/streams/streams.cpp b/src/streams/streams.cpp index ba699f0f41f58be82e758bb80e83b87f5a69336b..f64496b2ae3eba4202240e0ea6885146e85b3b5f 100644 --- a/src/streams/streams.cpp +++ b/src/streams/streams.cpp @@ -87,6 +87,18 @@ std::unordered_set<FrameID> Stream::enabled() const { return result; } +std::unordered_set<FrameID> Stream::enabled(unsigned int fs) const { + SHARED_LOCK(mtx_, lk); + std::unordered_set<FrameID> result; + for (const auto &s : state_) { + if (!s.second) continue; + if (s.second->enabled && FrameID(s.first).frameset() == fs) { + result.emplace(s.first); + } + } + return result; +} + bool Stream::enabled(FrameID id) const { auto state = _getState(id); if (!state) return false; diff --git a/test/netstream_unit.cpp b/test/netstream_unit.cpp index 51d7e70bd37fea44fa7ec019fccf0ff8e5aed75b..76ff57242bc00d4469e452fcf44b72b699e4bbd1 100644 --- a/test/netstream_unit.cpp +++ b/test/netstream_unit.cpp @@ -120,6 +120,89 @@ TEST_CASE("Net stream sending requests") { REQUIRE( (s1->lastSpkt.flags & ftl::protocol::kFlagRequest) > 0 ); } + SECTION("sends repeat requests - single frame") { + auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false); + + // Thread to provide response to otherwise blocking call + std::thread thr([&p]() { + auto z = std::make_unique<msgpack::zone>(); + provideResponses(p, 0, { + {false, "find_stream", packResponse(*z, ftl::UUIDMSGPACK(p->id()))}, + {true, "enable_stream", {}}, + }); + }); + + s1->setProperty(StreamProperty::kRequestSize, 10); + + REQUIRE( s1->begin() ); + + s1->forceSeen(FrameID(0, 0), Channel::kColour); + REQUIRE( s1->enable(FrameID(0, 0), Channel::kColour)); + + ftl::protocol::StreamPacketMSGPACK spkt; + ftl::protocol::PacketMSGPACK pkt; + spkt.streamID = 0; + spkt.frame_number = 0; + spkt.channel = Channel::kEndFrame; + + thr.join(); + + s1->lastSpkt.channel = Channel::kNone; + + for (int i=0; i<20; ++i) { + spkt.timestamp = i; + writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); + p->data(); + sleep_for(milliseconds(50)); + } + + REQUIRE( s1->lastSpkt.channel == Channel::kColour ); + REQUIRE( (s1->lastSpkt.flags & ftl::protocol::kFlagRequest) > 0 ); + } + + SECTION("sends repeat requests - multi frame") { + auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false); + + // Thread to provide response to otherwise blocking call + std::thread thr([&p]() { + auto z = std::make_unique<msgpack::zone>(); + provideResponses(p, 0, { + {false, "find_stream", packResponse(*z, ftl::UUIDMSGPACK(p->id()))}, + {true, "enable_stream", {}}, + }); + }); + + s1->setProperty(StreamProperty::kRequestSize, 10); + + REQUIRE( s1->begin() ); + + s1->forceSeen(FrameID(0, 0), Channel::kColour); + s1->forceSeen(FrameID(0, 1), Channel::kColour); + REQUIRE( s1->enable(FrameID(0, 0), Channel::kColour)); + REQUIRE( s1->enable(FrameID(0, 1), Channel::kColour)); + + ftl::protocol::StreamPacketMSGPACK spkt; + ftl::protocol::PacketMSGPACK pkt; + spkt.streamID = 0; + spkt.frame_number = 0; + spkt.channel = Channel::kEndFrame; + + thr.join(); + + s1->lastSpkt.channel = Channel::kNone; + + for (int i=0; i<30; ++i) { + spkt.frame_number = i & 0x1; + spkt.timestamp = i >> 1; + writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); + p->data(); + sleep_for(milliseconds(50)); + } + + REQUIRE( s1->lastSpkt.channel == Channel::kColour ); + REQUIRE( (s1->lastSpkt.flags & ftl::protocol::kFlagRequest) > 0 ); + } + SECTION("responds to requests") { auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true);