diff --git a/include/ftl/protocol/broadcaster.hpp b/include/ftl/protocol/broadcaster.hpp index 7a45622f3864ef897ebb234ce760041c9cf95339..7afd3f4a30879367e2307accf514000e26f8ab79 100644 --- a/include/ftl/protocol/broadcaster.hpp +++ b/include/ftl/protocol/broadcaster.hpp @@ -28,6 +28,8 @@ class Broadcast : public Stream { void reset() override; + void refresh() override; + std::list<std::shared_ptr<Stream>> streams() const; void setProperty(ftl::protocol::StreamProperty opt, int value) override; @@ -36,6 +38,12 @@ class Broadcast : public Stream { bool supportsProperty(ftl::protocol::StreamProperty opt) override; + bool enable(FrameID id) override; + + bool enable(FrameID id, ftl::protocol::Channel channel) override; + + bool enable(FrameID id, const ftl::protocol::ChannelSet &channels) override; + StreamType type() const override; private: diff --git a/src/streams/broadcaster.cpp b/src/streams/broadcaster.cpp index 6315314619eb561ab8f2717c57fbea664b1cd779..bc33e1c8f295212f73c3dc3c7ceffc6338ce61ec 100644 --- a/src/streams/broadcaster.cpp +++ b/src/streams/broadcaster.cpp @@ -96,6 +96,46 @@ void Broadcast::reset() { } } +void Broadcast::refresh() { + +} + +bool Broadcast::enable(FrameID id) { + bool r = false; + { + SHARED_LOCK(mtx_, lk); + for (auto &s : streams_) { + r = s.stream->enable(id) || r; + }; + } + if (r) Stream::enable(id); + return r; +} + +bool Broadcast::enable(FrameID id, ftl::protocol::Channel channel) { + bool r = false; + { + SHARED_LOCK(mtx_, lk); + for (auto &s : streams_) { + r = s.stream->enable(id, channel) || r; + }; + } + if (r) Stream::enable(id, channel); + return r; +} + +bool Broadcast::enable(FrameID id, const ftl::protocol::ChannelSet &channels) { + bool r = false; + { + SHARED_LOCK(mtx_, lk); + for (auto &s : streams_) { + r = s.stream->enable(id, channels) || r; + }; + } + if (r) Stream::enable(id, channels); + return r; +} + void Broadcast::setProperty(ftl::protocol::StreamProperty opt, int value) { } diff --git a/src/streams/muxer.cpp b/src/streams/muxer.cpp index edde7d0fa8527b07ee47367b27c49f7eab6b4be1..6a84a9902da9087e153ff7865811adc32cf83080 100644 --- a/src/streams/muxer.cpp +++ b/src/streams/muxer.cpp @@ -172,19 +172,25 @@ void Muxer::reset() { bool Muxer::enable(FrameID id) { auto p = _mapToOutput(id); if (!p.second) return false; - return p.second->stream->enable(p.first); + bool r = p.second->stream->enable(p.first); + if (r) Stream::enable(id); + return r; } bool Muxer::enable(FrameID id, ftl::protocol::Channel channel) { auto p = _mapToOutput(id); if (!p.second) return false; - return p.second->stream->enable(p.first, channel); + bool r = p.second->stream->enable(p.first, channel); + if (r) Stream::enable(id, channel); + return r; } bool Muxer::enable(FrameID id, const ftl::protocol::ChannelSet &channels) { auto p = _mapToOutput(id); if (!p.second) return false; - return p.second->stream->enable(p.first, channels); + bool r = p.second->stream->enable(p.first, channels); + if (r) Stream::enable(id, channels); + return r; } void Muxer::setProperty(ftl::protocol::StreamProperty opt, int value) { diff --git a/src/streams/netstream.cpp b/src/streams/netstream.cpp index f90478aa2aef1d7c7f17efbfd0bcdd59272330ff..7cf263c7f11adea62d062665b7d4d00764ba2e8f 100644 --- a/src/streams/netstream.cpp +++ b/src/streams/netstream.cpp @@ -45,7 +45,7 @@ Net::Net(const std::string &uri, ftl::net::Universe *net, bool host) : //if (!has_bindings.test_and_set()) { if (net_->isBound("find_stream")) net_->unbind("find_stream"); net_->bind("find_stream", [net = net_](const std::string &uri) -> optional<ftl::UUID> { - LOG(INFO) << "Request for stream: " << uri; + DLOG(INFO) << "Request for stream: " << uri; ftl::URI u1(uri); std::string base = u1.getBaseURI(); @@ -127,7 +127,6 @@ bool Net::post(const StreamPacket &spkt, const Packet &pkt) { pkt_strip.flags = pkt.flags; if (host_) { - LOG(INFO) << "Send to " << clients_.size() << " clients"; auto c = clients_.begin(); while (c != clients_.end()) { auto &client = *c; @@ -300,7 +299,7 @@ bool Net::begin() { }); if (host_) { - LOG(INFO) << "Hosting stream: " << uri_; + DLOG(INFO) << "Hosting stream: " << uri_; // Alias the URI to the configurable if not already // Allows the URI to be used to get config data. @@ -401,6 +400,16 @@ bool Net::enable(FrameID id, Channel c) { return true; } +bool Net::enable(FrameID id, const ChannelSet &channels) { + if (host_) { return false; } + if (!_enable(id)) return false; + if (!Stream::enable(id, channels)) return false; + for (auto c : channels) { + _sendRequest(c, id.frameset(), kAllFrames, kFramesToRequest, 255, true); + } + return true; +} + bool Net::_sendRequest(Channel c, uint8_t frameset, uint8_t frames, uint8_t count, uint8_t bitrate, bool doreset) { if (!active_ || host_) return false; @@ -451,7 +460,7 @@ void Net::_cleanUp() { */ bool Net::_processRequest(ftl::net::Peer &p, StreamPacket &spkt, const Packet &pkt) { bool found = false; - LOG(INFO) << "processing request"; + DLOG(INFO) << "processing request: " << int(spkt.streamID) << ", " << int(spkt.channel); { SHARED_LOCK(mutex_,lk); @@ -490,7 +499,13 @@ bool Net::_processRequest(ftl::net::Peer &p, StreamPacket &spkt, const Packet &p } } - LOG(INFO) << "Request processed"; + ftl::protocol::Request req; + req.bitrate = pkt.bitrate; + req.channel = spkt.channel; + req.id = FrameID(spkt.streamID, spkt.frame_number); + req.count = pkt.frame_count; + req.codec = pkt.codec; + request(req); return false; } diff --git a/src/streams/netstream.hpp b/src/streams/netstream.hpp index b41bf3f252a881402b9c070dfbaf7b142d765bb8..13430ff028832174943085b7ac67c39e0a275b39 100644 --- a/src/streams/netstream.hpp +++ b/src/streams/netstream.hpp @@ -50,6 +50,7 @@ class Net : public Stream { bool enable(FrameID id) override; bool enable(FrameID id, ftl::protocol::Channel c) override; + bool enable(FrameID id, const ftl::protocol::ChannelSet &channels) override; void reset() override; void refresh() override; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d140b4071fca12c18a3d571d2860ca3c3035acca..578543c6c56ccc5c881bf205c274d6cec5d8a143 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -50,16 +50,27 @@ target_link_libraries(net_performance beyond-protocol add_test(NetPerformance net_performance) -### Stream Unit ################################################################ -add_executable(stream_unit +### Muxer Unit ################################################################# +add_executable(muxer_unit $<TARGET_OBJECTS:CatchTestFTL> - ./stream_unit.cpp + ./muxer_unit.cpp ) -target_include_directories(stream_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include") -target_link_libraries(stream_unit +target_include_directories(muxer_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include") +target_link_libraries(muxer_unit beyond-protocol ${URIPARSER_LIBRARIES} ${OS_LIBS}) -add_test(StreamUnitTest stream_unit) +add_test(MuxerUnitTest muxer_unit) + +### Broadcast Unit ############################################################# +add_executable(broadcast_unit + $<TARGET_OBJECTS:CatchTestFTL> + ./broadcast_unit.cpp +) +target_include_directories(broadcast_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include") +target_link_libraries(broadcast_unit + beyond-protocol ${URIPARSER_LIBRARIES} ${OS_LIBS}) + +add_test(BroadcastUnitTest broadcast_unit) ### Stream Integration ######################################################### add_executable(stream_integration diff --git a/test/broadcast_unit.cpp b/test/broadcast_unit.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b35ac6da8ad88ba20ba6b1e7e49c227447e086a6 --- /dev/null +++ b/test/broadcast_unit.cpp @@ -0,0 +1,154 @@ +#include "catch.hpp" + +#include <ftl/protocol/streams.hpp> +#include <ftl/protocol/muxer.hpp> +#include <ftl/protocol/broadcaster.hpp> +#include <nlohmann/json.hpp> + +using ftl::protocol::Muxer; +using ftl::protocol::Broadcast; +using ftl::protocol::Stream; +using ftl::protocol::StreamPacket; +using ftl::protocol::Packet; +using ftl::protocol::Channel; +using ftl::protocol::ChannelSet; +using ftl::protocol::FrameID; + +class TestStream : public ftl::protocol::Stream { + public: + TestStream() {}; + ~TestStream() {}; + + bool post(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt) { + seen(FrameID(spkt.streamID, spkt.frame_number), spkt.channel); + trigger(spkt, pkt); + return true; + } + + bool begin() override { return true; } + bool end() override { return true; } + bool active() override { return true; } + + void setProperty(ftl::protocol::StreamProperty opt, int value) override {} + + int getProperty(ftl::protocol::StreamProperty opt) override { return 0; } + + bool supportsProperty(ftl::protocol::StreamProperty opt) override { return true; } + + void forceSeen(FrameID id, Channel channel) { + seen(id, channel); + } +}; + +TEST_CASE("ftl::stream::Broadcast()::write", "[stream]") { + std::unique_ptr<Broadcast> mux = std::make_unique<Broadcast>(); + REQUIRE(mux); + + SECTION("write with two streams") { + std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1); + mux->add(s2); + + StreamPacket tspkt1 = {4,0,0,1,Channel::kColour}; + StreamPacket tspkt2 = {4,0,0,1,Channel::kColour}; + + auto h1 = s1->onPacket([&tspkt1](const StreamPacket &spkt, const Packet &pkt) { + tspkt1 = spkt; + return true; + }); + auto h2 = s2->onPacket([&tspkt2](const StreamPacket &spkt, const Packet &pkt) { + tspkt2 = spkt; + return true; + }); + + REQUIRE( mux->post({4,100,0,1,Channel::kColour},{}) ); + REQUIRE( tspkt1.timestamp == 100 ); + REQUIRE( tspkt2.timestamp == 100 ); + } + +} + +TEST_CASE("Broadcast enable", "[stream]") { + std::unique_ptr<Broadcast> mux = std::make_unique<Broadcast>(); + REQUIRE(mux); + + std::shared_ptr<TestStream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<TestStream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1); + mux->add(s2); + + SECTION("enable frame id") { + FrameID id1(0, 1); + s1->forceSeen(id1, Channel::kColour); + // s2->forceSeen(id1, Channel::kColour); + + REQUIRE( !s1->enabled(id1) ); + REQUIRE( mux->enable(id1) ); + REQUIRE( s1->enabled(id1) ); + REQUIRE( s2->enabled(id1) ); + + FrameID id2(1, 1); + s2->forceSeen(id2, Channel::kColour); + + REQUIRE( !s2->enabled(id2) ); + REQUIRE( mux->enable(id2) ); + REQUIRE( s2->enabled(id2) ); + REQUIRE( s1->enabled(id2) ); + + auto frames = mux->enabled(); + REQUIRE( frames.size() == 2 ); + REQUIRE( frames.find(id1) != frames.end() ); + REQUIRE( frames.find(id2) != frames.end() ); + } + + SECTION("enable frame id for unseen") { + FrameID id(0, 1); + REQUIRE( mux->enable(id) ); + REQUIRE( s1->enabled(id) ); + REQUIRE( s2->enabled(id) ); + } + + SECTION("enable channel for unseen") { + FrameID id(0, 1); + REQUIRE( mux->enable(id, Channel::kDepth) ); + REQUIRE( s1->enabled(id, Channel::kDepth) ); + REQUIRE( s2->enabled(id, Channel::kDepth) ); + } + + SECTION("enable channel set for unseen") { + FrameID id(0, 1); + ChannelSet set = {Channel::kDepth, Channel::kRight}; + REQUIRE( mux->enable(id, set) ); + } + + SECTION("enable frame id and channel") { + FrameID id1(0, 1); + s1->forceSeen(id1, Channel::kDepth); + + REQUIRE( !s1->enabled(id1, Channel::kDepth) ); + REQUIRE( mux->enable(id1, Channel::kDepth) ); + REQUIRE( s1->enabled(id1, Channel::kDepth) ); + REQUIRE( s2->enabled(id1, Channel::kDepth) ); + } + + SECTION("enable frame id and channel set") { + FrameID id1(0, 1); + s1->forceSeen(id1, Channel::kDepth); + s1->forceSeen(id1, Channel::kRight); + + ChannelSet set = {Channel::kDepth, Channel::kRight}; + REQUIRE( !s1->enabled(id1, Channel::kDepth) ); + REQUIRE( !s1->enabled(id1, Channel::kRight) ); + REQUIRE( mux->enable(id1, set) ); + REQUIRE( s1->enabled(id1, Channel::kDepth) ); + REQUIRE( s1->enabled(id1, Channel::kRight) ); + REQUIRE( s2->enabled(id1, Channel::kDepth) ); + } +} diff --git a/test/muxer_unit.cpp b/test/muxer_unit.cpp new file mode 100644 index 0000000000000000000000000000000000000000..03d2991fb854079fb8a2f00f9e3847c3687dbacd --- /dev/null +++ b/test/muxer_unit.cpp @@ -0,0 +1,572 @@ +#include "catch.hpp" + +#include <ftl/protocol/streams.hpp> +#include <ftl/protocol/muxer.hpp> +#include <ftl/protocol/broadcaster.hpp> +#include <nlohmann/json.hpp> + +using ftl::protocol::Muxer; +using ftl::protocol::Broadcast; +using ftl::protocol::Stream; +using ftl::protocol::StreamPacket; +using ftl::protocol::Packet; +using ftl::protocol::Channel; +using ftl::protocol::ChannelSet; +using ftl::protocol::FrameID; + +class TestStream : public ftl::protocol::Stream { + public: + TestStream() {}; + ~TestStream() {}; + + bool post(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt) { + seen(FrameID(spkt.streamID, spkt.frame_number), spkt.channel); + trigger(spkt, pkt); + return true; + } + + bool begin() override { return true; } + bool end() override { return true; } + bool active() override { return true; } + + void setProperty(ftl::protocol::StreamProperty opt, int value) override {} + + int getProperty(ftl::protocol::StreamProperty opt) override { return 0; } + + bool supportsProperty(ftl::protocol::StreamProperty opt) override { return true; } + + void forceSeen(FrameID id, Channel channel) { + seen(id, channel); + } +}; + +TEST_CASE("Muxer post, distinct framesets", "[stream]") { + + std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); + REQUIRE(mux); + + SECTION("write with one stream fails") { + std::shared_ptr<Stream> s = std::make_shared<TestStream>(); + REQUIRE(s); + + mux->add(s); + + ftl::protocol::StreamPacket tspkt = {4,0,0,1, Channel::kColour}; + + auto h = s->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { + tspkt = spkt; + return true; + }); + + REQUIRE( !mux->post({4,100,0,1,ftl::protocol::Channel::kColour},{}) ); + REQUIRE( tspkt.timestamp == 0 ); + } + + SECTION("write to previously read") { + + std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1); + mux->add(s2); + + ftl::protocol::StreamPacket tspkt = {4,0,0,1,Channel::kColour}; + auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { + tspkt = spkt; + return true; + }); + + REQUIRE( s1->post({4,100,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.streamID == 0 ); + REQUIRE( tspkt.timestamp == 100 ); + REQUIRE( tspkt.frame_number == 0 ); + + REQUIRE( s2->post({4,101,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.streamID == 1 ); + REQUIRE( tspkt.timestamp == 101 ); + REQUIRE( tspkt.frame_number == 0 ); + + StreamPacket tspkt2 = {4,0,0,1,Channel::kColour}; + StreamPacket tspkt3 = {4,0,0,1,Channel::kColour}; + auto h2 = s1->onPacket([&tspkt2](const StreamPacket &spkt, const Packet &pkt) { + tspkt2 = spkt; + return true; + }); + auto h3 = s2->onPacket([&tspkt3](const StreamPacket &spkt, const Packet &pkt) { + tspkt3 = spkt; + return true; + }); + + REQUIRE( mux->post({4,200,1,0,Channel::kColour},{}) ); + REQUIRE( tspkt3.timestamp == 200 ); + REQUIRE( tspkt3.streamID == 0 ); + REQUIRE( tspkt3.frame_number == 0 ); + } +} + +TEST_CASE("Muxer post, single frameset", "[stream]") { + + std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); + REQUIRE(mux); + + SECTION("write to previously read") { + std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1,1); + mux->add(s2,1); + + StreamPacket tspkt = {4,0,0,1,Channel::kColour}; + auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { + tspkt = spkt; + return true; + }); + + REQUIRE( s1->post({4,100,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.streamID == 1 ); + REQUIRE( tspkt.frame_number == 0 ); + + REQUIRE( s2->post({4,101,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.streamID == 1 ); + REQUIRE( tspkt.frame_number == 1 ); + + StreamPacket tspkt2 = {4,0,4,4,Channel::kColour}; + StreamPacket tspkt3 = {4,0,4,4,Channel::kColour}; + auto h2 = s1->onPacket([&tspkt2](const StreamPacket &spkt, const Packet &pkt) { + tspkt2 = spkt; + return true; + }); + auto h3 = s2->onPacket([&tspkt3](const StreamPacket &spkt, const Packet &pkt) { + tspkt3 = spkt; + return true; + }); + + REQUIRE( mux->post({4,200,1,1,Channel::kColour},{}) ); + REQUIRE( tspkt2.streamID == 4 ); + REQUIRE( tspkt2.frame_number == 4 ); + REQUIRE( tspkt3.streamID == 0 ); + REQUIRE( tspkt3.frame_number == 0 ); + + REQUIRE( mux->post({4,200,1,0,Channel::kColour},{}) ); + REQUIRE( tspkt2.streamID == 0 ); + REQUIRE( tspkt2.frame_number == 0 ); + } +} + +TEST_CASE("Muxer read", "[stream]") { + std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); + REQUIRE(mux); + + SECTION("read with two writing streams") { + std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1, 0); + mux->add(s2, 0); + + StreamPacket tspkt = {4,0,0,1,Channel::kColour}; + auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { + tspkt = spkt; + return true; + }); + + REQUIRE( s1->post({4,100,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.timestamp == 100 ); + REQUIRE( tspkt.frame_number == 0 ); + + REQUIRE( s2->post({4,101,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.timestamp == 101 ); + REQUIRE( tspkt.frame_number == 1 ); + + REQUIRE( s1->post({4,102,0,1,Channel::kColour},{}) ); + REQUIRE( tspkt.timestamp == 102 ); + REQUIRE( tspkt.frame_number == 2 ); + + REQUIRE( s2->post({4,103,0,1,Channel::kColour},{}) ); + REQUIRE( tspkt.timestamp == 103 ); + REQUIRE( tspkt.frame_number == 3 ); + } + + SECTION("read consistency with two writing streams") { + std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1, 0); + mux->add(s2, 0); + + StreamPacket tspkt = {4,0,0,1,Channel::kColour}; + auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { + tspkt = spkt; + return true; + }); + + REQUIRE( s1->post({4,100,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.timestamp == 100 ); + REQUIRE( tspkt.frame_number == 0 ); + + REQUIRE( s2->post({4,101,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.timestamp == 101 ); + REQUIRE( tspkt.frame_number == 1 ); + + REQUIRE( s1->post({4,102,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.timestamp == 102 ); + REQUIRE( tspkt.frame_number == 0 ); + + REQUIRE( s2->post({4,103,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.timestamp == 103 ); + REQUIRE( tspkt.frame_number == 1 ); + } +} + +TEST_CASE("Muxer read multi-frameset", "[stream]") { + std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); + REQUIRE(mux); + + //SECTION("read with two writing streams") { + + std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + std::shared_ptr<Stream> s3 = std::make_shared<TestStream>(); + REQUIRE(s3); + std::shared_ptr<Stream> s4 = std::make_shared<TestStream>(); + REQUIRE(s4); + + mux->add(s1,0); + mux->add(s2,1); + mux->add(s3,0); + mux->add(s4,1); + + StreamPacket tspkt = {4,0,0,1,Channel::kColour}; + auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { + tspkt = spkt; + return true; + }); + + REQUIRE( s1->post({4,100,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.streamID == 0 ); + REQUIRE( tspkt.frame_number == 0 ); + + REQUIRE( s2->post({4,101,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.streamID == 1 ); + REQUIRE( tspkt.frame_number == 0 ); + + REQUIRE( s3->post({4,102,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.streamID == 0 ); + REQUIRE( tspkt.frame_number == 1 ); + + REQUIRE( s4->post({4,103,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.streamID == 1 ); + REQUIRE( tspkt.frame_number == 1 ); + //} +} + +TEST_CASE("Muxer enable", "[stream]") { + std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); + REQUIRE(mux); + + std::shared_ptr<TestStream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<TestStream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1); + mux->add(s2); + + SECTION("enable frame id") { + FrameID id1(0, 1); + s1->forceSeen(id1, Channel::kColour); + + REQUIRE( !s1->enabled(id1) ); + REQUIRE( mux->enable(id1) ); + REQUIRE( s1->enabled(id1) ); + REQUIRE( !s2->enabled(id1) ); + + FrameID id2(1, 1); + s2->forceSeen(id2, Channel::kColour); + + REQUIRE( !s2->enabled(id2) ); + REQUIRE( mux->enable(id2) ); + REQUIRE( s2->enabled(id2) ); + REQUIRE( !s1->enabled(id2) ); + + auto frames = mux->enabled(); + REQUIRE( frames.size() == 2 ); + REQUIRE( frames.find(id1) != frames.end() ); + REQUIRE( frames.find(id2) != frames.end() ); + } + + SECTION("enable frame id fails for unseen") { + FrameID id(0, 1); + REQUIRE( !mux->enable(id) ); + } + + SECTION("enable channel fails for unseen") { + FrameID id(0, 1); + REQUIRE( !mux->enable(id, Channel::kDepth) ); + } + + SECTION("enable channel set fails for unseen") { + FrameID id(0, 1); + ChannelSet set = {Channel::kDepth, Channel::kRight}; + REQUIRE( !mux->enable(id, set) ); + } + + SECTION("enable frame id and channel") { + FrameID id1(0, 1); + s1->forceSeen(id1, Channel::kDepth); + + REQUIRE( !s1->enabled(id1, Channel::kDepth) ); + REQUIRE( mux->enable(id1, Channel::kDepth) ); + REQUIRE( s1->enabled(id1, Channel::kDepth) ); + REQUIRE( !s2->enabled(id1, Channel::kDepth) ); + } + + SECTION("enable frame id and channel set") { + FrameID id1(0, 1); + s1->forceSeen(id1, Channel::kDepth); + s1->forceSeen(id1, Channel::kRight); + + ChannelSet set = {Channel::kDepth, Channel::kRight}; + REQUIRE( !s1->enabled(id1, Channel::kDepth) ); + REQUIRE( !s1->enabled(id1, Channel::kRight) ); + REQUIRE( mux->enable(id1, set) ); + REQUIRE( s1->enabled(id1, Channel::kDepth) ); + REQUIRE( s1->enabled(id1, Channel::kRight) ); + REQUIRE( !s2->enabled(id1, Channel::kDepth) ); + } +} + +TEST_CASE("Muxer available", "[stream]") { + std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); + REQUIRE(mux); + + std::shared_ptr<TestStream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<TestStream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1); + mux->add(s2); + + SECTION("available id when seen") { + FrameID id1(0, 1); + REQUIRE( !s1->available(id1) ); + REQUIRE( !mux->available(id1) ); + s1->forceSeen(id1, Channel::kColour); + REQUIRE( s1->available(id1) ); + REQUIRE( mux->available(id1) ); + + FrameID id2(1, 1); + REQUIRE( !s2->available(id2) ); + REQUIRE( !mux->available(id2) ); + s2->forceSeen(id2, Channel::kColour); + REQUIRE( s2->available(id2) ); + REQUIRE( !s1->available(id2) ); + REQUIRE( mux->available(id1) ); + REQUIRE( mux->available(id2) ); + } + + SECTION("available channel when seen") { + FrameID id1(0, 1); + REQUIRE( !s1->available(id1, Channel::kColour) ); + REQUIRE( !mux->available(id1, Channel::kColour) ); + s1->forceSeen(id1, Channel::kColour); + REQUIRE( s1->available(id1, Channel::kColour) ); + REQUIRE( mux->available(id1, Channel::kColour) ); + } + + SECTION("not available when wrong channel seen") { + FrameID id1(0, 1); + s1->forceSeen(id1, Channel::kDepth); + REQUIRE( mux->available(id1) ); + REQUIRE( !s1->available(id1, Channel::kColour) ); + REQUIRE( !mux->available(id1, Channel::kColour) ); + } + + SECTION("available channel set when seen all") { + FrameID id1(0, 1); + ChannelSet set = {Channel::kColour, Channel::kDepth}; + REQUIRE( !s1->available(id1, set) ); + REQUIRE( !mux->available(id1, set) ); + s1->forceSeen(id1, Channel::kColour); + s1->forceSeen(id1, Channel::kDepth); + REQUIRE( s1->available(id1, set) ); + REQUIRE( mux->available(id1, set) ); + } + + SECTION("not available channel set if not all seen") { + FrameID id1(0, 1); + ChannelSet set = {Channel::kColour, Channel::kDepth}; + REQUIRE( !s1->available(id1, set) ); + REQUIRE( !mux->available(id1, set) ); + s1->forceSeen(id1, Channel::kDepth); + REQUIRE( !s1->available(id1, set) ); + REQUIRE( !mux->available(id1, set) ); + } +} + +TEST_CASE("Muxer onAvailable", "[stream]") { + std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); + REQUIRE(mux); + + std::shared_ptr<TestStream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<TestStream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1); + mux->add(s2); + + SECTION("available event when seen") { + FrameID id1(0, 1); + + bool seen1a = false; + auto h1 = s1->onAvailable([&seen1a, id1](FrameID id, Channel channel) { + seen1a = true; + REQUIRE( id == id1 ); + return true; + }); + + bool seen1b = false; + auto h2 = mux->onAvailable([&seen1b, id1](FrameID id, Channel channel) { + seen1b = true; + REQUIRE( id == id1 ); + return true; + }); + + s1->forceSeen(id1, Channel::kColour); + REQUIRE( seen1a ); + REQUIRE( seen1b ); + } +} + +TEST_CASE("Muxer frames", "[stream]") { + std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); + REQUIRE(mux); + + std::shared_ptr<TestStream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<TestStream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + SECTION("unique framesets list correct") { + mux->add(s1); + mux->add(s2); + + FrameID id1(0, 1); + FrameID id2(1, 1); + FrameID id3(0, 2); + FrameID id4(1, 2); + + REQUIRE( mux->frames().size() == 0 ); + + s1->forceSeen(id1, Channel::kColour); + s2->forceSeen(id2, Channel::kColour); + s1->forceSeen(id3, Channel::kColour); + s2->forceSeen(id4, Channel::kColour); + + auto frames = mux->frames(); + REQUIRE( frames.size() == 4 ); + REQUIRE( frames.find(id1) != frames.end() ); + REQUIRE( frames.find(id2) != frames.end() ); + REQUIRE( frames.find(id3) != frames.end() ); + REQUIRE( frames.find(id4) != frames.end() ); + } + + SECTION("merged framesets list correct") { + mux->add(s1, 1); + mux->add(s2, 1); + + FrameID id1(0, 0); + FrameID id2(0, 1); + + REQUIRE( mux->frames().size() == 0 ); + + s1->forceSeen(id1, Channel::kColour); + s2->forceSeen(id1, Channel::kColour); + s1->forceSeen(id2, Channel::kColour); + s2->forceSeen(id2, Channel::kColour); + + auto frames = mux->frames(); + REQUIRE( frames.size() == 4 ); + REQUIRE( frames.find(FrameID(1, 0)) != frames.end() ); + REQUIRE( frames.find(FrameID(1, 1)) != frames.end() ); + REQUIRE( frames.find(FrameID(1, 2)) != frames.end() ); + REQUIRE( frames.find(FrameID(1, 3)) != frames.end() ); + } +} + +TEST_CASE("Muxer channels", "[stream]") { + std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); + REQUIRE(mux); + + std::shared_ptr<TestStream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<TestStream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1); + mux->add(s2); + + SECTION("correct channels for valid frame") { + FrameID id1(0, 1); + + s1->forceSeen(id1, Channel::kColour); + s1->forceSeen(id1, Channel::kDepth); + + auto set = mux->channels(id1); + REQUIRE( set.size() == 2 ); + REQUIRE( set.count(Channel::kColour) == 1 ); + REQUIRE( set.count(Channel::kDepth) == 1 ); + } + + SECTION("empty for invalid frame") { + FrameID id1(0, 1); + auto set = mux->channels(id1); + REQUIRE( set.size() == 0 ); + } +} + +TEST_CASE("Muxer enabledChannels", "[stream]") { + std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); + REQUIRE(mux); + + std::shared_ptr<TestStream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<TestStream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1); + mux->add(s2); + + SECTION("correct channels for valid frame") { + FrameID id1(0, 1); + + s1->forceSeen(id1, Channel::kColour); + s1->forceSeen(id1, Channel::kDepth); + + auto set = mux->channels(id1); + REQUIRE( set.size() == 2 ); + + REQUIRE( mux->enable(id1, set) ); + + REQUIRE( mux->enabledChannels(id1) == set ); + REQUIRE( s1->enabledChannels(id1) == set ); + } + + SECTION("empty for invalid frame") { + FrameID id1(0, 1); + auto set = mux->enabledChannels(id1); + REQUIRE( set.size() == 0 ); + } +} diff --git a/test/stream_integration.cpp b/test/stream_integration.cpp index 6e16b2ab3ff53d5cc656b079387967d20fe0c0c2..66f8eeedfa654e4eb22ff34bcbe8cdeb161cd0a1 100644 --- a/test/stream_integration.cpp +++ b/test/stream_integration.cpp @@ -40,6 +40,12 @@ TEST_CASE("TCP Stream", "[net]") { return true; }); + bool seenReq = false; + auto h2 = s1->onRequest([&seenReq](const ftl::protocol::Request &req) { + seenReq = true; + return true; + }); + s1->begin(); s2->begin(); @@ -48,6 +54,8 @@ TEST_CASE("TCP Stream", "[net]") { // TODO: Find better option std::this_thread::sleep_for(std::chrono::milliseconds(10)); + REQUIRE( seenReq ); + ftl::protocol::StreamPacket spkt; spkt.streamID = 0; spkt.frame_number = 0; diff --git a/test/stream_unit.cpp b/test/stream_unit.cpp deleted file mode 100644 index fcbb1430fe399b28ae352782e7bd5b5d63c2352d..0000000000000000000000000000000000000000 --- a/test/stream_unit.cpp +++ /dev/null @@ -1,301 +0,0 @@ -#include "catch.hpp" - -#include <ftl/protocol/streams.hpp> -#include <ftl/protocol/muxer.hpp> -#include <ftl/protocol/broadcaster.hpp> -#include <nlohmann/json.hpp> - -using ftl::protocol::Muxer; -using ftl::protocol::Broadcast; -using ftl::protocol::Stream; -using ftl::protocol::StreamPacket; -using ftl::protocol::Packet; -using ftl::protocol::Channel; -using ftl::protocol::FrameID; - -class TestStream : public ftl::protocol::Stream { - public: - TestStream() {}; - ~TestStream() {}; - - bool post(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt) { - seen(FrameID(spkt.streamID, spkt.frame_number), spkt.channel); - trigger(spkt, pkt); - return true; - } - - bool begin() override { return true; } - bool end() override { return true; } - bool active() override { return true; } - - void setProperty(ftl::protocol::StreamProperty opt, int value) override {} - - int getProperty(ftl::protocol::StreamProperty opt) override { return 0; } - - bool supportsProperty(ftl::protocol::StreamProperty opt) override { return true; } - - private: - //std::function<void(const StreamPacket &, const Packet &)> cb_; -}; - -TEST_CASE("ftl::stream::Muxer()::post, distinct framesets", "[stream]") { - - std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); - REQUIRE(mux); - - SECTION("write with one stream fails") { - std::shared_ptr<Stream> s = std::make_shared<TestStream>(); - REQUIRE(s); - - mux->add(s); - - ftl::protocol::StreamPacket tspkt = {4,0,0,1, Channel::kColour}; - - auto h = s->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { - tspkt = spkt; - return true; - }); - - REQUIRE( !mux->post({4,100,0,1,ftl::protocol::Channel::kColour},{}) ); - REQUIRE( tspkt.timestamp == 0 ); - } - - SECTION("write to previously read") { - - std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); - REQUIRE(s1); - std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); - REQUIRE(s2); - - mux->add(s1); - mux->add(s2); - - ftl::protocol::StreamPacket tspkt = {4,0,0,1,Channel::kColour}; - auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { - tspkt = spkt; - return true; - }); - - REQUIRE( s1->post({4,100,0,0,Channel::kColour},{}) ); - REQUIRE( tspkt.streamID == 0 ); - REQUIRE( tspkt.timestamp == 100 ); - REQUIRE( tspkt.frame_number == 0 ); - - REQUIRE( s2->post({4,101,0,0,Channel::kColour},{}) ); - REQUIRE( tspkt.streamID == 1 ); - REQUIRE( tspkt.timestamp == 101 ); - REQUIRE( tspkt.frame_number == 0 ); - - StreamPacket tspkt2 = {4,0,0,1,Channel::kColour}; - StreamPacket tspkt3 = {4,0,0,1,Channel::kColour}; - auto h2 = s1->onPacket([&tspkt2](const StreamPacket &spkt, const Packet &pkt) { - tspkt2 = spkt; - return true; - }); - auto h3 = s2->onPacket([&tspkt3](const StreamPacket &spkt, const Packet &pkt) { - tspkt3 = spkt; - return true; - }); - - REQUIRE( mux->post({4,200,1,0,Channel::kColour},{}) ); - REQUIRE( tspkt3.timestamp == 200 ); - REQUIRE( tspkt3.streamID == 0 ); - REQUIRE( tspkt3.frame_number == 0 ); - } -} - -TEST_CASE("ftl::stream::Muxer()::post, single frameset", "[stream]") { - - std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); - REQUIRE(mux); - - SECTION("write to previously read") { - std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); - REQUIRE(s1); - std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); - REQUIRE(s2); - - mux->add(s1,1); - mux->add(s2,1); - - StreamPacket tspkt = {4,0,0,1,Channel::kColour}; - auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { - tspkt = spkt; - return true; - }); - - REQUIRE( s1->post({4,100,0,0,Channel::kColour},{}) ); - REQUIRE( tspkt.streamID == 1 ); - REQUIRE( tspkt.frame_number == 0 ); - - REQUIRE( s2->post({4,101,0,0,Channel::kColour},{}) ); - REQUIRE( tspkt.streamID == 1 ); - REQUIRE( tspkt.frame_number == 1 ); - - StreamPacket tspkt2 = {4,0,4,4,Channel::kColour}; - StreamPacket tspkt3 = {4,0,4,4,Channel::kColour}; - auto h2 = s1->onPacket([&tspkt2](const StreamPacket &spkt, const Packet &pkt) { - tspkt2 = spkt; - return true; - }); - auto h3 = s2->onPacket([&tspkt3](const StreamPacket &spkt, const Packet &pkt) { - tspkt3 = spkt; - return true; - }); - - REQUIRE( mux->post({4,200,1,1,Channel::kColour},{}) ); - REQUIRE( tspkt2.streamID == 4 ); - REQUIRE( tspkt2.frame_number == 4 ); - REQUIRE( tspkt3.streamID == 0 ); - REQUIRE( tspkt3.frame_number == 0 ); - - REQUIRE( mux->post({4,200,1,0,Channel::kColour},{}) ); - REQUIRE( tspkt2.streamID == 0 ); - REQUIRE( tspkt2.frame_number == 0 ); - } -} - -TEST_CASE("ftl::stream::Muxer()::read", "[stream]") { - std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); - REQUIRE(mux); - - SECTION("read with two writing streams") { - std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); - REQUIRE(s1); - std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); - REQUIRE(s2); - - mux->add(s1, 0); - mux->add(s2, 0); - - StreamPacket tspkt = {4,0,0,1,Channel::kColour}; - auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { - tspkt = spkt; - return true; - }); - - REQUIRE( s1->post({4,100,0,0,Channel::kColour},{}) ); - REQUIRE( tspkt.timestamp == 100 ); - REQUIRE( tspkt.frame_number == 0 ); - - REQUIRE( s2->post({4,101,0,0,Channel::kColour},{}) ); - REQUIRE( tspkt.timestamp == 101 ); - REQUIRE( tspkt.frame_number == 1 ); - - REQUIRE( s1->post({4,102,0,1,Channel::kColour},{}) ); - REQUIRE( tspkt.timestamp == 102 ); - REQUIRE( tspkt.frame_number == 2 ); - - REQUIRE( s2->post({4,103,0,1,Channel::kColour},{}) ); - REQUIRE( tspkt.timestamp == 103 ); - REQUIRE( tspkt.frame_number == 3 ); - } - - SECTION("read consistency with two writing streams") { - std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); - REQUIRE(s1); - std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); - REQUIRE(s2); - - mux->add(s1, 0); - mux->add(s2, 0); - - StreamPacket tspkt = {4,0,0,1,Channel::kColour}; - auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { - tspkt = spkt; - return true; - }); - - REQUIRE( s1->post({4,100,0,0,Channel::kColour},{}) ); - REQUIRE( tspkt.timestamp == 100 ); - REQUIRE( tspkt.frame_number == 0 ); - - REQUIRE( s2->post({4,101,0,0,Channel::kColour},{}) ); - REQUIRE( tspkt.timestamp == 101 ); - REQUIRE( tspkt.frame_number == 1 ); - - REQUIRE( s1->post({4,102,0,0,Channel::kColour},{}) ); - REQUIRE( tspkt.timestamp == 102 ); - REQUIRE( tspkt.frame_number == 0 ); - - REQUIRE( s2->post({4,103,0,0,Channel::kColour},{}) ); - REQUIRE( tspkt.timestamp == 103 ); - REQUIRE( tspkt.frame_number == 1 ); - } -} - -TEST_CASE("ftl::stream::Muxer()::read multi-frameset", "[stream]") { - std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); - REQUIRE(mux); - - //SECTION("read with two writing streams") { - - std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); - REQUIRE(s1); - std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); - REQUIRE(s2); - std::shared_ptr<Stream> s3 = std::make_shared<TestStream>(); - REQUIRE(s3); - std::shared_ptr<Stream> s4 = std::make_shared<TestStream>(); - REQUIRE(s4); - - mux->add(s1,0); - mux->add(s2,1); - mux->add(s3,0); - mux->add(s4,1); - - StreamPacket tspkt = {4,0,0,1,Channel::kColour}; - auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { - tspkt = spkt; - return true; - }); - - REQUIRE( s1->post({4,100,0,0,Channel::kColour},{}) ); - REQUIRE( tspkt.streamID == 0 ); - REQUIRE( tspkt.frame_number == 0 ); - - REQUIRE( s2->post({4,101,0,0,Channel::kColour},{}) ); - REQUIRE( tspkt.streamID == 1 ); - REQUIRE( tspkt.frame_number == 0 ); - - REQUIRE( s3->post({4,102,0,0,Channel::kColour},{}) ); - REQUIRE( tspkt.streamID == 0 ); - REQUIRE( tspkt.frame_number == 1 ); - - REQUIRE( s4->post({4,103,0,0,Channel::kColour},{}) ); - REQUIRE( tspkt.streamID == 1 ); - REQUIRE( tspkt.frame_number == 1 ); - //} -} - -TEST_CASE("ftl::stream::Broadcast()::write", "[stream]") { - std::unique_ptr<Broadcast> mux = std::make_unique<Broadcast>(); - REQUIRE(mux); - - SECTION("write with two streams") { - std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); - REQUIRE(s1); - std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); - REQUIRE(s2); - - mux->add(s1); - mux->add(s2); - - StreamPacket tspkt1 = {4,0,0,1,Channel::kColour}; - StreamPacket tspkt2 = {4,0,0,1,Channel::kColour}; - - auto h1 = s1->onPacket([&tspkt1](const StreamPacket &spkt, const Packet &pkt) { - tspkt1 = spkt; - return true; - }); - auto h2 = s2->onPacket([&tspkt2](const StreamPacket &spkt, const Packet &pkt) { - tspkt2 = spkt; - return true; - }); - - REQUIRE( mux->post({4,100,0,1,Channel::kColour},{}) ); - REQUIRE( tspkt1.timestamp == 100 ); - REQUIRE( tspkt2.timestamp == 100 ); - } - -}