diff --git a/src/streams/muxer.cpp b/src/streams/muxer.cpp index 633f815ad9bb113e9fc3de18e03b95f11ada03e2..0df1a67ae592e497b3b56cd34c32efd55f4fb856 100644 --- a/src/streams/muxer.cpp +++ b/src/streams/muxer.cpp @@ -200,10 +200,22 @@ void Muxer::add(const std::shared_ptr<Stream> &s, int fsid) { })); se.req_handle = std::move(s->onRequest([this, ptr](const Request &req) { - FrameID newID = _mapFromInput(ptr, req.id); - Request newRequest = req; - newRequest.id = newID; - request(newRequest); + if (req.id.frameset() == 255 || req.id.source() == 255) { + for (const auto &i : ptr->stream->frames()) { + if (req.id.frameset() != 255 && req.id.frameset() != i.frameset()) continue; + if (req.id.source() != 255 && req.id.source() != i.source()) continue; + + FrameID newID = _mapFromInput(ptr, i); + Request newRequest = req; + newRequest.id = newID; + request(newRequest); + } + } else { + FrameID newID = _mapFromInput(ptr, req.id); + Request newRequest = req; + newRequest.id = newID; + request(newRequest); + } return true; })); @@ -288,26 +300,71 @@ void Muxer::reset() { } bool Muxer::enable(FrameID id) { - auto p = _mapToOutput(id); - if (!p.second) return false; - bool r = p.second->stream->enable(p.first); - if (r) Stream::enable(id); + bool r = true; + + if (id.frameset() == 255 || id.source() == 255) { + for (const auto &i : frames()) { + if (id.frameset() != 255 && id.frameset() != i.frameset()) continue; + if (id.source() != 255 && id.source() != i.source()) continue; + + auto p = _mapToOutput(i); + if (!p.second) return false; + bool rr = p.second->stream->enable(p.first); + if (rr) Stream::enable(i); + r = r && rr; + } + } else { + auto p = _mapToOutput(id); + if (!p.second) return false; + r = p.second->stream->enable(p.first); + if (r) Stream::enable(id); + } return r; } bool Muxer::enable(FrameID id, ftl::protocol::Channel channel) { - auto p = _mapToOutput(id); - if (!p.second) return false; - bool r = p.second->stream->enable(p.first, channel); - if (r) Stream::enable(id, channel); + bool r = true; + + if (id.frameset() == 255 || id.source() == 255) { + for (const auto &i : frames()) { + if (id.frameset() != 255 && id.frameset() != i.frameset()) continue; + if (id.source() != 255 && id.source() != i.source()) continue; + + auto p = _mapToOutput(i); + if (!p.second) return false; + bool rr = p.second->stream->enable(p.first, channel); + if (rr) Stream::enable(i, channel); + r = r && rr; + } + } else { + auto p = _mapToOutput(id); + if (!p.second) return false; + r = p.second->stream->enable(p.first, channel); + if (r) Stream::enable(id, channel); + } return r; } bool Muxer::enable(FrameID id, const ftl::protocol::ChannelSet &channels) { - auto p = _mapToOutput(id); - if (!p.second) return false; - bool r = p.second->stream->enable(p.first, channels); - if (r) Stream::enable(id, channels); + bool r = true; + + if (id.frameset() == 255 || id.source() == 255) { + for (const auto &i : frames()) { + if (id.frameset() != 255 && id.frameset() != i.frameset()) continue; + if (id.source() != 255 && id.source() != i.source()) continue; + + auto p = _mapToOutput(i); + if (!p.second) return false; + bool rr = p.second->stream->enable(p.first, channels); + if (rr) Stream::enable(i, channels); + r = r && rr; + } + } else { + auto p = _mapToOutput(id); + if (!p.second) return false; + r = p.second->stream->enable(p.first, channels); + if (r) Stream::enable(id, channels); + } return r; } diff --git a/test/muxer_unit.cpp b/test/muxer_unit.cpp index c1d018bc888beac6e7a1ff37f3b4d46e116b0db4..550adfa01f4ec18ecc1e73e60da0472f4ee47fe8 100644 --- a/test/muxer_unit.cpp +++ b/test/muxer_unit.cpp @@ -39,6 +39,16 @@ class TestStream : public ftl::protocol::Stream { return 0; } + void sendRequest(Channel c, uint8_t frameset, uint8_t frames, uint8_t count) { + ftl::protocol::Request req; + req.id = FrameID(frameset, frames); + req.channel = c; + req.bitrate = 255; + req.codec = ftl::protocol::Codec::kAny; + req.count = count; + request(req); + } + bool supportsProperty(ftl::protocol::StreamProperty opt) override { return true; } void forceSeen(FrameID id, Channel channel) { @@ -318,6 +328,23 @@ TEST_CASE("Muxer enable", "[stream]") { REQUIRE( frames.find(id2) != frames.end() ); } + SECTION("enable frame id 255") { + FrameID id1(0, 1); + FrameID id2(0, 2); + FrameID id3(0, 3); + s1->forceSeen(id1, Channel::kColour); + s1->forceSeen(id2, Channel::kColour); + s1->forceSeen(id3, Channel::kColour); + + REQUIRE( !s1->enabled(id1) ); + REQUIRE( !s1->enabled(id2) ); + REQUIRE( !s1->enabled(id3) ); + REQUIRE( mux->enable(FrameID(0, 255)) ); + REQUIRE( s1->enabled(id1) ); + REQUIRE( s1->enabled(id2) ); + REQUIRE( s1->enabled(id3) ); + } + SECTION("enable frame id fails for unseen") { FrameID id(0, 1); REQUIRE( !mux->enable(id) ); @@ -344,6 +371,23 @@ TEST_CASE("Muxer enable", "[stream]") { REQUIRE( !s2->enabled(id1, Channel::kDepth) ); } + SECTION("enable frame id 255 and channel") { + FrameID id1(0, 1); + FrameID id2(0, 2); + FrameID id3(0, 3); + s1->forceSeen(id1, Channel::kColour); + s1->forceSeen(id2, Channel::kColour); + s1->forceSeen(id3, Channel::kColour); + + REQUIRE( !s1->enabled(id1) ); + REQUIRE( !s1->enabled(id2) ); + REQUIRE( !s1->enabled(id3) ); + REQUIRE( mux->enable(FrameID(0, 255), Channel::kColour) ); + REQUIRE( s1->enabled(id1, Channel::kColour) ); + REQUIRE( s1->enabled(id2, Channel::kColour) ); + REQUIRE( s1->enabled(id3, Channel::kColour) ); + } + SECTION("enable frame id and channel set") { FrameID id1(0, 1); s1->forceSeen(id1, Channel::kDepth); @@ -731,3 +775,115 @@ TEST_CASE("Muxer mappings", "[stream]") { REQUIRE( foundS2 == nullptr ); } } + +TEST_CASE("Muxer requests", "[stream]") { + + std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); + REQUIRE(mux); + + SECTION("can propagate specific request") { + std::shared_ptr<TestStream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<TestStream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1,1); + mux->add(s2,1); + + s1->seen(FrameID(0, 0), Channel::kEndFrame); + s2->seen(FrameID(0, 0), Channel::kEndFrame); + + std::atomic_int seenReq = 0; + ftl::protocol::Request lastReq; + + auto h1 = mux->onRequest([&seenReq, &lastReq](const ftl::protocol::Request &req) { + ++seenReq; + lastReq = req; + return true; + }); + + s2->sendRequest(Channel::kColour, 0, 0, 1); + + REQUIRE( seenReq == 1 ); + REQUIRE( lastReq.id.frameset() == 1 ); + } + + SECTION("can generate a single 255 frameset request") { + std::shared_ptr<TestStream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<TestStream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1,1); + mux->add(s2,1); + + s1->seen(FrameID(0, 0), Channel::kEndFrame); + s2->seen(FrameID(0, 0), Channel::kEndFrame); + + std::atomic_int seenReq = 0; + ftl::protocol::Request lastReq; + lastReq.id = 0; + + auto h1 = mux->onRequest([&seenReq, &lastReq](const ftl::protocol::Request &req) { + ++seenReq; + lastReq = req; + return true; + }); + + s2->sendRequest(Channel::kColour, 255, 255, 1); + + REQUIRE( seenReq == 1 ); + REQUIRE( lastReq.id.frameset() == 1 ); + REQUIRE( mux->findRemote(lastReq.id).frameset() == 0 ); + } + + SECTION("can generate multiple requests from a 255 frameset") { + std::shared_ptr<TestStream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<TestStream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1,1); + mux->add(s2,1); + + s1->seen(FrameID(0, 0), Channel::kEndFrame); + s2->seen(FrameID(0, 0), Channel::kEndFrame); + s2->seen(FrameID(0, 1), Channel::kEndFrame); + s2->seen(FrameID(1, 0), Channel::kEndFrame); + + std::atomic_int seenReq = 0; + auto h1 = mux->onRequest([&seenReq](const ftl::protocol::Request &req) { + ++seenReq; + return true; + }); + + s2->sendRequest(Channel::kColour, 255, 255, 1); + + REQUIRE( seenReq == 3 ); + } + + SECTION("can generate multiple requests from a 255 frame") { + std::shared_ptr<TestStream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<TestStream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1,1); + mux->add(s2,1); + + s1->seen(FrameID(0, 0), Channel::kEndFrame); + s2->seen(FrameID(0, 0), Channel::kEndFrame); + s2->seen(FrameID(0, 1), Channel::kEndFrame); + s2->seen(FrameID(1, 0), Channel::kEndFrame); + + std::atomic_int seenReq = 0; + auto h1 = mux->onRequest([&seenReq](const ftl::protocol::Request &req) { + ++seenReq; + return true; + }); + + s2->sendRequest(Channel::kColour, 0, 255, 1); + + REQUIRE( seenReq == 2 ); + } +} diff --git a/test/webservice_e2e.cpp b/test/webservice_e2e.cpp index a18abd0d2e18666db6e0db964b2d96d743412f64..c0fd1ef4a9efd8175a36436d0163c578399805a5 100644 --- a/test/webservice_e2e.cpp +++ b/test/webservice_e2e.cpp @@ -2,12 +2,19 @@ #include <ftl/protocol.hpp> #include <ftl/protocol/self.hpp> #include <ftl/protocol/node.hpp> +#include <ftl/protocol/streams.hpp> #include <ftl/uri.hpp> #include <nlohmann/json.hpp> // --- Tests ------------------------------------------------------------------- TEST_CASE("Webservice connection", "[net]") { + ftl::getSelf()->onNodeDetails([]() -> nlohmann::json { + return { + {"id", ftl::protocol::id.to_string()} + }; + }); + SECTION("connect using secure websocket") { std::string uri; if(const char* env_p = std::getenv("FTL_WEBSERVICE_URI")) { @@ -27,5 +34,37 @@ TEST_CASE("Webservice connection", "[net]") { LOG(INFO) << "Details: " << details.dump(); } + /*SECTION("can create a net stream") { + std::string uri; + if(const char* env_p = std::getenv("FTL_WEBSERVICE_URI")) { + uri = std::string(env_p); + } else { + return; + } + + auto p = ftl::connectNode(uri); + REQUIRE( p ); + + REQUIRE( p->waitConnection(5) ); + + auto details = p->details(); + REQUIRE(details.contains("id")); + + ftl::protocol::Request req; + req.id = 0; + + auto stream = ftl::createStream("ftl://ftlab.utu.fi/teststream"); + auto h = stream->onRequest([&req](const ftl::protocol::Request &r) { + req = r; + return true; + }); + + stream->begin(); + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + REQUIRE( req.id.frameset() == 255 ); + }*/ + ftl::protocol::reset(); }