Skip to content
Snippets Groups Projects
Commit b7d422c6 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'feature/#53' into 'main'

#53 Allow more muxer find options

See merge request beyondaka/beyond-protocol!31
parents b4a49771 8e58f771
No related branches found
No related tags found
No related merge requests found
...@@ -91,12 +91,64 @@ class Muxer : public Stream { ...@@ -91,12 +91,64 @@ class Muxer : public Stream {
*/ */
std::shared_ptr<Stream> originStream(FrameID) const; std::shared_ptr<Stream> originStream(FrameID) const;
/**
* @brief Get a stream by URI.
*
* If the stream does not exist in the muxer, then returns nullptr.
*
* @param uri
* @return std::shared_ptr<Stream>
*/
std::shared_ptr<Stream> findStream(const std::string &uri) const;
/**
* @brief Find the local frame ID using only the URI.
*
* If the URI does not contain a "set" and "frame" query component,
* then 0 and 0 are assumed.
*
* @param uri
* @return FrameID
*/
FrameID findLocal(const std::string &uri) const;
/**
* @brief Find the local frame ID given a URI and stream specific ID.
*
* This function first attempts to find a stream entity using the URI.
*
* @param uri
* @param remote
* @return FrameID
*/
FrameID findLocal(const std::string &uri, FrameID remote) const; FrameID findLocal(const std::string &uri, FrameID remote) const;
/**
* @brief Find the system local ID for a stream specific ID.
*
* @param stream
* @param remote
* @return FrameID
*/
FrameID findLocal(const std::shared_ptr<Stream> &stream, FrameID remote) const; FrameID findLocal(const std::shared_ptr<Stream> &stream, FrameID remote) const;
/**
* @brief Given a local frame ID, get the stream specific ID.
*
* @see originStream
*
* @param local
* @return FrameID
*/
FrameID findRemote(FrameID local) const; FrameID findRemote(FrameID local) const;
/**
* @brief Obtain a list of all streams in the muxer.
*
* @return std::list<std::shared_ptr<Stream>>
*/
std::list<std::shared_ptr<Stream>> streams() const;
private: private:
struct StreamEntry { struct StreamEntry {
std::shared_ptr<Stream> stream; std::shared_ptr<Stream> stream;
......
...@@ -190,7 +190,7 @@ class Stream { ...@@ -190,7 +190,7 @@ class Stream {
* @return true if all channels exist * @return true if all channels exist
* @return false if one or more do not exist * @return false if one or more do not exist
*/ */
bool available(FrameID id, const ftl::protocol::ChannelSet channels) const; bool available(FrameID id, const ftl::protocol::ChannelSet &channels) const;
/** /**
* @brief Register a callback for when new frames and channels become available. * @brief Register a callback for when new frames and channels become available.
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include <ftl/protocol/muxer.hpp> #include <ftl/protocol/muxer.hpp>
#include <ftl/lib/loguru.hpp> #include <ftl/lib/loguru.hpp>
#include <ftl/uri.hpp>
using ftl::protocol::Muxer; using ftl::protocol::Muxer;
using ftl::protocol::Stream; using ftl::protocol::Stream;
...@@ -74,6 +75,47 @@ std::pair<FrameID, Muxer::StreamEntry*> Muxer::_mapToOutput(FrameID id) const { ...@@ -74,6 +75,47 @@ std::pair<FrameID, Muxer::StreamEntry*> Muxer::_mapToOutput(FrameID id) const {
} }
} }
std::shared_ptr<Stream> Muxer::findStream(const std::string &uri) const {
SHARED_LOCK(mutex_, lk);
for (const auto &e : streams_) {
if (std::any_cast<std::string>(e.stream->getProperty(StreamProperty::kURI)) == uri) {
return e.stream;
}
}
return nullptr;
}
FrameID Muxer::findLocal(const std::string &uri) const {
ftl::URI u(uri);
const StreamEntry *entry = nullptr;
int fsid = 0;
int fid = 0;
if (u.hasAttribute("set")) {
fsid = u.getAttribute<int>("set");
}
if (u.hasAttribute("frame")) {
fid = u.getAttribute<int>("frame");
}
FrameID remote(fsid, fid);
{
SHARED_LOCK(mutex_, lk);
for (const auto &e : streams_) {
if (std::any_cast<std::string>(e.stream->getProperty(StreamProperty::kURI)) == uri) {
entry = &e;
break;
}
}
}
if (entry) {
return _mapFromInput(entry, remote);
} else {
throw FTL_Error("No stream");
}
}
FrameID Muxer::findLocal(const std::string &uri, FrameID remote) const { FrameID Muxer::findLocal(const std::string &uri, FrameID remote) const {
const StreamEntry *entry = nullptr; const StreamEntry *entry = nullptr;
...@@ -122,6 +164,15 @@ FrameID Muxer::findRemote(FrameID local) const { ...@@ -122,6 +164,15 @@ FrameID Muxer::findRemote(FrameID local) const {
return m.first; return m.first;
} }
std::list<std::shared_ptr<Stream>> Muxer::streams() const {
std::list<std::shared_ptr<Stream>> result;
result.resize(streams_.size());
std::transform(streams_.begin(), streams_.end(), result.begin(), [](const StreamEntry &e) {
return e.stream;
});
return result;
}
void Muxer::add(const std::shared_ptr<Stream> &s, int fsid) { void Muxer::add(const std::shared_ptr<Stream> &s, int fsid) {
UNIQUE_LOCK(mutex_, lk); UNIQUE_LOCK(mutex_, lk);
......
...@@ -29,7 +29,7 @@ bool Stream::available(FrameID id, Channel channel) const { ...@@ -29,7 +29,7 @@ bool Stream::available(FrameID id, Channel channel) const {
return false; return false;
} }
bool Stream::available(FrameID id, ChannelSet channels) const { bool Stream::available(FrameID id, const ChannelSet &channels) const {
SHARED_LOCK(mtx_, lk); SHARED_LOCK(mtx_, lk);
auto it = state_.find(id); auto it = state_.find(id);
if (it != state_.end()) { if (it != state_.end()) {
......
...@@ -16,8 +16,9 @@ using ftl::protocol::FrameID; ...@@ -16,8 +16,9 @@ using ftl::protocol::FrameID;
class TestStream : public ftl::protocol::Stream { class TestStream : public ftl::protocol::Stream {
public: public:
TestStream() {}; TestStream() {}
~TestStream() {}; explicit TestStream(const std::string &uri) : uri_(uri) {}
~TestStream() {}
bool post(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) { bool post(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::DataPacket &pkt) {
seen(FrameID(spkt.streamID, spkt.frame_number), spkt.channel); seen(FrameID(spkt.streamID, spkt.frame_number), spkt.channel);
...@@ -31,7 +32,12 @@ class TestStream : public ftl::protocol::Stream { ...@@ -31,7 +32,12 @@ class TestStream : public ftl::protocol::Stream {
void setProperty(ftl::protocol::StreamProperty opt, std::any value) override {} void setProperty(ftl::protocol::StreamProperty opt, std::any value) override {}
std::any getProperty(ftl::protocol::StreamProperty opt) override { return 0; } std::any getProperty(ftl::protocol::StreamProperty opt) override {
if (opt == ftl::protocol::StreamProperty::kURI) {
return uri_;
}
return 0;
}
bool supportsProperty(ftl::protocol::StreamProperty opt) override { return true; } bool supportsProperty(ftl::protocol::StreamProperty opt) override { return true; }
...@@ -42,6 +48,9 @@ class TestStream : public ftl::protocol::Stream { ...@@ -42,6 +48,9 @@ class TestStream : public ftl::protocol::Stream {
void fakeError(ftl::protocol::Error err, const std::string &str) { void fakeError(ftl::protocol::Error err, const std::string &str) {
error(err, str); error(err, str);
} }
private:
std::string uri_;
}; };
TEST_CASE("Muxer post, distinct framesets", "[stream]") { TEST_CASE("Muxer post, distinct framesets", "[stream]") {
...@@ -705,4 +714,20 @@ TEST_CASE("Muxer mappings", "[stream]") { ...@@ -705,4 +714,20 @@ TEST_CASE("Muxer mappings", "[stream]") {
REQUIRE( f1.frameset() == 0 ); REQUIRE( f1.frameset() == 0 );
} }
SECTION("can find stream by URI") {
std::shared_ptr<Stream> s1 = std::make_shared<TestStream>("ftl://myuri1");
REQUIRE(s1);
std::shared_ptr<Stream> s2 = std::make_shared<TestStream>("ftl://myuri2");
REQUIRE(s2);
mux->add(s1,1);
mux->add(s2,1);
auto foundS = mux->findStream("ftl://myuri2");
REQUIRE( foundS == s2 );
auto foundS2 = mux->findStream("ftl://myuri3");
REQUIRE( foundS2 == nullptr );
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment