diff --git a/CMakeLists.txt b/CMakeLists.txt index 1a39fa40970060ece8fa847b2b92ca5be2648100..517f260530dd0cc585b4a9e6a63f38fe693aeb1e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -171,6 +171,10 @@ add_library(beyond-protocol STATIC src/protocol/websocket.cpp src/base64.cpp src/protocol.cpp + src/streams.cpp + src/channelSet.cpp + src/muxer.cpp + src/broadcaster.cpp ) target_include_directories(beyond-protocol PUBLIC diff --git a/include/ftl/protocol/broadcaster.hpp b/include/ftl/protocol/broadcaster.hpp index f138c0acfccfb2f8e43a0ba8d6e537fb9c8ffb8b..081709d6ca51f7587d3b76b8751a15329bdbdff8 100644 --- a/include/ftl/protocol/broadcaster.hpp +++ b/include/ftl/protocol/broadcaster.hpp @@ -16,8 +16,8 @@ class Broadcast : public Stream { explicit Broadcast(); virtual ~Broadcast(); - void add(Stream *); - void remove(Stream *); + void add(const std::shared_ptr<Stream> &); + void remove(const std::shared_ptr<Stream> &); void clear(); bool post(const ftl::protocol::StreamPacket &, const ftl::protocol::Packet &) override; @@ -28,10 +28,10 @@ class Broadcast : public Stream { void reset() override; - const std::list<Stream*> &streams() const { return streams_; } + const std::list<std::shared_ptr<Stream>> &streams() const { return streams_; } private: - std::list<Stream*> streams_; + std::list<std::shared_ptr<Stream>> streams_; std::list<ftl::Handle> handles_; //StreamCallback cb_; SHARED_MUTEX mutex_; diff --git a/include/ftl/protocol/muxer.hpp b/include/ftl/protocol/muxer.hpp index 938ec291600c5505f38ef2ad83976dc10495d3ea..129cb467433d666562572631d4701e8be202e2f9 100644 --- a/include/ftl/protocol/muxer.hpp +++ b/include/ftl/protocol/muxer.hpp @@ -3,6 +3,7 @@ #include <ftl/protocol/streams.hpp> #include <map> +#include <list> namespace ftl { namespace protocol { @@ -20,8 +21,8 @@ class Muxer : public Stream { explicit Muxer(); virtual ~Muxer(); - void add(Stream *, size_t fsid=0, const std::function<int()> &cb=nullptr); - void remove(Stream *); + void add(const std::shared_ptr<Stream> &, size_t fsid=0, const std::function<int()> &cb=nullptr); + void remove(const std::shared_ptr<Stream> &); //bool onPacket(const StreamCallback &) override; @@ -33,11 +34,11 @@ class Muxer : public Stream { void reset() override; - ftl::protocol::Stream *originStream(size_t fsid, int fid); + std::shared_ptr<Stream> originStream(size_t fsid, int fid); private: struct StreamEntry { - Stream *stream; + std::shared_ptr<Stream> stream; std::unordered_map<int, std::vector<int>> maps; uint32_t original_fsid = 0; ftl::Handle handle; diff --git a/include/ftl/protocol/packet.hpp b/include/ftl/protocol/packet.hpp index 99421a14ec04a358561e78a288a80e0a9a41fe36..baaa97adacac140355b9de357a6ca6eadbf3a750 100644 --- a/include/ftl/protocol/packet.hpp +++ b/include/ftl/protocol/packet.hpp @@ -67,7 +67,7 @@ struct StreamPacketV4 { int64_t timestamp; uint8_t streamID; // Source number [or v4 frameset id] uint8_t frame_number; // v4+ First frame number (packet may include multiple frames) - ftl::codecs::Channel channel; // Actual channel of this current set of packets + ftl::protocol::Channel channel; // Actual channel of this current set of packets inline int frameNumber() const { return (version >= 4) ? frame_number : streamID; } inline size_t frameSetID() const { return (version >= 4) ? streamID : 0; } @@ -92,7 +92,7 @@ struct StreamPacket { int64_t timestamp; uint8_t streamID; // Source number [or v4 frameset id] uint8_t frame_number; // v4+ First frame number (packet may include multiple frames) - ftl::codecs::Channel channel; // Actual channel of this current set of packets + ftl::protocol::Channel channel; // Actual channel of this current set of packets uint8_t flags=0; inline int frameNumber() const { return (version >= 4) ? frame_number : streamID; } diff --git a/src/broadcaster.cpp b/src/broadcaster.cpp new file mode 100644 index 0000000000000000000000000000000000000000..c2471aa60e5f57b6d04716e02f3ffc4088996575 --- /dev/null +++ b/src/broadcaster.cpp @@ -0,0 +1,82 @@ +#include <ftl/protocol/broadcaster.hpp> + +using ftl::protocol::Broadcast; +using ftl::protocol::StreamPacket; +using ftl::protocol::Packet; + +Broadcast::Broadcast() { + +} + +Broadcast::~Broadcast() { + +} + +void Broadcast::add(const std::shared_ptr<Stream> &s) { + UNIQUE_LOCK(mutex_,lk); + + streams_.push_back(s); + handles_.push_back(std::move(s->onPacket([this,s](const StreamPacket &spkt, const Packet &pkt) { + //LOG(INFO) << "BCAST Request: " << (int)spkt.streamID << " " << (int)spkt.channel << " " << spkt.timestamp; + SHARED_LOCK(mutex_, lk); + if (spkt.frameSetID() < 255) available(spkt.frameSetID()) += spkt.channel; + cb_.trigger(spkt, pkt); + if (spkt.streamID < 255) s->select(spkt.streamID, selected(spkt.streamID)); + return true; + }))); +} + +void Broadcast::remove(const std::shared_ptr<Stream> &s) { + UNIQUE_LOCK(mutex_,lk); + // TODO: Find and remove handle also + streams_.remove(s); +} + +void Broadcast::clear() { + UNIQUE_LOCK(mutex_,lk); + handles_.clear(); + streams_.clear(); +} + +bool Broadcast::post(const StreamPacket &spkt, const Packet &pkt) { + SHARED_LOCK(mutex_, lk); + if (spkt.frameSetID() < 255) available(spkt.frameSetID()) += spkt.channel; + + bool status = true; + for (auto s : streams_) { + //s->select(spkt.frameSetID(), selected(spkt.frameSetID())); + status = status && s->post(spkt, pkt); + } + return status; +} + +bool Broadcast::begin() { + bool r = true; + for (auto &s : streams_) { + r = r && s->begin(); + } + return r; +} + +bool Broadcast::end() { + bool r = true; + for (auto &s : streams_) { + r = r && s->end(); + } + return r; +} + +bool Broadcast::active() { + if (streams_.size() == 0) return false; + bool r = true; + for (auto &s : streams_) { + r = r && s->active(); + } + return r; +} + +void Broadcast::reset() { + for (auto &s : streams_) { + s->reset(); + } +} diff --git a/src/channelSet.cpp b/src/channelSet.cpp new file mode 100644 index 0000000000000000000000000000000000000000..1340df450ad4af316259804bd44d81e33e66c9ee --- /dev/null +++ b/src/channelSet.cpp @@ -0,0 +1,27 @@ +#include <ftl/protocol/channelSet.hpp> + +using ftl::protocol::ChannelSet; + +ChannelSet operator&(const ChannelSet &a, const ChannelSet &b) { + ChannelSet result; + for (auto &i : a) { + if (b.find(i) != b.end()) result.insert(i); + } + return result; +} + +ChannelSet operator-(const ChannelSet &a, const ChannelSet &b) { + ChannelSet result; + for (auto &i : a) { + if (b.find(i) == b.end()) result.insert(i); + } + return result; +} + +bool operator!=(const ChannelSet &a, const ChannelSet &b) { + if (a.size() != b.size()) return true; + for (auto &i : a) { + if (b.count(i) == 0) return true; + } + return false; +} diff --git a/src/muxer.cpp b/src/muxer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b21ea5305495a63e2030d3f65176f40f5fe98ae2 --- /dev/null +++ b/src/muxer.cpp @@ -0,0 +1,213 @@ +#include <ftl/protocol/muxer.hpp> +#include <ftl/lib/loguru.hpp> + +using ftl::protocol::Muxer; +using ftl::protocol::Stream; +using ftl::protocol::StreamPacket; + +Muxer::Muxer() : nid_{0} { + //value("paused", false); + //_forward("paused"); +} + +Muxer::~Muxer() { + UNIQUE_LOCK(mutex_,lk); + for (auto &se : streams_) { + se.handle.cancel(); + } +} + +void Muxer::_forward(const std::string &name) { + /*on(name, [this,name]() { + auto val = getConfig()[name]; + UNIQUE_LOCK(mutex_,lk); + for (auto &se : streams_) { + se.stream->set(name, val); + } + });*/ +} + + +void Muxer::add(const std::shared_ptr<Stream> &s, size_t fsid, const std::function<int()> &cb) { + UNIQUE_LOCK(mutex_,lk); + if (fsid < 0u || fsid >= ftl::protocol::kMaxStreams) return; + + auto &se = streams_.emplace_back(); + //int i = streams_.size()-1; + se.stream = s; + se.ids.push_back(fsid); + Muxer::StreamEntry *ptr = &se; + + se.handle = std::move(s->onPacket([this,s,ptr,cb](const StreamPacket &spkt, const Packet &pkt) { + //TODO: Allow input streams to have other streamIDs + // Same fsid means same streamIDs map together in the end + + /*ftl::stream::Muxer::StreamEntry *ptr = nullptr; + { + SHARED_LOCK(mutex_,lk); + ptr = &streams_[i]; + }*/ + + if (!cb && spkt.streamID > 0) { + LOG(WARNING) << "Multiple framesets in stream"; + return true; + } + + if (ptr->ids.size() <= spkt.streamID) { + UNIQUE_LOCK(mutex_,lk); + if (ptr->ids.size() <= spkt.streamID) { + ptr->ids.resize(spkt.streamID + 1); + ptr->ids[spkt.streamID] = cb(); + } + } + + int fsid; + { + SHARED_LOCK(mutex_, lk); + fsid = ptr->ids[spkt.streamID]; + } + + StreamPacket spkt2 = spkt; + ptr->original_fsid = spkt.streamID; // FIXME: Multiple originals needed + spkt2.streamID = fsid; + + if (spkt2.frame_number < 255) { + int id = _lookup(fsid, ptr, spkt.frame_number, pkt.frame_count); + spkt2.frame_number = id; + } + + _notify(spkt2, pkt); + s->select(spkt.streamID, selected(fsid), true); + return true; + })); +} + +void Muxer::remove(const std::shared_ptr<Stream> &s) { + UNIQUE_LOCK(mutex_,lk); + for (auto i = streams_.begin(); i != streams_.end(); ++i) { + if (i->stream == s) { + i->handle.cancel(); + auto *se = &(*i); + + for (size_t j=0; j<kMaxStreams; ++j) { + for (auto &k : revmap_[j]) { + if (k.first == se) { + k.first = nullptr; + } + } + } + + streams_.erase(i); + return; + } + } +} + +std::shared_ptr<Stream> Muxer::originStream(size_t fsid, int fid) { + if (fsid < ftl::protocol::kMaxStreams && static_cast<uint32_t>(fid) < revmap_[fsid].size()) { + return std::get<0>(revmap_[fsid][fid])->stream; + } + return nullptr; +} + +bool Muxer::post(const StreamPacket &spkt, const Packet &pkt) { + SHARED_LOCK(mutex_, lk); + if (pkt.data.size() > 0 || !(spkt.flags & ftl::protocol::kFlagRequest)) available(spkt.frameSetID()) += spkt.channel; + + if (spkt.streamID < ftl::protocol::kMaxStreams && spkt.frame_number < revmap_[spkt.streamID].size()) { + auto [se, ssid] = revmap_[spkt.streamID][spkt.frame_number]; + //auto &se = streams_[sid]; + + if (!se) return false; + + //LOG(INFO) << "POST " << spkt.frame_number; + + StreamPacket spkt2 = spkt; + spkt2.streamID = se->original_fsid; // FIXME: Multiple possible originals + spkt2.frame_number = ssid; + se->stream->select(spkt2.streamID, selected(spkt.frameSetID())); + return se->stream->post(spkt2, pkt); + } else { + return false; + } +} + +bool Muxer::begin() { + bool r = true; + for (auto &s : streams_) { + r = r && s.stream->begin(); + } + return r; +} + +bool Muxer::end() { + bool r = true; + for (auto &s : streams_) { + r = r && s.stream->end(); + } + return r; +} + +bool Muxer::active() { + bool r = true; + for (auto &s : streams_) { + r = r && s.stream->active(); + } + return r; +} + +void Muxer::reset() { + for (auto &s : streams_) { + s.stream->reset(); + } +} + +int Muxer::_lookup(size_t fsid, Muxer::StreamEntry *se, int ssid, int count) { + SHARED_LOCK(mutex_, lk); + + auto i = se->maps.find(fsid); + if (i == se->maps.end()) { + lk.unlock(); + { + UNIQUE_LOCK(mutex_, lk2); + if (se->maps.count(fsid) == 0) { + se->maps[fsid] = {}; + } + i = se->maps.find(fsid); + } + lk.lock(); + } + + auto &map = i->second; + + if (static_cast<uint32_t>(ssid) >= map.size()) { + lk.unlock(); + { + UNIQUE_LOCK(mutex_, lk2); + while (static_cast<uint32_t>(ssid) >= map.size()) { + int nid = nid_[fsid]++; + revmap_[fsid].push_back({se, static_cast<uint32_t>(map.size())}); + map.push_back(nid); + for (int j=1; j<count; ++j) { + int nid2 = nid_[fsid]++; + revmap_[fsid].push_back({se, static_cast<uint32_t>(map.size())}); + map.push_back(nid2); + } + } + } + lk.lock(); + } + return map[ssid]; +} + +void Muxer::_notify(const StreamPacket &spkt, const Packet &pkt) { + SHARED_LOCK(mutex_, lk); + available(spkt.frameSetID()) += spkt.channel; + + try { + cb_.trigger(spkt, pkt); // spkt.frame_number < 255 && + } catch (std::exception &e) { + LOG(ERROR) << "Exception in packet handler (" << int(spkt.channel) << "): " << e.what(); + //reset(); // Force stream reset here to get new i-frames + } +} diff --git a/src/streams.cpp b/src/streams.cpp new file mode 100644 index 0000000000000000000000000000000000000000..bdf14b5d5c3a9f9dc8dd09da1e71a01e610330b1 --- /dev/null +++ b/src/streams.cpp @@ -0,0 +1,45 @@ +#include <ftl/protocol/streams.hpp> + +using ftl::protocol::Stream; +using ftl::protocol::Channel; +using ftl::protocol::ChannelSet; + +const ChannelSet &Stream::available(int fs) const { + SHARED_LOCK(mtx_, lk); + if (fs < 0 || static_cast<uint32_t>(fs) >= state_.size()) throw FTL_Error("Frameset index out-of-bounds: " << fs); + return state_[fs].available; +} + +ChannelSet Stream::selected(int fs) const { + SHARED_LOCK(mtx_, lk); + if (fs < 0 || static_cast<uint32_t>(fs) >= state_.size()) throw FTL_Error("Frameset index out-of-bounds: " << fs); + return state_[fs].selected; +} + +ChannelSet Stream::selectedNoExcept(int fs) const { + if (fs == 255) return {}; + + SHARED_LOCK(mtx_, lk); + if (fs < 0 || static_cast<uint32_t>(fs) >= state_.size()) return {}; + return state_[fs].selected; +} + +void Stream::select(int fs, const ChannelSet &s, bool make) { + if (fs == 255) return; + + UNIQUE_LOCK(mtx_, lk); + if (fs < 0 || (!make && static_cast<uint32_t>(fs) >= state_.size())) throw FTL_Error("Frameset index out-of-bounds: " << fs); + if (static_cast<uint32_t>(fs) >= state_.size()) state_.resize(fs+1); + state_[fs].selected = s; +} + +ChannelSet &Stream::available(int fs) { + UNIQUE_LOCK(mtx_, lk); + if (fs < 0) throw FTL_Error("Frameset index out-of-bounds: " << fs); + if (static_cast<uint32_t>(fs) >= state_.size()) state_.resize(fs+1); + return state_[fs].available; +} + +void Stream::reset() { + // Clear available and selected? +} diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 3777c230bdcc33d0c4ac980089ace4aec46cc2fe..c7704125364b97e95560ff14fb46107551d936a9 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -49,3 +49,14 @@ target_link_libraries(net_performance beyond-protocol GnuTLS::GnuTLS Threads::Threads ${UUID_LIBRARIES} ${URIPARSER_LIBRARIES} ${OS_LIBS}) add_test(NetPerformance net_performance) + +### Stream Unit ################################################################ +add_executable(stream_unit + $<TARGET_OBJECTS:CatchTestFTL> + ./stream_unit.cpp +) +target_include_directories(stream_unit PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include") +target_link_libraries(stream_unit + beyond-protocol ${URIPARSER_LIBRARIES} ${OS_LIBS}) + +add_test(StreamUnitTest stream_unit) \ No newline at end of file diff --git a/test/stream_unit.cpp b/test/stream_unit.cpp new file mode 100644 index 0000000000000000000000000000000000000000..fc920b6c0698289b871d92c7e8e7f9d00fd59ad2 --- /dev/null +++ b/test/stream_unit.cpp @@ -0,0 +1,285 @@ +#include "catch.hpp" + +#include <ftl/protocol/streams.hpp> +#include <ftl/protocol/muxer.hpp> +#include <ftl/protocol/broadcaster.hpp> +#include <nlohmann/json.hpp> + +using ftl::protocol::Muxer; +using ftl::protocol::Broadcast; +using ftl::protocol::Stream; +using ftl::protocol::StreamPacket; +using ftl::protocol::Packet; +using ftl::protocol::Channel; + +class TestStream : public ftl::protocol::Stream { + public: + TestStream() {}; + ~TestStream() {}; + + bool post(const ftl::protocol::StreamPacket &spkt, const ftl::protocol::Packet &pkt) { + available(spkt.streamID) += spkt.channel; + cb_.trigger(spkt, pkt); + return true; + } + + bool begin() override { return true; } + bool end() override { return true; } + bool active() override { return true; } + + private: + //std::function<void(const StreamPacket &, const Packet &)> cb_; +}; + +TEST_CASE("ftl::stream::Muxer()::write", "[stream]") { + + std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); + REQUIRE(mux); + + SECTION("write with one stream") { + std::shared_ptr<Stream> s = std::make_shared<TestStream>(); + REQUIRE(s); + + mux->add(s); + + ftl::protocol::StreamPacket tspkt = {4,0,0,1, Channel::kColour}; + + auto h = s->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { + tspkt = spkt; + return true; + }); + + REQUIRE( !mux->post({4,100,0,1,ftl::protocol::Channel::kColour},{}) ); + REQUIRE( tspkt.timestamp == 0 ); + } + + SECTION("write to previously read") { + + std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1); + mux->add(s2); + + ftl::protocol::StreamPacket tspkt = {4,0,0,1,Channel::kColour}; + auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { + tspkt = spkt; + return true; + }); + + REQUIRE( s1->post({4,100,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.timestamp == 100 ); + REQUIRE( tspkt.frame_number == 0 ); + + REQUIRE( s2->post({4,101,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.timestamp == 101 ); + REQUIRE( tspkt.frame_number == 1 ); + + StreamPacket tspkt2 = {4,0,0,1,Channel::kColour}; + StreamPacket tspkt3 = {4,0,0,1,Channel::kColour}; + auto h2 = s1->onPacket([&tspkt2](const StreamPacket &spkt, const Packet &pkt) { + tspkt2 = spkt; + return true; + }); + auto h3 = s2->onPacket([&tspkt3](const StreamPacket &spkt, const Packet &pkt) { + tspkt3 = spkt; + return true; + }); + + REQUIRE( mux->post({4,200,0,1,Channel::kColour},{}) ); + REQUIRE( tspkt3.timestamp == 200 ); + REQUIRE( tspkt3.frame_number == 0 ); + } +} + +TEST_CASE("ftl::stream::Muxer()::post multi-frameset", "[stream]") { + + std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); + REQUIRE(mux); + + SECTION("write to previously read") { + std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1); + mux->add(s2,1); + + StreamPacket tspkt = {4,0,0,1,Channel::kColour}; + auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { + tspkt = spkt; + return true; + }); + + REQUIRE( s1->post({4,100,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.streamID == 0 ); + REQUIRE( tspkt.frame_number == 0 ); + + REQUIRE( s2->post({4,101,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.streamID == 1 ); + REQUIRE( tspkt.frame_number == 0 ); + + StreamPacket tspkt2 = {4,0,0,1,Channel::kColour}; + StreamPacket tspkt3 = {4,0,0,1,Channel::kColour}; + auto h2 = s1->onPacket([&tspkt2](const StreamPacket &spkt, const Packet &pkt) { + tspkt2 = spkt; + return true; + }); + auto h3 = s2->onPacket([&tspkt3](const StreamPacket &spkt, const Packet &pkt) { + tspkt3 = spkt; + return true; + }); + + REQUIRE( mux->post({4,200,1,0,Channel::kColour},{}) ); + REQUIRE( tspkt3.streamID == 0 ); + REQUIRE( tspkt3.frame_number == 0 ); + } +} + +TEST_CASE("ftl::stream::Muxer()::read", "[stream]") { + std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); + REQUIRE(mux); + + SECTION("read with two writing streams") { + std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1); + mux->add(s2); + + StreamPacket tspkt = {4,0,0,1,Channel::kColour}; + auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { + tspkt = spkt; + return true; + }); + + REQUIRE( s1->post({4,100,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.timestamp == 100 ); + REQUIRE( tspkt.frame_number == 0 ); + + REQUIRE( s2->post({4,101,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.timestamp == 101 ); + REQUIRE( tspkt.frame_number == 1 ); + + REQUIRE( s1->post({4,102,0,1,Channel::kColour},{}) ); + REQUIRE( tspkt.timestamp == 102 ); + REQUIRE( tspkt.frame_number == 2 ); + + REQUIRE( s2->post({4,103,0,1,Channel::kColour},{}) ); + REQUIRE( tspkt.timestamp == 103 ); + REQUIRE( tspkt.frame_number == 3 ); + } + + SECTION("read consistency with two writing streams") { + std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1); + mux->add(s2); + + StreamPacket tspkt = {4,0,0,1,Channel::kColour}; + auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { + tspkt = spkt; + return true; + }); + + REQUIRE( s1->post({4,100,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.timestamp == 100 ); + REQUIRE( tspkt.frame_number == 0 ); + + REQUIRE( s2->post({4,101,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.timestamp == 101 ); + REQUIRE( tspkt.frame_number == 1 ); + + REQUIRE( s1->post({4,102,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.timestamp == 102 ); + REQUIRE( tspkt.frame_number == 0 ); + + REQUIRE( s2->post({4,103,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.timestamp == 103 ); + REQUIRE( tspkt.frame_number == 1 ); + } +} + +TEST_CASE("ftl::stream::Muxer()::read multi-frameset", "[stream]") { + std::unique_ptr<Muxer> mux = std::make_unique<Muxer>(); + REQUIRE(mux); + + //SECTION("read with two writing streams") { + + std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + std::shared_ptr<Stream> s3 = std::make_shared<TestStream>(); + REQUIRE(s3); + std::shared_ptr<Stream> s4 = std::make_shared<TestStream>(); + REQUIRE(s4); + + mux->add(s1,0); + mux->add(s2,1); + mux->add(s3,0); + mux->add(s4,1); + + StreamPacket tspkt = {4,0,0,1,Channel::kColour}; + auto h = mux->onPacket([&tspkt](const StreamPacket &spkt, const Packet &pkt) { + tspkt = spkt; + return true; + }); + + REQUIRE( s1->post({4,100,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.streamID == 0 ); + REQUIRE( tspkt.frame_number == 0 ); + + REQUIRE( s2->post({4,101,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.streamID == 1 ); + REQUIRE( tspkt.frame_number == 0 ); + + REQUIRE( s3->post({4,102,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.streamID == 0 ); + REQUIRE( tspkt.frame_number == 1 ); + + REQUIRE( s4->post({4,103,0,0,Channel::kColour},{}) ); + REQUIRE( tspkt.streamID == 1 ); + REQUIRE( tspkt.frame_number == 1 ); + //} +} + +TEST_CASE("ftl::stream::Broadcast()::write", "[stream]") { + std::unique_ptr<Broadcast> mux = std::make_unique<Broadcast>(); + REQUIRE(mux); + + SECTION("write with two streams") { + std::shared_ptr<Stream> s1 = std::make_shared<TestStream>(); + REQUIRE(s1); + std::shared_ptr<Stream> s2 = std::make_shared<TestStream>(); + REQUIRE(s2); + + mux->add(s1); + mux->add(s2); + + StreamPacket tspkt1 = {4,0,0,1,Channel::kColour}; + StreamPacket tspkt2 = {4,0,0,1,Channel::kColour}; + + auto h1 = s1->onPacket([&tspkt1](const StreamPacket &spkt, const Packet &pkt) { + tspkt1 = spkt; + return true; + }); + auto h2 = s2->onPacket([&tspkt2](const StreamPacket &spkt, const Packet &pkt) { + tspkt2 = spkt; + return true; + }); + + REQUIRE( mux->post({4,100,0,1,Channel::kColour},{}) ); + REQUIRE( tspkt1.timestamp == 100 ); + REQUIRE( tspkt2.timestamp == 100 ); + } + +}