Skip to content
Snippets Groups Projects
Commit 9d10398c authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Resolves #284 instant request send

parent e96acacb
No related branches found
No related tags found
No related merge requests found
...@@ -87,6 +87,7 @@ class Channels { ...@@ -87,6 +87,7 @@ class Channels {
inline Channels &operator-=(unsigned int c) { mask &= ~(0x1 << (c - BASE)); return *this; } inline Channels &operator-=(unsigned int c) { mask &= ~(0x1 << (c - BASE)); return *this; }
inline Channels &operator&=(const Channels<BASE> &c) { mask &= c.mask; return *this; } inline Channels &operator&=(const Channels<BASE> &c) { mask &= c.mask; return *this; }
inline Channels operator&(const Channels<BASE> &c) const { return Channels<BASE>(mask & c.mask); } inline Channels operator&(const Channels<BASE> &c) const { return Channels<BASE>(mask & c.mask); }
inline Channels operator-(const Channels<BASE> &c) const { return Channels<BASE>(mask & ~c.mask);}
inline bool has(Channel c) const { inline bool has(Channel c) const {
return (c == Channel::None || static_cast<unsigned int>(c) - BASE >= 32) ? true : mask & (0x1 << (static_cast<unsigned int>(c) - BASE)); return (c == Channel::None || static_cast<unsigned int>(c) - BASE >= 32) ? true : mask & (0x1 << (static_cast<unsigned int>(c) - BASE));
...@@ -100,6 +101,7 @@ class Channels { ...@@ -100,6 +101,7 @@ class Channels {
inline iterator end() { return iterator(*this, 32+BASE); } inline iterator end() { return iterator(*this, 32+BASE); }
inline bool operator==(const Channels<BASE> &c) const { return mask == c.mask; } inline bool operator==(const Channels<BASE> &c) const { return mask == c.mask; }
inline bool operator!=(const Channels<BASE> &c) const { return mask != c.mask; }
inline operator unsigned int() const { return mask; } inline operator unsigned int() const { return mask; }
inline operator bool() const { return mask > 0; } inline operator bool() const { return mask > 0; }
inline operator Channel() const { inline operator Channel() const {
......
...@@ -71,6 +71,8 @@ class Net : public Stream { ...@@ -71,6 +71,8 @@ class Net : public Stream {
std::string uri_; std::string uri_;
bool host_; bool host_;
int tally_; int tally_;
std::array<std::atomic<int>,32> reqtally_;
ftl::codecs::Channels<0> last_selected_;
float req_bitrate_; float req_bitrate_;
float sample_count_; float sample_count_;
......
...@@ -12,6 +12,8 @@ using std::string; ...@@ -12,6 +12,8 @@ using std::string;
using std::vector; using std::vector;
using std::optional; using std::optional;
static constexpr int kTallyScale = 10;
Net::Net(nlohmann::json &config, ftl::net::Universe *net) : Stream(config), net_(net), active_(false) { Net::Net(nlohmann::json &config, ftl::net::Universe *net) : Stream(config), net_(net), active_(false) {
// TODO: Install "find_stream" binding if not installed... // TODO: Install "find_stream" binding if not installed...
if (!net_->isBound("find_stream")) { if (!net_->isBound("find_stream")) {
...@@ -50,6 +52,18 @@ bool Net::onPacket(const std::function<void(const ftl::codecs::StreamPacket &, c ...@@ -50,6 +52,18 @@ bool Net::onPacket(const std::function<void(const ftl::codecs::StreamPacket &, c
bool Net::post(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) { bool Net::post(const ftl::codecs::StreamPacket &spkt, const ftl::codecs::Packet &pkt) {
if (!active_) return false; if (!active_) return false;
// Check if the channel has been requested recently enough. If not then disable it.
if (host_ && pkt.data.size() > 0 && static_cast<int>(spkt.channel) >= 0 && static_cast<int>(spkt.channel) < 32) {
if (reqtally_[static_cast<int>(spkt.channel)] == 0) {
auto sel = selected(0);
sel -= spkt.channel;
select(0, sel);
LOG(INFO) << "Unselect Channel: " << (int)spkt.channel;
} else {
--reqtally_[static_cast<int>(spkt.channel)];
}
}
// Lock to prevent clients being added / removed // Lock to prevent clients being added / removed
{ {
SHARED_LOCK(mutex_,lk); SHARED_LOCK(mutex_,lk);
...@@ -131,19 +145,34 @@ bool Net::begin() { ...@@ -131,19 +145,34 @@ bool Net::begin() {
spkt.version = 4; spkt.version = 4;
// Manage recuring requests // Manage recuring requests
if (last_frame_ != spkt.timestamp) { if (!host_ && last_frame_ != spkt.timestamp) {
UNIQUE_LOCK(mutex_, lk); UNIQUE_LOCK(mutex_, lk);
if (last_frame_ != spkt.timestamp) { if (last_frame_ != spkt.timestamp) {
last_frame_ = spkt.timestamp; last_frame_ = spkt.timestamp;
auto sel = selected(0);
// A change in channel selections, so send those requests now
if (sel != last_selected_) {
auto changed = sel - last_selected_;
last_selected_ = sel;
if (size() > 0) {
for (auto c : changed) {
_sendRequest(c, kAllFramesets, kAllFrames, 30, 0);
}
}
}
// Are we close to reaching the end of our frames request?
if (tally_ <= 5) { if (tally_ <= 5) {
// Yes, so send new requests
if (size() > 0) { if (size() > 0) {
auto sel = selected(0);
// FIXME: Send selection changes immediately.
for (auto c : sel) { for (auto c : sel) {
_sendRequest(c, kAllFramesets, kAllFrames, 30, 0); _sendRequest(c, kAllFramesets, kAllFrames, 30, 0);
} }
} }
tally_ = 30; tally_ = 30*kTallyScale;
} else { } else {
--tally_; --tally_;
} }
...@@ -159,6 +188,7 @@ bool Net::begin() { ...@@ -159,6 +188,7 @@ bool Net::begin() {
for (int i=0; i<size(); ++i) { for (int i=0; i<size(); ++i) {
select(i, selected(i) + spkt.channel); select(i, selected(i) + spkt.channel);
} }
reqtally_[static_cast<int>(spkt.channel)] = static_cast<int>(pkt.frame_count)*size()*kTallyScale;
} else { } else {
select(spkt.frameSetID(), selected(spkt.frameSetID()) + spkt.channel); select(spkt.frameSetID(), selected(spkt.frameSetID()) + spkt.channel);
} }
...@@ -185,7 +215,8 @@ bool Net::begin() { ...@@ -185,7 +215,8 @@ bool Net::begin() {
host_ = false; host_ = false;
peer_ = *p; peer_ = *p;
tally_ = 30; tally_ = 30*kTallyScale;
for (size_t i=0; i<reqtally_.size(); ++i) reqtally_[i] = 0;
// Initially send a colour request just to create the connection // Initially send a colour request just to create the connection
_sendRequest(Channel::Colour, kAllFramesets, kAllFrames, 30, 0); _sendRequest(Channel::Colour, kAllFramesets, kAllFrames, 30, 0);
...@@ -246,7 +277,7 @@ bool Net::_processRequest(ftl::net::Peer &p, const ftl::codecs::Packet &pkt) { ...@@ -246,7 +277,7 @@ bool Net::_processRequest(ftl::net::Peer &p, const ftl::codecs::Packet &pkt) {
if (c.peerid == p.id()) { if (c.peerid == p.id()) {
// Yes, so reset internal request counters // Yes, so reset internal request counters
c.txcount = 0; c.txcount = 0;
c.txmax = pkt.frame_count; c.txmax = static_cast<int>(pkt.frame_count)*kTallyScale;
found = true; found = true;
} }
} }
...@@ -257,7 +288,7 @@ bool Net::_processRequest(ftl::net::Peer &p, const ftl::codecs::Packet &pkt) { ...@@ -257,7 +288,7 @@ bool Net::_processRequest(ftl::net::Peer &p, const ftl::codecs::Packet &pkt) {
client.peerid = p.id(); client.peerid = p.id();
client.quality = 0; // TODO: Use quality given in packet client.quality = 0; // TODO: Use quality given in packet
client.txcount = 0; client.txcount = 0;
client.txmax = pkt.frame_count; client.txmax = static_cast<int>(pkt.frame_count)*kTallyScale;
} }
// First connected peer (or reconnecting peer) becomes a time server // First connected peer (or reconnecting peer) becomes a time server
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment