diff --git a/include/ftl/protocol/muxer.hpp b/include/ftl/protocol/muxer.hpp index 1967dad0f1e9699c079c74c28cf741e2433d6359..c427e13842e4d57a41653885a41ff94e6251c8ef 100644 --- a/include/ftl/protocol/muxer.hpp +++ b/include/ftl/protocol/muxer.hpp @@ -11,6 +11,7 @@ #include <memory> #include <unordered_map> #include <utility> +#include <string> #include <ftl/protocol/streams.hpp> namespace ftl { @@ -90,6 +91,12 @@ class Muxer : public Stream { */ std::shared_ptr<Stream> originStream(FrameID) const; + FrameID findLocal(const std::string &uri, FrameID remote) const; + + FrameID findLocal(const std::shared_ptr<Stream> &stream, FrameID remote) const; + + FrameID findRemote(FrameID local) const; + private: struct StreamEntry { std::shared_ptr<Stream> stream; @@ -113,6 +120,8 @@ class Muxer : public Stream { /* On packet receive, map to local ID */ FrameID _mapFromInput(StreamEntry *, FrameID id); + FrameID _mapFromInput(const StreamEntry *, FrameID id) const; + /* On posting, map to output ID */ std::pair<FrameID, StreamEntry*> _mapToOutput(FrameID id) const; }; diff --git a/include/ftl/protocol/streams.hpp b/include/ftl/protocol/streams.hpp index 86cb4cdf37bc3e9bc51868094fedec66b2cf02fe..e375d193400f997ceed39771f75d4fe6cba54667 100644 --- a/include/ftl/protocol/streams.hpp +++ b/include/ftl/protocol/streams.hpp @@ -361,13 +361,13 @@ class Stream { return error_cb_.on(cb); } + /** Mark the channel and frame as available */ + void seen(FrameID id, ftl::protocol::Channel channel); + protected: /** Dispatch packets to callbacks */ void trigger(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt); - /** Mark the channel and frame as available */ - void seen(FrameID id, ftl::protocol::Channel channel); - /** Dispatch a request */ void request(const Request &req); diff --git a/src/streams/muxer.cpp b/src/streams/muxer.cpp index edba691909da8f0fb11c74f642883b5acd6ac19c..128020ca674b6d9719cf068a9af6603eb3948f3a 100644 --- a/src/streams/muxer.cpp +++ b/src/streams/muxer.cpp @@ -53,6 +53,17 @@ FrameID Muxer::_mapFromInput(Muxer::StreamEntry *s, FrameID id) { } } +FrameID Muxer::_mapFromInput(const Muxer::StreamEntry *s, FrameID id) const { + SHARED_LOCK(mutex_, lk); + int64_t iid = (int64_t(s->id) << 32) | id.id; + auto it = imap_.find(iid); + if (it != imap_.end()) { + return it->second; + } else { + throw FTL_Error("No mapping"); + } +} + std::pair<FrameID, Muxer::StreamEntry*> Muxer::_mapToOutput(FrameID id) const { SHARED_LOCK(mutex_, lk); auto it = omap_.find(id); @@ -63,6 +74,54 @@ std::pair<FrameID, Muxer::StreamEntry*> Muxer::_mapToOutput(FrameID id) const { } } +FrameID Muxer::findLocal(const std::string &uri, FrameID remote) const { + const StreamEntry *entry = nullptr; + + { + SHARED_LOCK(mutex_, lk); + for (const auto &e : streams_) { + if (std::any_cast<std::string>(e.stream->getProperty(StreamProperty::kURI)) == uri) { + entry = &e; + break; + } + } + } + + if (entry) { + return _mapFromInput(entry, remote); + } else { + throw FTL_Error("No stream"); + } +} + +FrameID Muxer::findLocal(const std::shared_ptr<Stream> &stream, FrameID remote) const { + const StreamEntry *entry = nullptr; + + { + SHARED_LOCK(mutex_, lk); + for (const auto &e : streams_) { + if (e.stream == stream) { + entry = &e; + break; + } + } + } + + if (entry) { + return _mapFromInput(entry, remote); + } else { + throw FTL_Error("No stream"); + } +} + +FrameID Muxer::findRemote(FrameID local) const { + auto m = _mapToOutput(local); + if (m.second == nullptr) { + throw FTL_Error("No mapping"); + } + return m.first; +} + void Muxer::add(const std::shared_ptr<Stream> &s, int fsid) { UNIQUE_LOCK(mutex_, lk); diff --git a/test/muxer_unit.cpp b/test/muxer_unit.cpp index 4b8263e1cd00151d29eb1e07e9ce30f48147ffdf..995721056dfab5d526f8b6197c3c0467b135dc6f 100644 --- a/test/muxer_unit.cpp +++ b/test/muxer_unit.cpp @@ -642,3 +642,67 @@ TEST_CASE("Muxer onError", "[stream]") { REQUIRE( seenErr == ftl::protocol::Error::kUnknown ); } + +TEST_CASE("Muxer mappings", "[stream]") { + + std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); + REQUIRE(mux); + + SECTION("can get local from remote") { + std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1,1); + mux->add(s2,1); + + s1->seen(FrameID(0, 0), Channel::kEndFrame); + s2->seen(FrameID(0, 0), Channel::kEndFrame); + + auto f1 = mux->findLocal(s2, FrameID(0, 0)); + + REQUIRE( f1.frameset() == 1 ); + } + + SECTION("fails if mapping not valid") { + std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1,1); + mux->add(s2,1); + + s1->seen(FrameID(0, 0), Channel::kEndFrame); + s2->seen(FrameID(0, 0), Channel::kEndFrame); + + bool didThrow = false; + + try { + mux->findLocal(s2, FrameID(1, 0)); + } catch(const ftl::exception &e) { + e.what(); + didThrow = true; + } + + REQUIRE( didThrow ); + } + + SECTION("can get remote from local") { + std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1,1); + mux->add(s2,1); + + s1->seen(FrameID(0, 0), Channel::kEndFrame); + s2->seen(FrameID(0, 0), Channel::kEndFrame); + + auto f1 = mux->findRemote(FrameID(1, 0)); + + REQUIRE( f1.frameset() == 0 ); + } +}