Skip to content
Snippets Groups Projects
Commit a743c471 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

#49 Public muxer queries and seen method

parent 00d68ad2
No related branches found
No related tags found
No related merge requests found
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#include <memory> #include <memory>
#include <unordered_map> #include <unordered_map>
#include <utility> #include <utility>
#include <string>
#include <ftl/protocol/streams.hpp> #include <ftl/protocol/streams.hpp>
namespace ftl { namespace ftl {
...@@ -90,6 +91,12 @@ class Muxer : public Stream { ...@@ -90,6 +91,12 @@ class Muxer : public Stream {
*/ */
std::shared_ptr<Stream> originStream(FrameID) const; std::shared_ptr<Stream> originStream(FrameID) const;
FrameID findLocal(const std::string &uri, FrameID remote) const;
FrameID findLocal(const std::shared_ptr<Stream> &stream, FrameID remote) const;
FrameID findRemote(FrameID local) const;
private: private:
struct StreamEntry { struct StreamEntry {
std::shared_ptr<Stream> stream; std::shared_ptr<Stream> stream;
...@@ -113,6 +120,8 @@ class Muxer : public Stream { ...@@ -113,6 +120,8 @@ class Muxer : public Stream {
/* On packet receive, map to local ID */ /* On packet receive, map to local ID */
FrameID _mapFromInput(StreamEntry *, FrameID id); FrameID _mapFromInput(StreamEntry *, FrameID id);
FrameID _mapFromInput(const StreamEntry *, FrameID id) const;
/* On posting, map to output ID */ /* On posting, map to output ID */
std::pair<FrameID, StreamEntry*> _mapToOutput(FrameID id) const; std::pair<FrameID, StreamEntry*> _mapToOutput(FrameID id) const;
}; };
......
...@@ -361,13 +361,13 @@ class Stream { ...@@ -361,13 +361,13 @@ class Stream {
return error_cb_.on(cb); return error_cb_.on(cb);
} }
/** Mark the channel and frame as available */
void seen(FrameID id, ftl::protocol::Channel channel);
protected: protected:
/** Dispatch packets to callbacks */ /** Dispatch packets to callbacks */
void trigger(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt); void trigger(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt);
/** Mark the channel and frame as available */
void seen(FrameID id, ftl::protocol::Channel channel);
/** Dispatch a request */ /** Dispatch a request */
void request(const Request &req); void request(const Request &req);
......
...@@ -53,6 +53,17 @@ FrameID Muxer::_mapFromInput(Muxer::StreamEntry *s, FrameID id) { ...@@ -53,6 +53,17 @@ FrameID Muxer::_mapFromInput(Muxer::StreamEntry *s, FrameID id) {
} }
} }
FrameID Muxer::_mapFromInput(const Muxer::StreamEntry *s, FrameID id) const {
SHARED_LOCK(mutex_, lk);
int64_t iid = (int64_t(s->id) << 32) | id.id;
auto it = imap_.find(iid);
if (it != imap_.end()) {
return it->second;
} else {
throw FTL_Error("No mapping");
}
}
std::pair<FrameID, Muxer::StreamEntry*> Muxer::_mapToOutput(FrameID id) const { std::pair<FrameID, Muxer::StreamEntry*> Muxer::_mapToOutput(FrameID id) const {
SHARED_LOCK(mutex_, lk); SHARED_LOCK(mutex_, lk);
auto it = omap_.find(id); auto it = omap_.find(id);
...@@ -63,6 +74,54 @@ std::pair<FrameID, Muxer::StreamEntry*> Muxer::_mapToOutput(FrameID id) const { ...@@ -63,6 +74,54 @@ std::pair<FrameID, Muxer::StreamEntry*> Muxer::_mapToOutput(FrameID id) const {
} }
} }
FrameID Muxer::findLocal(const std::string &uri, FrameID remote) const {
const StreamEntry *entry = nullptr;
{
SHARED_LOCK(mutex_, lk);
for (const auto &e : streams_) {
if (std::any_cast<std::string>(e.stream->getProperty(StreamProperty::kURI)) == uri) {
entry = &e;
break;
}
}
}
if (entry) {
return _mapFromInput(entry, remote);
} else {
throw FTL_Error("No stream");
}
}
FrameID Muxer::findLocal(const std::shared_ptr<Stream> &stream, FrameID remote) const {
const StreamEntry *entry = nullptr;
{
SHARED_LOCK(mutex_, lk);
for (const auto &e : streams_) {
if (e.stream == stream) {
entry = &e;
break;
}
}
}
if (entry) {
return _mapFromInput(entry, remote);
} else {
throw FTL_Error("No stream");
}
}
FrameID Muxer::findRemote(FrameID local) const {
auto m = _mapToOutput(local);
if (m.second == nullptr) {
throw FTL_Error("No mapping");
}
return m.first;
}
void Muxer::add(const std::shared_ptr<Stream> &s, int fsid) { void Muxer::add(const std::shared_ptr<Stream> &s, int fsid) {
UNIQUE_LOCK(mutex_, lk); UNIQUE_LOCK(mutex_, lk);
......
...@@ -642,3 +642,67 @@ TEST_CASE("Muxer onError", "[stream]") { ...@@ -642,3 +642,67 @@ TEST_CASE("Muxer onError", "[stream]") {
REQUIRE( seenErr == ftl::protocol::Error::kUnknown ); REQUIRE( seenErr == ftl::protocol::Error::kUnknown );
} }
TEST_CASE("Muxer mappings", "[stream]") {
std::unique_ptr<Muxer> mux = std::make_unique<Muxer>();
REQUIRE(mux);
SECTION("can get local from remote") {
std::shared_ptr<Stream> s1 = std::make_shared<TestStream>();
REQUIRE(s1);
std::shared_ptr<Stream> s2 = std::make_shared<TestStream>();
REQUIRE(s2);
mux->add(s1,1);
mux->add(s2,1);
s1->seen(FrameID(0, 0), Channel::kEndFrame);
s2->seen(FrameID(0, 0), Channel::kEndFrame);
auto f1 = mux->findLocal(s2, FrameID(0, 0));
REQUIRE( f1.frameset() == 1 );
}
SECTION("fails if mapping not valid") {
std::shared_ptr<Stream> s1 = std::make_shared<TestStream>();
REQUIRE(s1);
std::shared_ptr<Stream> s2 = std::make_shared<TestStream>();
REQUIRE(s2);
mux->add(s1,1);
mux->add(s2,1);
s1->seen(FrameID(0, 0), Channel::kEndFrame);
s2->seen(FrameID(0, 0), Channel::kEndFrame);
bool didThrow = false;
try {
mux->findLocal(s2, FrameID(1, 0));
} catch(const ftl::exception &e) {
e.what();
didThrow = true;
}
REQUIRE( didThrow );
}
SECTION("can get remote from local") {
std::shared_ptr<Stream> s1 = std::make_shared<TestStream>();
REQUIRE(s1);
std::shared_ptr<Stream> s2 = std::make_shared<TestStream>();
REQUIRE(s2);
mux->add(s1,1);
mux->add(s2,1);
s1->seen(FrameID(0, 0), Channel::kEndFrame);
s2->seen(FrameID(0, 0), Channel::kEndFrame);
auto f1 = mux->findRemote(FrameID(1, 0));
REQUIRE( f1.frameset() == 0 );
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment