Something went wrong on our end
-
Nicolas Pope authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
netstream_unit.cpp 5.42 KiB
#include "catch.hpp"
#include <ftl/protocol.hpp>
#include <ftl/protocol/self.hpp>
#include <ftl/protocol/streams.hpp>
#include <ftl/uri.hpp>
#include <ftl/exception.hpp>
#include <ftl/protocol/node.hpp>
#include <atomic>
#include <thread>
#include <chrono>
#include "../src/streams/netstream.hpp"
#include "../src/streams/packetMsgpack.hpp"
#include "mocks/connection.hpp"
using ftl::protocol::FrameID;
using ftl::protocol::StreamProperty;
using ftl::protocol::StreamPacket;
using ftl::protocol::DataPacket;
using ftl::protocol::Channel;
using std::this_thread::sleep_for;
using std::chrono::milliseconds;
// --- Mock --------------------------------------------------------------------
/**
 * Test double derived from ftl::protocol::Net.
 *
 * It remembers the most recent stream packet passed to hasPosted() and
 * exposes the base class' seen() call through forceSeen(), so the tests can
 * inspect what was posted and mark a frame/channel without real traffic.
 */
class MockNetStream : public ftl::protocol::Net {
 public:
    MockNetStream(const std::string &uri, ftl::net::Universe *net, bool host = false)
        : ftl::protocol::Net(uri, net, host) {}

    /** Keeps a copy of the posted stream packet; the data packet is ignored. */
    void hasPosted(const StreamPacket &posted, const DataPacket & /*data*/) override {
        lastSpkt = posted;
    }

    /** Forwards to seen(id, channel); presumably marks the channel as seen for that frame. */
    void forceSeen(FrameID id, Channel channel) {
        seen(id, channel);
    }

    /** Last stream packet received by hasPosted(); tests read and pre-set this directly. */
    StreamPacket lastSpkt;
};
// --- Tests -------------------------------------------------------------------
// Checks stream properties (URI, name, paused) on streams used without any
// mock peer being set up in this test case.
TEST_CASE("Net stream options") {
    SECTION("can get correct URI") {
        auto stream = ftl::createStream("ftl://mystream?opt=none");
        REQUIRE( stream );
        REQUIRE( stream->begin() );

        // The URI property is expected without the "?opt=none" query part.
        REQUIRE( std::any_cast<std::string>(stream->getProperty(StreamProperty::kURI)) == "ftl://mystream" );
    }

    SECTION("can get a name") {
        auto stream = ftl::createStream("ftl://mystream?opt=none");
        REQUIRE( stream );

        // Any non-empty name is accepted; the exact value is not checked.
        REQUIRE( std::any_cast<std::string>(stream->getProperty(StreamProperty::kName)).size() > 0 );
    }

    SECTION("can pause the stream") {
        auto stream = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true);
        REQUIRE( stream->begin() );

        StreamPacket header;
        header.timestamp = 100;
        header.streamID = 1;
        header.frame_number = 2;
        header.channel = Channel::kColour;

        DataPacket payload;
        payload.frame_count = 1;

        // Before pausing: the mock must record the posted packet (timestamp 100).
        stream->lastSpkt.timestamp = 0;
        REQUIRE( stream->post(header, payload) );
        REQUIRE( stream->lastSpkt.timestamp == 100 );

        // After pausing: post() still returns true, but the recorded packet
        // must remain the earlier one (timestamp 100, not 200).
        stream->setProperty(StreamProperty::kPaused, true);
        header.timestamp = 200;
        REQUIRE( stream->post(header, payload) );
        REQUIRE( stream->lastSpkt.timestamp == 100 );
        REQUIRE( std::any_cast<bool>(stream->getProperty(StreamProperty::kPaused)) );
    }
}
// Exercises request traffic between a net stream and a mock peer.
// The statements outside the SECTIONs run for each section: a mock peer is
// created and handshaken first, and released (followed by
// ftl::protocol::reset()) once the section has finished.
TEST_CASE("Net stream sending requests") {
    auto p = createMockPeer(0);
    fakedata[0] = "";
    send_handshake(*p.get());
    p->data();
    // Presumably lets the handshake be processed asynchronously before the
    // section body starts - TODO confirm against the mock implementation.
    sleep_for(milliseconds(50));

    SECTION("cannot enable if not seen") {
        auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false);
        REQUIRE( s1->begin() );
        // Nothing was marked as seen, so enabling the channel must be refused.
        REQUIRE( !s1->enable(FrameID(1, 1), Channel::kDepth));
    }

    SECTION("sends request on enable") {
        auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false);

        // Thread to provide response to otherwise blocking call
        // NOTE(review): the REQUIREs between here and thr.join() abort the
        // test case on failure; a still-joinable std::thread would then be
        // destroyed, which calls std::terminate. Whether joining from a guard
        // is safe depends on provideResponses() not waiting forever - confirm
        // before restructuring.
        std::thread thr([&p]() {
            auto z = std::make_unique<msgpack::zone>();
            provideResponses(p, 0, {
                {false, "find_stream", packResponse(*z, p->id())},
                {true, "enable_stream", {}},
            });
        });

        REQUIRE( s1->begin() );

        s1->forceSeen(FrameID(1, 1), Channel::kDepth);
        // Sentinel value, so the channel check below proves a new packet arrived.
        s1->lastSpkt.channel = Channel::kNone;
        REQUIRE( s1->enable(FrameID(1, 1), Channel::kDepth));

        thr.join();

        // The packet recorded by the mock must be a request for the depth channel.
        REQUIRE( s1->lastSpkt.streamID == 1 );
        REQUIRE( int(s1->lastSpkt.frame_number) == 255 );  // TODO: update when this is fixed
        REQUIRE( s1->lastSpkt.channel == Channel::kDepth );
        REQUIRE( (s1->lastSpkt.flags & ftl::protocol::kFlagRequest) > 0 );
    }

    SECTION("responds to requests") {
        auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true);
        REQUIRE( s1->begin() );

        // Atomic because the callback is expected to fire on a different
        // thread than the one reading the flag (hence the sleep below).
        std::atomic<bool> seenReq{false};
        // The returned handle is held in `h` for the rest of the section.
        auto h = s1->onRequest([&seenReq](const ftl::protocol::Request &) {
            seenReq = true;
            return true;
        });

        ftl::protocol::StreamPacketMSGPACK spkt;
        ftl::protocol::PacketMSGPACK pkt;
        spkt.streamID = 1;
        spkt.frame_number = 1;
        spkt.channel = Channel::kColour;
        spkt.flags = ftl::protocol::kFlagRequest;

        // Feed a request-flagged packet addressed to the stream URI through
        // the mock connection and let the peer process it.
        writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt));
        p->data();
        sleep_for(milliseconds(50));

        REQUIRE( seenReq.load() );
    }

    p.reset();
    ftl::protocol::reset();
}
// Verifies that a packet arriving from a mock peer makes its channel
// available on the receiving stream. As in the other peer-based test case,
// the code outside the SECTION sets up and releases the mock peer.
TEST_CASE("Net stream can see received data") {
    auto p = createMockPeer(0);
    fakedata[0] = "";
    send_handshake(*p.get());
    p->data();
    // Presumably lets the handshake be processed asynchronously before the
    // section body starts - TODO confirm against the mock implementation.
    sleep_for(milliseconds(50));

    SECTION("available if packet is seen") {
        auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true);
        REQUIRE( s1->begin() );

        // Atomic because the callback is expected to fire on a different
        // thread than the one reading the flag (hence the sleep below).
        std::atomic<bool> availableFired{false};
        // The returned handle is held in `h` for the rest of the section.
        auto h = s1->onAvailable([&availableFired](FrameID, Channel) {
            availableFired = true;
            return true;
        });

        ftl::protocol::StreamPacketMSGPACK spkt;
        ftl::protocol::PacketMSGPACK pkt;
        spkt.streamID = 1;
        spkt.frame_number = 1;
        spkt.channel = Channel::kColour;

        // Feed a colour packet addressed to the stream URI through the mock
        // connection and let the peer process it.
        writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt));
        p->data();
        sleep_for(milliseconds(50));

        REQUIRE( availableFired.load() );
        REQUIRE( s1->available(FrameID(1, 1), Channel::kColour) );
    }

    p.reset();
    ftl::protocol::reset();
}