Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
netstream_unit.cpp 5.42 KiB
#include "catch.hpp"
#include <ftl/protocol.hpp>
#include <ftl/protocol/self.hpp>
#include <ftl/protocol/streams.hpp>
#include <ftl/uri.hpp>
#include <ftl/exception.hpp>
#include <ftl/protocol/node.hpp>
#include <thread>
#include <chrono>

#include "../src/streams/netstream.hpp"
#include "../src/streams/packetMsgpack.hpp"
#include "mocks/connection.hpp"

using ftl::protocol::FrameID;
using ftl::protocol::StreamProperty;
using ftl::protocol::StreamPacket;
using ftl::protocol::DataPacket;
using ftl::protocol::Channel;
using std::this_thread::sleep_for;
using std::chrono::milliseconds;

// --- Mock --------------------------------------------------------------------

class MockNetStream : public ftl::protocol::Net {
    public:
    MockNetStream(const std::string &uri, ftl::net::Universe *net, bool host=false): Net(uri, net, host) {};

    void hasPosted(const StreamPacket &spkt, const DataPacket &pkt) override {
        lastSpkt = spkt;
    }

    void forceSeen(FrameID id, Channel channel) {
        seen(id, channel);
    }

    StreamPacket lastSpkt;
};

// --- Tests -------------------------------------------------------------------

TEST_CASE("Net stream options") {
	SECTION("can get correct URI") {
		auto s1 = ftl::createStream("ftl://mystream?opt=none");
		REQUIRE( s1 );
		REQUIRE( s1->begin() );

		REQUIRE( std::any_cast<std::string>(s1->getProperty(StreamProperty::kURI)) == "ftl://mystream" );
	}

    SECTION("can get a name") {
		auto s1 = ftl::createStream("ftl://mystream?opt=none");
		REQUIRE( s1 );
		REQUIRE( std::any_cast<std::string>(s1->getProperty(StreamProperty::kName)).size() > 0 );
	}

	SECTION("can pause the stream") {
		auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true);
		REQUIRE( s1->begin() );

        StreamPacket spkt;
        spkt.timestamp = 100;
        spkt.streamID = 1;
        spkt.frame_number = 2;
        spkt.channel = Channel::kColour;

        DataPacket pkt;
        pkt.frame_count = 1;

        s1->lastSpkt.timestamp = 0;
        REQUIRE( s1->post(spkt, pkt) );
        REQUIRE( s1->lastSpkt.timestamp == 100 );

        s1->setProperty(StreamProperty::kPaused, true);

        spkt.timestamp = 200;
        REQUIRE( s1->post(spkt, pkt) );
        REQUIRE( s1->lastSpkt.timestamp == 100 );
		REQUIRE( std::any_cast<bool>(s1->getProperty(StreamProperty::kPaused)) );
	}
}

TEST_CASE("Net stream sending requests") {
    auto p = createMockPeer(0);
    fakedata[0] = "";
    send_handshake(*p.get());
	p->data();
	sleep_for(milliseconds(50));

	SECTION("cannot enable if not seen") {
		auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false);
		REQUIRE( s1->begin() );
        REQUIRE( !s1->enable(FrameID(1, 1), Channel::kDepth));
	}

    SECTION("sends request on enable") {
		auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), false);
		
		// Thread to provide response to otherwise blocking call
		std::thread thr([&p]() {
            auto z = std::make_unique<msgpack::zone>();
            provideResponses(p, 0, {
                {false, "find_stream", packResponse(*z, p->id())},
                {true, "enable_stream", {}},
            });
		});

        REQUIRE( s1->begin() );

        s1->forceSeen(FrameID(1, 1), Channel::kDepth);
        s1->lastSpkt.channel = Channel::kNone;
        REQUIRE( s1->enable(FrameID(1, 1), Channel::kDepth));

        thr.join();

        REQUIRE( s1->lastSpkt.streamID == 1 );
        REQUIRE( int(s1->lastSpkt.frame_number) == 255 );  // TODO: update when this is fixed
        REQUIRE( s1->lastSpkt.channel == Channel::kDepth );
        REQUIRE( (s1->lastSpkt.flags & ftl::protocol::kFlagRequest) > 0 );
	}

    SECTION("responds to requests") {
		auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true);
		
        REQUIRE( s1->begin() );

        bool seenReq = false;

        auto h = s1->onRequest([&seenReq](const ftl::protocol::Request &req) {
            seenReq = true;
            return true;
        });

        ftl::protocol::StreamPacketMSGPACK spkt;
        ftl::protocol::PacketMSGPACK pkt;
        spkt.streamID = 1;
        spkt.frame_number = 1;
        spkt.channel = Channel::kColour;
        spkt.flags = ftl::protocol::kFlagRequest;
        writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt));
        p->data();

        sleep_for(milliseconds(50));
        REQUIRE( seenReq );
	}

    p.reset();
    ftl::protocol::reset();
}

TEST_CASE("Net stream can see received data") {
    auto p = createMockPeer(0);
    fakedata[0] = "";
    send_handshake(*p.get());
	p->data();
	sleep_for(milliseconds(50));

    SECTION("available if packet is seen") {
		auto s1 = std::make_shared<MockNetStream>("ftl://mystream", ftl::getSelf()->getUniverse(), true);
		
        REQUIRE( s1->begin() );

        bool seenReq = false;

        auto h = s1->onAvailable([&seenReq](FrameID id, Channel channel) {
            seenReq = true;
            return true;
        });

        ftl::protocol::StreamPacketMSGPACK spkt;
        ftl::protocol::PacketMSGPACK pkt;
        spkt.streamID = 1;
        spkt.frame_number = 1;
        spkt.channel = Channel::kColour;
        writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt));
        p->data();

        sleep_for(milliseconds(50));
        REQUIRE( seenReq );
        REQUIRE( s1->available(FrameID(1, 1), Channel::kColour) );
	}

    p.reset();
    ftl::protocol::reset();
}