diff --git a/components/codecs/include/ftl/codecs/channels.hpp b/components/codecs/include/ftl/codecs/channels.hpp index 5eacf6a2c1a2ad2540517fb28f9497adf8323796..ac5b19a12d735a8e68fe27f5c08940996b52bfcd 100644 --- a/components/codecs/include/ftl/codecs/channels.hpp +++ b/components/codecs/include/ftl/codecs/channels.hpp @@ -87,6 +87,7 @@ class Channels { inline Channels &operator-=(unsigned int c) { mask &= ~(0x1 << (c - BASE)); return *this; } inline Channels &operator&=(const Channels<BASE> &c) { mask &= c.mask; return *this; } inline Channels operator&(const Channels<BASE> &c) const { return Channels<BASE>(mask & c.mask); } + inline Channels operator-(const Channels<BASE> &c) const { return Channels<BASE>(mask & ~c.mask);} inline bool has(Channel c) const { return (c == Channel::None || static_cast<unsigned int>(c) - BASE >= 32) ? true : mask & (0x1 << (static_cast<unsigned int>(c) - BASE)); @@ -100,6 +101,7 @@ class Channels { inline iterator end() { return iterator(*this, 32+BASE); } inline bool operator==(const Channels<BASE> &c) const { return mask == c.mask; } + inline bool operator!=(const Channels<BASE> &c) const { return mask != c.mask; } inline operator unsigned int() const { return mask; } inline operator bool() const { return mask > 0; } inline operator Channel() const { diff --git a/components/streams/include/ftl/streams/netstream.hpp b/components/streams/include/ftl/streams/netstream.hpp index 85edde6cfa7530bf93e70e3c15a0583e8c8da590..04cf48afede0172d7ec12feb24d640dd30b605cd 100644 --- a/components/streams/include/ftl/streams/netstream.hpp +++ b/components/streams/include/ftl/streams/netstream.hpp @@ -71,6 +71,8 @@ class Net : public Stream { std::string uri_; bool host_; int tally_; + std::array<std::atomic<int>,32> reqtally_; + ftl::codecs::Channels<0> last_selected_; float req_bitrate_; float sample_count_; diff --git a/components/streams/src/netstream.cpp b/components/streams/src/netstream.cpp index 964965edf6cabbcae39b5d999292b0a0a2ea726d..b5039d71aeb34bfb1f31d1dc72ba21f8e2593e39 100644 --- a/components/streams/src/netstream.cpp +++ b/components/streams/src/netstream.cpp @@ -12,6 +12,8 @@ using std::string; using std::vector; using std::optional; +static constexpr int kTallyScale = 10; + Net::Net(nlohmann::json &config, ftl::net::Universe *net) : Stream(config), net_(net), active_(false) { // TODO: Install "find_stream" binding if not installed... if (!net_->isBound("find_stream")) { @@ -50,6 +52,18 @@ bool Net::onPacket(const std::function<void(const ftl::codecs::StreamPacket &, c bool Net::post(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { if (!active_) return false; + // Check if the channel has been requested recently enough. If not then disable it. + if (host_ && pkt.data.size() > 0 && static_cast<int>(spkt.channel) >= 0 && static_cast<int>(spkt.channel) < 32) { + if (reqtally_[static_cast<int>(spkt.channel)] == 0) { + auto sel = selected(0); + sel -= spkt.channel; + select(0, sel); + LOG(INFO) << "Unselect Channel: " << (int)spkt.channel; + } else { + --reqtally_[static_cast<int>(spkt.channel)]; + } + } + // Lock to prevent clients being added / removed { SHARED_LOCK(mutex_,lk); @@ -131,19 +145,34 @@ bool Net::begin() { spkt.version = 4; // Manage recuring requests - if (last_frame_ != spkt.timestamp) { + if (!host_ && last_frame_ != spkt.timestamp) { UNIQUE_LOCK(mutex_, lk); if (last_frame_ != spkt.timestamp) { last_frame_ = spkt.timestamp; + + auto sel = selected(0); + + // A change in channel selections, so send those requests now + if (sel != last_selected_) { + auto changed = sel - last_selected_; + last_selected_ = sel; + + if (size() > 0) { + for (auto c : changed) { + _sendRequest(c, kAllFramesets, kAllFrames, 30, 0); + } + } + } + + // Are we close to reaching the end of our frames request? if (tally_ <= 5) { + // Yes, so send new requests if (size() > 0) { - auto sel = selected(0); - // FIXME: Send selection changes immediately. for (auto c : sel) { _sendRequest(c, kAllFramesets, kAllFrames, 30, 0); } } - tally_ = 30; + tally_ = 30*kTallyScale; } else { --tally_; } @@ -159,6 +188,7 @@ bool Net::begin() { for (int i=0; i<size(); ++i) { select(i, selected(i) + spkt.channel); } + reqtally_[static_cast<int>(spkt.channel)] = static_cast<int>(pkt.frame_count)*size()*kTallyScale; } else { select(spkt.frameSetID(), selected(spkt.frameSetID()) + spkt.channel); } @@ -185,7 +215,8 @@ bool Net::begin() { host_ = false; peer_ = *p; - tally_ = 30; + tally_ = 30*kTallyScale; + for (size_t i=0; i<reqtally_.size(); ++i) reqtally_[i] = 0; // Initially send a colour request just to create the connection _sendRequest(Channel::Colour, kAllFramesets, kAllFrames, 30, 0); @@ -246,7 +277,7 @@ bool Net::_processRequest(ftl::net::Peer &p, const ftl::codecs::Packet &pkt) { if (c.peerid == p.id()) { // Yes, so reset internal request counters c.txcount = 0; - c.txmax = pkt.frame_count; + c.txmax = static_cast<int>(pkt.frame_count)*kTallyScale; found = true; } } @@ -257,7 +288,7 @@ bool Net::_processRequest(ftl::net::Peer &p, const ftl::codecs::Packet &pkt) { client.peerid = p.id(); client.quality = 0; // TODO: Use quality given in packet client.txcount = 0; - client.txmax = pkt.frame_count; + client.txmax = static_cast<int>(pkt.frame_count)*kTallyScale; } // First connected peer (or reconnecting peer) becomes a time server