Skip to content
Snippets Groups Projects
Commit 2acc22b1 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'feature/#23' into 'main'

#23 Fix netstream tx counting

See merge request beyondaka/beyond-protocol!21
parents 81fc2e94 ac9227c9
No related branches found
No related tags found
No related merge requests found
......@@ -31,6 +31,11 @@ struct Request {
ftl::protocol::Codec codec;
};
/**
* The maximum number of frames a client can request in a single request.
*/
static const int kMaxFrames = 100;
using RequestCallback = std::function<bool(const ftl::protocol::Request&)>;
using StreamCallback = std::function<bool(const ftl::protocol::StreamPacket &, const ftl::protocol::DataPacket &)>;
......
......@@ -6,6 +6,7 @@
#include <list>
#include <string>
#include <algorithm>
#include "netstream.hpp"
#include <ftl/time.hpp>
#include "../uuidMSGPACK.hpp"
......@@ -29,8 +30,6 @@ using ftl::protocol::Channel;
using ftl::protocol::Codec;
using ftl::protocol::FrameID;
using ftl::protocol::Error;
using ftl::protocol::kAllFrames;
using ftl::protocol::kAllFramesets;
using ftl::protocol::StreamProperty;
using std::string;
using std::optional;
......@@ -133,40 +132,47 @@ bool Net::post(const StreamPacket &spkt, const DataPacket &pkt) {
if (host_) {
SHARED_LOCK(mutex_, lk);
for (auto &client : clients_) {
// Strip packet data if channel is not wanted by client
const bool strip =
static_cast<int>(spkt.channel) < 32 && pkt.data.size() > 0
&& ((1 << static_cast<int>(spkt.channel)) & client.channels) == 0;
try {
int16_t pre_transmit_latency = int16_t(ftl::time::get_time() - spkt.localTimestamp);
// TODO(Nick): msgpack only once and broadcast.
// TODO(Nick): send in parallel and then wait on all futures?
// Or send non-blocking and wait
if (!net_->send(client.peerid,
base_uri_,
pre_transmit_latency, // Time since timestamp for tx
spkt_net,
(strip) ? pkt_strip : reinterpret_cast<const PacketMSGPACK&>(pkt))) {
// Send failed so mark as client stream completed
client.txcount = client.txmax;
} else {
if (!strip && pkt.data.size() > 0) _checkTXRate(pkt.data.size(), 0, spkt.timestamp);
// Count frame as completed only if last block and channel is 0
// FIXME: This is unreliable, colour might not exist etc.
if (spkt_net.streamID == 0 && spkt.frame_number == 0 && spkt.channel == Channel::kColour) {
++client.txcount;
const FrameID frameId(spkt.streamID, spkt.frame_number);
// If this particular frame has clients then loop over them
if (clients_.count(frameId) > 0) {
auto &clients = clients_.at(frameId);
for (auto &client : clients) {
// Strip packet data if channel is not wanted by client
const bool strip =
static_cast<int>(spkt.channel) < 32 && pkt.data.size() > 0
&& ((1 << static_cast<int>(spkt.channel)) & client.channels) == 0;
try {
int16_t pre_transmit_latency = int16_t(ftl::time::get_time() - spkt.localTimestamp);
// TODO(Nick): msgpack only once and broadcast.
// TODO(Nick): send in parallel and then wait on all futures?
// Or send non-blocking and wait
if (!net_->send(client.peerid,
base_uri_,
pre_transmit_latency, // Time since timestamp for tx
spkt_net,
(strip) ? pkt_strip : reinterpret_cast<const PacketMSGPACK&>(pkt))) {
// Send failed so mark as client stream completed
client.txcount = 0;
} else {
if (!strip && pkt.data.size() > 0) _checkTXRate(pkt.data.size(), 0, spkt.timestamp);
// Count every frame sent
if (spkt.channel == Channel::kEndFrame) {
--client.txcount;
}
}
} catch(...) {
client.txcount = 0;
}
} catch(...) {
client.txcount = client.txmax;
}
if (client.txcount >= client.txmax) {
hasStale = true;
if (client.txcount <= 0) {
hasStale = true;
}
}
}
} else {
......@@ -230,7 +236,7 @@ void Net::_processPacket(ftl::net::Peer *p, int16_t ttimeoff, const StreamPacket
// for (size_t i = 0; i < size(); ++i) {
const auto &sel = enabledChannels(localFrame);
for (auto c : sel) {
_sendRequest(c, localFrame.frameset(), kAllFrames, frames_to_request_, 255);
_sendRequest(c, localFrame.frameset(), localFrame.source(), frames_to_request_, 255);
}
//}
tally_ = frames_to_request_;
......@@ -307,7 +313,7 @@ void Net::refresh() {
auto sel = enabledChannels(i);
for (auto c : sel) {
_sendRequest(c, i.frameset(), kAllFrames, frames_to_request_, 255, true);
_sendRequest(c, i.frameset(), i.source(), frames_to_request_, 255, true);
}
}
tally_ = frames_to_request_;
......@@ -347,7 +353,7 @@ bool Net::enable(FrameID id) {
if (host_) { return false; }
if (!_enable(id)) return false;
if (!Stream::enable(id)) return false;
_sendRequest(Channel::kColour, id.frameset(), kAllFrames, kFramesToRequest, 255, true);
_sendRequest(Channel::kColour, id.frameset(), id.source(), kFramesToRequest, 255, true);
return true;
}
......@@ -356,7 +362,7 @@ bool Net::enable(FrameID id, Channel c) {
if (host_) { return false; }
if (!_enable(id)) return false;
if (!Stream::enable(id, c)) return false;
_sendRequest(c, id.frameset(), kAllFrames, kFramesToRequest, 255, true);
_sendRequest(c, id.frameset(), id.source(), kFramesToRequest, 255, true);
return true;
}
......@@ -365,7 +371,7 @@ bool Net::enable(FrameID id, const ChannelSet &channels) {
if (!_enable(id)) return false;
if (!Stream::enable(id, channels)) return false;
for (auto c : channels) {
_sendRequest(c, id.frameset(), kAllFrames, kFramesToRequest, 255, true);
_sendRequest(c, id.frameset(), id.source(), kFramesToRequest, 255, true);
}
return true;
}
......@@ -403,14 +409,24 @@ bool Net::_sendRequest(Channel c, uint8_t frameset, uint8_t frames, uint8_t coun
void Net::_cleanUp() {
UNIQUE_LOCK(mutex_, lk);
for (auto i = clients_.begin(); i != clients_.end(); ++i) {
auto &client = *i;
if (client.txcount >= client.txmax) {
if (client.peerid == time_peer_) {
time_peer_ = ftl::UUID(0);
for (auto i = clients_.begin(); i != clients_.end();) {
auto &clients = i->second;
for (auto j = clients.begin(); j != clients.end();) {
auto &client = *j;
if (client.txcount <= 0) {
if (client.peerid == time_peer_) {
time_peer_ = ftl::UUID(0);
}
DLOG(INFO) << "Remove peer: " << client.peerid.to_string();
j = clients.erase(j);
} else {
++j;
}
DLOG(INFO) << "Remove peer: " << client.peerid.to_string();
}
if (clients.size() == 0) {
i = clients_.erase(i);
} else {
++i;
}
}
}
......@@ -423,19 +439,25 @@ bool Net::_processRequest(ftl::net::Peer *p, StreamPacket *spkt, const DataPacke
bool found = false;
DLOG(INFO) << "processing request: " << int(spkt->streamID) << ", " << int(spkt->channel);
const FrameID frameId(spkt->streamID, spkt->frame_number);
if (p) {
SHARED_LOCK(mutex_, lk);
// Does the client already exist
for (auto &c : clients_) {
if (c.peerid == p->id()) {
// Yes, so reset internal request counters
c.txcount = 0;
c.txmax = static_cast<int>(pkt.frame_count);
if (static_cast<int>(spkt->channel) < 32) {
c.channels |= 1 << static_cast<int>(spkt->channel);
if (clients_.count(frameId) > 0) {
auto &clients = clients_.at(frameId);
// Does the client already exist
for (auto &c : clients) {
if (c.peerid == p->id()) {
// Yes, so reset internal request counters
c.txcount = std::max(static_cast<int>(c.txcount), static_cast<int>(pkt.frame_count));
if (static_cast<int>(spkt->channel) < 32) {
c.channels |= 1 << static_cast<int>(spkt->channel);
}
found = true;
// break;
}
found = true;
// break;
}
}
}
......@@ -445,11 +467,11 @@ bool Net::_processRequest(ftl::net::Peer *p, StreamPacket *spkt, const DataPacke
{
UNIQUE_LOCK(mutex_, lk);
auto &client = clients_.emplace_back();
auto &clients = clients_[frameId];
auto &client = clients.emplace_back();
client.peerid = p->id();
client.quality = 255; // TODO(Nick): Use quality given in packet
client.txcount = 0;
client.txmax = static_cast<int>(pkt.frame_count);
client.txcount = std::max(static_cast<int>(client.txcount), static_cast<int>(pkt.frame_count));
if (static_cast<int>(spkt->channel) < 32) {
client.channels |= 1 << static_cast<int>(spkt->channel);
}
......
......@@ -8,6 +8,7 @@
#include <string>
#include <list>
#include <unordered_map>
#include "../universe.hpp"
#include <ftl/threads.hpp>
#include <ftl/protocol/packet.hpp>
......@@ -21,17 +22,11 @@ namespace detail {
struct StreamClient {
ftl::UUID peerid;
std::atomic<int> txcount; // Frames sent since last request
int txmax; // Frames to send in request
std::atomic<uint32_t> channels; // A channel mask, those that have been requested
uint8_t quality;
};
}
/**
* The maximum number of frames a client can request in a single request.
*/
static const int kMaxFrames = 100;
struct NetStats {
float rxRate;
float txRate;
......@@ -124,7 +119,7 @@ class Net : public Stream {
static int64_t last_msg__;
static MUTEX msg_mtx__;
std::list<detail::StreamClient> clients_;
std::unordered_map<ftl::protocol::FrameID, std::list<detail::StreamClient>> clients_;
bool _enable(FrameID id);
bool _processRequest(ftl::net::Peer *p, ftl::protocol::StreamPacket *spkt, const ftl::protocol::DataPacket &pkt);
......
......@@ -41,23 +41,23 @@ class MockNetStream : public ftl::protocol::Net {
// --- Tests -------------------------------------------------------------------
TEST_CASE("Net stream options") {
SECTION("can get correct URI") {
auto s1 = ftl::createStream("ftl://mystream?opt=none");
REQUIRE( s1 );
REQUIRE( s1->begin() );
SECTION("can get correct URI") {
auto s1 = ftl::createStream("ftl://mystream?opt=none");
REQUIRE( s1 );
REQUIRE( s1->begin() );
REQUIRE( std::any_cast<std::string>(s1->getProperty(StreamProperty::kURI)) == "ftl://mystream" );
}
REQUIRE( std::any_cast<std::string>(s1->getProperty(StreamProperty::kURI)) == "ftl://mystream" );
}
SECTION("can get a name") {
auto s1 = ftl::createStream("ftl://mystream?opt=none");
REQUIRE( s1 );
REQUIRE( std::any_cast<std::string>(s1->getProperty(StreamProperty::kName)).size() > 0 );
}
auto s1 = ftl::createStream("ftl://mystream?opt=none");
REQUIRE( s1 );
REQUIRE( std::any_cast<std::string>(s1->getProperty(StreamProperty::kName)).size() > 0 );
}
SECTION("can pause the stream") {
auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true);
REQUIRE( s1->begin() );
SECTION("can pause the stream") {
auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true);
REQUIRE( s1->begin() );
StreamPacket spkt;
spkt.timestamp = 100;
......@@ -77,34 +77,34 @@ TEST_CASE("Net stream options") {
spkt.timestamp = 200;
REQUIRE( s1->post(spkt, pkt) );
REQUIRE( s1->lastSpkt.timestamp == 100 );
REQUIRE( std::any_cast<bool>(s1->getProperty(StreamProperty::kPaused)) );
}
REQUIRE( std::any_cast<bool>(s1->getProperty(StreamProperty::kPaused)) );
}
}
TEST_CASE("Net stream sending requests") {
auto p = createMockPeer(0);
fakedata[0] = "";
send_handshake(*p.get());
p->data();
sleep_for(milliseconds(50));
p->data();
sleep_for(milliseconds(50));
SECTION("cannot enable if not seen") {
auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false);
REQUIRE( s1->begin() );
SECTION("cannot enable if not seen") {
auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false);
REQUIRE( s1->begin() );
REQUIRE( !s1->enable(FrameID(1, 1), Channel::kDepth));
}
}
SECTION("sends request on enable") {
auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false);
// Thread to provide response to otherwise blocking call
std::thread thr([&p]() {
auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false);
// Thread to provide response to otherwise blocking call
std::thread thr([&p]() {
auto z = std::make_unique<msgpack::zone>();
provideResponses(p, 0, {
{false, "find_stream", packResponse(*z, ftl::UUIDMSGPACK(p->id()))},
{true, "enable_stream", {}},
});
});
});
REQUIRE( s1->begin() );
......@@ -115,14 +115,14 @@ TEST_CASE("Net stream sending requests") {
thr.join();
REQUIRE( s1->lastSpkt.streamID == 1 );
REQUIRE( int(s1->lastSpkt.frame_number) == 255 ); // TODO: update when this is fixed
REQUIRE( int(s1->lastSpkt.frame_number) == 1 ); // TODO: update when this is fixed
REQUIRE( s1->lastSpkt.channel == Channel::kDepth );
REQUIRE( (s1->lastSpkt.flags & ftl::protocol::kFlagRequest) > 0 );
}
}
SECTION("responds to requests") {
auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true);
auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true);
REQUIRE( s1->begin() );
bool seenReq = false;
......@@ -143,7 +143,7 @@ TEST_CASE("Net stream sending requests") {
sleep_for(milliseconds(50));
REQUIRE( seenReq );
}
}
p.reset();
ftl::protocol::reset();
......@@ -153,12 +153,12 @@ TEST_CASE("Net stream can see received data") {
auto p = createMockPeer(0);
fakedata[0] = "";
send_handshake(*p.get());
p->data();
sleep_for(milliseconds(50));
p->data();
sleep_for(milliseconds(50));
SECTION("available if packet is seen") {
auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true);
auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true);
REQUIRE( s1->begin() );
bool seenReq = false;
......@@ -179,7 +179,7 @@ TEST_CASE("Net stream can see received data") {
sleep_for(milliseconds(50));
REQUIRE( seenReq );
REQUIRE( s1->available(FrameID(1, 1), Channel::kColour) );
}
}
p.reset();
ftl::protocol::reset();
......
......@@ -16,85 +16,128 @@ using ftl::protocol::StreamProperty;
// --- Tests -------------------------------------------------------------------
TEST_CASE("TCP Stream", "[net]") {
std::mutex mtx;
auto self = ftl::createDummySelf();
self->listen(ftl::URI("tcp://localhost:0"));
auto uri = "tcp://127.0.0.1:" + std::to_string(self->getListeningURIs().front().getPort());
LOG(INFO) << uri;
auto p = ftl::connectNode(uri);
p->waitConnection(5);
SECTION("fails if stream doesn't exist") {
auto s1 = self->getStream("ftl://mystream_bad");
REQUIRE( s1 );
auto seenError = ftl::protocol::Error::kNoError;
auto h = s1->onError([&seenError](ftl::protocol::Error err, const std::string &str) {
seenError = err;
return true;
});
REQUIRE( s1->begin() );
REQUIRE( !s1->enable(FrameID(0, 0)) );
REQUIRE( seenError == ftl::protocol::Error::kURIDoesNotExist );
}
SECTION("single enabled packet stream") {
std::condition_variable cv;
std::unique_lock<std::mutex> lk(mtx);
auto s1 = ftl::createStream("ftl://mystream");
REQUIRE( s1 );
auto s2 = self->getStream("ftl://mystream");
REQUIRE( s2 );
ftl::protocol::DataPacket rpkt;
rpkt.bitrate = 20;
auto h = s2->onPacket([&cv, &rpkt](const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) {
rpkt = pkt;
cv.notify_one();
return true;
});
bool seenReq = false;
auto h2 = s1->onRequest([&seenReq](const ftl::protocol::Request &req) {
seenReq = true;
return true;
});
s1->begin();
s2->begin();
s2->enable(FrameID(0, 0));
// TODO: Find better option
std::this_thread::sleep_for(std::chrono::milliseconds(10));
REQUIRE( seenReq );
ftl::protocol::StreamPacket spkt;
spkt.streamID = 0;
spkt.frame_number = 0;
spkt.channel = ftl::protocol::Channel::kColour;
ftl::protocol::DataPacket pkt;
pkt.bitrate = 10;
pkt.codec = ftl::protocol::Codec::kJPG;
pkt.frame_count = 1;
s1->post(spkt, pkt);
bool r = cv.wait_for(lk, std::chrono::seconds(5), [&rpkt](){ return rpkt.bitrate == 10; });
REQUIRE( r );
REQUIRE( rpkt.bitrate == 10 );
REQUIRE( rpkt.codec == ftl::protocol::Codec::kJPG );
REQUIRE( rpkt.frame_count == 1 );
REQUIRE( std::any_cast<size_t>(s1->getProperty(StreamProperty::kObservers)) == 1 );
}
p.reset();
ftl::protocol::reset();
std::mutex mtx;
auto self = ftl::createDummySelf();
self->listen(ftl::URI("tcp://localhost:0"));
auto uri = "tcp://127.0.0.1:" + std::to_string(self->getListeningURIs().front().getPort());
LOG(INFO) << uri;
auto p = ftl::connectNode(uri);
p->waitConnection(5);
SECTION("fails if stream doesn't exist") {
auto s1 = self->getStream("ftl://mystream_bad");
REQUIRE( s1 );
auto seenError = ftl::protocol::Error::kNoError;
auto h = s1->onError([&seenError](ftl::protocol::Error err, const std::string &str) {
seenError = err;
return true;
});
REQUIRE( s1->begin() );
REQUIRE( !s1->enable(FrameID(0, 0)) );
REQUIRE( seenError == ftl::protocol::Error::kURIDoesNotExist );
}
SECTION("single enabled packet stream") {
std::condition_variable cv;
std::unique_lock<std::mutex> lk(mtx);
auto s1 = ftl::createStream("ftl://mystream");
REQUIRE( s1 );
auto s2 = self->getStream("ftl://mystream");
REQUIRE( s2 );
ftl::protocol::DataPacket rpkt;
rpkt.bitrate = 20;
auto h = s2->onPacket([&cv, &rpkt](const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) {
rpkt = pkt;
cv.notify_one();
return true;
});
bool seenReq = false;
auto h2 = s1->onRequest([&seenReq](const ftl::protocol::Request &req) {
seenReq = true;
return true;
});
s1->begin();
s2->begin();
s2->enable(FrameID(0, 0));
// TODO: Find better option
std::this_thread::sleep_for(std::chrono::milliseconds(10));
REQUIRE( seenReq );
ftl::protocol::StreamPacket spkt;
spkt.streamID = 0;
spkt.frame_number = 0;
spkt.channel = ftl::protocol::Channel::kColour;
ftl::protocol::DataPacket pkt;
pkt.bitrate = 10;
pkt.codec = ftl::protocol::Codec::kJPG;
pkt.frame_count = 1;
s1->post(spkt, pkt);
bool r = cv.wait_for(lk, std::chrono::seconds(5), [&rpkt](){ return rpkt.bitrate == 10; });
REQUIRE( r );
REQUIRE( rpkt.bitrate == 10 );
REQUIRE( rpkt.codec == ftl::protocol::Codec::kJPG );
REQUIRE( rpkt.frame_count == 1 );
REQUIRE( std::any_cast<size_t>(s1->getProperty(StreamProperty::kObservers)) == 1 );
}
SECTION("stops sending when request expires") {
std::atomic_int rcount = 0;
auto s1 = ftl::createStream("ftl://mystream");
REQUIRE( s1 );
auto s2 = self->getStream("ftl://mystream");
REQUIRE( s2 );
auto h = s2->onPacket([&rcount](const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) {
++rcount;
return true;
});
s1->begin();
s2->begin();
s2->enable(FrameID(0, 0));
// TODO: Find better option
std::this_thread::sleep_for(std::chrono::milliseconds(10));
ftl::protocol::StreamPacket spkt;
spkt.streamID = 0;
spkt.frame_number = 0;
spkt.channel = ftl::protocol::Channel::kEndFrame;
ftl::protocol::DataPacket pkt;
pkt.bitrate = 10;
pkt.codec = ftl::protocol::Codec::kJPG;
pkt.frame_count = 1;
for (int i=0; i<30 + 20; ++i) {
spkt.timestamp = i;
s1->post(spkt, pkt);
}
// TODO: Find better option
int k = 10;
while (--k > 0 && rcount < 30) {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
REQUIRE( rcount == 30 );
}
p.reset();
ftl::protocol::reset();
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment