diff --git a/include/ftl/protocol/channelSet.hpp b/include/ftl/protocol/channelSet.hpp index 55dcc8c339a5b8db1bb00fa1abbd7fee1d55ebb2..8a33b6326bfa4ffd6584d545075a4e5b75decd0d 100644 --- a/include/ftl/protocol/channelSet.hpp +++ b/include/ftl/protocol/channelSet.hpp @@ -21,6 +21,8 @@ using ChannelSet = std::unordered_set<ftl::protocol::Channel>; ftl::protocol::ChannelSet operator&(const ftl::protocol::ChannelSet &a, const ftl::protocol::ChannelSet &b); +ftl::protocol::ChannelSet operator|(const ftl::protocol::ChannelSet &a, const ftl::protocol::ChannelSet &b); + ftl::protocol::ChannelSet operator-(const ftl::protocol::ChannelSet &a, const ftl::protocol::ChannelSet &b); inline ftl::protocol::ChannelSet &operator+=(ftl::protocol::ChannelSet &t, ftl::protocol::Channel c) { diff --git a/include/ftl/protocol/channelUtils.hpp b/include/ftl/protocol/channelUtils.hpp index 6606e1ddbda3437b3c51477fb0f9f357bf1e682e..27030ab0a547d86956e97897866b9d81a6265ac6 100644 --- a/include/ftl/protocol/channelUtils.hpp +++ b/include/ftl/protocol/channelUtils.hpp @@ -12,9 +12,10 @@ namespace ftl { namespace protocol { -inline bool isVideo(Channel c) { return static_cast<int>(c) < 32; } -inline bool isAudio(Channel c) { return static_cast<int>(c) >= 32 && static_cast<int>(c) < 64; } -inline bool isData(Channel c) { return static_cast<int>(c) >= 64; } +constexpr bool isVideo(Channel c) { return static_cast<int>(c) < 32; } +constexpr bool isAudio(Channel c) { return static_cast<int>(c) >= 32 && static_cast<int>(c) < 64; } +constexpr bool isData(Channel c) { return static_cast<int>(c) >= 64; } +constexpr bool isPersistent(Channel c) { return static_cast<int>(c) >= 64; } /** Obtain a string name for channel. */ std::string name(Channel c); @@ -31,7 +32,7 @@ Channel fromName(const std::string &name); int type(Channel c); /** @deprecated */ -inline bool isFloatChannel(ftl::protocol::Channel chan) { +constexpr bool isFloatChannel(ftl::protocol::Channel chan) { switch (chan) { case Channel::kGroundTruth : case Channel::kDepth : diff --git a/include/ftl/protocol/streams.hpp b/include/ftl/protocol/streams.hpp index 1035790b8e0a3839c283af0c90b5f311c423cc6e..a258a35936bdf512397b1a41ce7b72af1c3bd50d 100644 --- a/include/ftl/protocol/streams.hpp +++ b/include/ftl/protocol/streams.hpp @@ -382,7 +382,9 @@ class Stream { struct FSState { bool enabled = false; ftl::protocol::ChannelSet selected; - ftl::protocol::ChannelSet available; + ftl::protocol::ChannelSet availablePersistent; + std::atomic_uint64_t availableLast = 0; + std::atomic_uint64_t availableNext = 0; // TODO(Nick): Add a name and metadata }; @@ -390,7 +392,10 @@ class Stream { ftl::Handler<const Request &> request_cb_; ftl::Handler<FrameID, ftl::protocol::Channel> avail_cb_; ftl::Handler<ftl::protocol::Error, const std::string&> error_cb_; - std::unordered_map<int, FSState> state_; + std::unordered_map<int, std::shared_ptr<FSState>> state_; + + std::shared_ptr<FSState> _getState(FrameID id); + std::shared_ptr<FSState> _getState(FrameID id) const; }; using StreamPtr = std::shared_ptr<Stream>; diff --git a/src/channelSet.cpp b/src/channelSet.cpp index c0c1702a83b02aeb0e93df3124bac404fa6b6a74..40ff8a6aa9a5a06399b1e43c002a0bb16e7fc88f 100644 --- a/src/channelSet.cpp +++ b/src/channelSet.cpp @@ -16,6 +16,13 @@ ChannelSet operator&(const ChannelSet &a, const ChannelSet &b) { return result; } +ChannelSet operator|(const ChannelSet &a, const ChannelSet &b) { + ChannelSet result; + result.insert(a.begin(), a.end()); + result.insert(b.begin(), b.end()); + return result; +} + ChannelSet operator-(const ChannelSet &a, const ChannelSet &b) { ChannelSet result; for (auto &i : a) { diff --git a/src/streams/streams.cpp b/src/streams/streams.cpp index 79fc28398c8fbddd9b73649de250c5207b02e863..ba699f0f41f58be82e758bb80e83b87f5a69336b 100644 --- a/src/streams/streams.cpp +++ b/src/streams/streams.cpp @@ -5,11 +5,14 @@ */ #include <ftl/protocol/streams.hpp> +#include <ftl/protocol/channelUtils.hpp> +#include <ftl/protocol/channelSet.hpp> using ftl::protocol::Stream; using ftl::protocol::Channel; using ftl::protocol::ChannelSet; using ftl::protocol::FrameID; +using ftl::protocol::isPersistent; std::string Stream::name() const { return "Unknown"; @@ -21,34 +24,46 @@ bool Stream::available(FrameID id) const { } bool Stream::available(FrameID id, Channel channel) const { - SHARED_LOCK(mtx_, lk); - auto it = state_.find(id); - if (it != state_.end()) { - return it->second.available.count(channel) > 0; + auto state = _getState(id); + if (!state) return false; + if (isPersistent(channel)) { + SHARED_LOCK(mtx_, lk); + return state->availablePersistent.count(channel) > 0; + } else { + return state->availableLast & (1ull << static_cast<int>(channel)); } - return false; } bool Stream::available(FrameID id, const ChannelSet &channels) const { - SHARED_LOCK(mtx_, lk); - auto it = state_.find(id); - if (it != state_.end()) { - const auto &set = it->second.available; - for (auto channel : channels) { - if (set.count(channel) == 0) return false; + auto state = _getState(id); + if (!state) return false; + for (auto channel : channels) { + if (isPersistent(channel)) { + SHARED_LOCK(mtx_, lk); + if (state->availablePersistent.count(channel) == 0) return false; + } else { + if ((state->availableLast & (1ull << static_cast<int>(channel))) == 0) return false; } - return true; } - return false; + return true; } ftl::protocol::ChannelSet Stream::channels(FrameID id) const { + auto state = _getState(id); + if (!state) return {}; + SHARED_LOCK(mtx_, lk); - auto it = state_.find(id); - if (it != state_.end()) { - return it->second.available; + ChannelSet result = state->availablePersistent; + lk.unlock(); + + uint64_t last = state->availableLast; + + for (int i = 0; i < 64; ++i) { + if ((1ull << i) & last) { + result.insert(static_cast<Channel>(i)); + } } - return {}; + return result; } std::unordered_set<FrameID> Stream::frames() const { @@ -64,7 +79,8 @@ std::unordered_set<FrameID> Stream::enabled() const { SHARED_LOCK(mtx_, lk); std::unordered_set<FrameID> result; for (const auto &s : state_) { - if (s.second.enabled) { + if (!s.second) continue; + if (s.second->enabled) { result.emplace(s.first); } } @@ -72,19 +88,17 @@ std::unordered_set<FrameID> Stream::enabled() const { } bool Stream::enabled(FrameID id) const { - SHARED_LOCK(mtx_, lk); - auto it = state_.find(id); - if (it != state_.end()) { - return it->second.enabled; - } - return false; + auto state = _getState(id); + if (!state) return false; + return state->enabled; } bool Stream::enabled(FrameID id, ftl::protocol::Channel channel) const { SHARED_LOCK(mtx_, lk); auto it = state_.find(id); if (it != state_.end()) { - return it->second.selected.count(channel) > 0; + if (!it->second) return false; + return it->second->selected.count(channel) > 0; } return false; } @@ -93,7 +107,8 @@ ftl::protocol::ChannelSet Stream::enabledChannels(FrameID id) const { SHARED_LOCK(mtx_, lk); auto it = state_.find(id); if (it != state_.end()) { - return it->second.selected; + if (!it->second) return {}; + return it->second->selected; } return {}; } @@ -101,49 +116,55 @@ ftl::protocol::ChannelSet Stream::enabledChannels(FrameID id) const { bool Stream::enable(FrameID id) { UNIQUE_LOCK(mtx_, lk); auto &p = state_[id]; - p.enabled = true; + if (!p) p = std::make_shared<Stream::FSState>(); + p->enabled = true; return true; } bool Stream::enable(FrameID id, ftl::protocol::Channel channel) { UNIQUE_LOCK(mtx_, lk); auto &p = state_[id]; - p.enabled = true; - p.selected.insert(channel); + if (!p) p = std::make_shared<Stream::FSState>(); + p->enabled = true; + p->selected.insert(channel); return true; } bool Stream::enable(FrameID id, const ftl::protocol::ChannelSet &channels) { UNIQUE_LOCK(mtx_, lk); auto &p = state_[id]; - p.enabled = true; - p.selected.insert(channels.begin(), channels.end()); + if (!p) p = std::make_shared<Stream::FSState>(); + p->enabled = true; + p->selected.insert(channels.begin(), channels.end()); return true; } void Stream::disable(FrameID id) { UNIQUE_LOCK(mtx_, lk); auto &p = state_[id]; - p.enabled = false; + if (!p) p = std::make_shared<Stream::FSState>(); + p->enabled = false; } void Stream::disable(FrameID id, ftl::protocol::Channel channel) { UNIQUE_LOCK(mtx_, lk); auto &p = state_[id]; - p.selected.erase(channel); - if (p.selected.size() == 0) { - p.enabled = false; + if (!p) p = std::make_shared<Stream::FSState>(); + p->selected.erase(channel); + if (p->selected.size() == 0) { + p->enabled = false; } } void Stream::disable(FrameID id, const ftl::protocol::ChannelSet &channels) { UNIQUE_LOCK(mtx_, lk); auto &p = state_[id]; + if (!p) p = std::make_shared<Stream::FSState>(); for (const auto &c : channels) { - p.selected.erase(c); + p->selected.erase(c); } - if (p.selected.size() == 0) { - p.enabled = false; + if (p->selected.size() == 0) { + p->enabled = false; } } @@ -158,15 +179,42 @@ void Stream::trigger(const ftl::protocol::StreamPacket &spkt, const ftl::protoco cb_.trigger(spkt, pkt); } +std::shared_ptr<Stream::FSState> Stream::_getState(FrameID id) { + { + SHARED_LOCK(mtx_, lk); + auto it = state_.find(id); + if (it != state_.end()) return it->second; + } + UNIQUE_LOCK(mtx_, lk); + if (!state_[id]) state_[id] = std::make_shared<Stream::FSState>(); + return state_[id]; +} + +std::shared_ptr<Stream::FSState> Stream::_getState(FrameID id) const { + SHARED_LOCK(mtx_, lk); + auto it = state_.find(id); + if (it != state_.end()) return it->second; + return nullptr; +} + void Stream::seen(FrameID id, ftl::protocol::Channel channel) { - if (!available(id, channel)) { + auto state = _getState(id); + if (channel == Channel::kEndFrame) { + state->availableLast = static_cast<uint64_t>(state->availableNext); + state->availableNext = 0; + } else if (isPersistent(channel)) { { - UNIQUE_LOCK(mtx_, lk); - auto &p = state_[id]; - p.available.insert(channel); + SHARED_LOCK(mtx_, lk); + if (state->availablePersistent.count(channel) > 0) return; } - avail_cb_.trigger(id, channel); + UNIQUE_LOCK(mtx_, lk); + state->availablePersistent.insert(channel); + } else { + state->availableNext |= 1ull << static_cast<int>(channel); + if (state->availableLast & (1ull << static_cast<int>(channel))) return; } + + avail_cb_.trigger(id, channel); } void Stream::request(const ftl::protocol::Request &req) { diff --git a/test/muxer_unit.cpp b/test/muxer_unit.cpp index 550adfa01f4ec18ecc1e73e60da0472f4ee47fe8..1614ff91a996e8c760a5e1f5e779022fdac70070 100644 --- a/test/muxer_unit.cpp +++ b/test/muxer_unit.cpp @@ -478,21 +478,34 @@ TEST_CASE("Muxer available", "[stream]") { REQUIRE( mux->available(id2) ); } - SECTION("available channel when seen") { + SECTION("available (persistent) channel when seen") { + FrameID id1(0, 1); + REQUIRE( !s1->available(id1, Channel::kPose) ); + REQUIRE( !mux->available(id1, Channel::kPose) ); + s1->forceSeen(id1, Channel::kPose); + REQUIRE( s1->available(id1, Channel::kPose) ); + REQUIRE( mux->available(id1, Channel::kPose) ); + } + + SECTION("available (temp) channel when seen") { FrameID id1(0, 1); REQUIRE( !s1->available(id1, Channel::kColour) ); REQUIRE( !mux->available(id1, Channel::kColour) ); s1->forceSeen(id1, Channel::kColour); + s1->forceSeen(id1, Channel::kEndFrame); REQUIRE( s1->available(id1, Channel::kColour) ); REQUIRE( mux->available(id1, Channel::kColour) ); + s1->forceSeen(id1, Channel::kEndFrame); + REQUIRE( !s1->available(id1, Channel::kColour) ); + REQUIRE( !mux->available(id1, Channel::kColour) ); } SECTION("not available when wrong channel seen") { FrameID id1(0, 1); - s1->forceSeen(id1, Channel::kDepth); + s1->forceSeen(id1, Channel::kCalibration2); REQUIRE( mux->available(id1) ); - REQUIRE( !s1->available(id1, Channel::kColour) ); - REQUIRE( !mux->available(id1, Channel::kColour) ); + REQUIRE( !s1->available(id1, Channel::kPose) ); + REQUIRE( !mux->available(id1, Channel::kPose) ); } SECTION("available channel set when seen all") { @@ -502,6 +515,7 @@ TEST_CASE("Muxer available", "[stream]") { REQUIRE( !mux->available(id1, set) ); s1->forceSeen(id1, Channel::kColour); s1->forceSeen(id1, Channel::kDepth); + s1->forceSeen(id1, Channel::kEndFrame); REQUIRE( s1->available(id1, set) ); REQUIRE( mux->available(id1, set) ); } @@ -512,6 +526,7 @@ TEST_CASE("Muxer available", "[stream]") { REQUIRE( !s1->available(id1, set) ); REQUIRE( !mux->available(id1, set) ); s1->forceSeen(id1, Channel::kDepth); + s1->forceSeen(id1, Channel::kEndFrame); REQUIRE( !s1->available(id1, set) ); REQUIRE( !mux->available(id1, set) ); } @@ -625,6 +640,7 @@ TEST_CASE("Muxer channels", "[stream]") { s1->forceSeen(id1, Channel::kColour); s1->forceSeen(id1, Channel::kDepth); + s1->forceSeen(id1, Channel::kEndFrame); auto set = mux->channels(id1); REQUIRE( set.size() == 2 ); @@ -656,9 +672,11 @@ TEST_CASE("Muxer enabledChannels", "[stream]") { s1->forceSeen(id1, Channel::kColour); s1->forceSeen(id1, Channel::kDepth); + s1->forceSeen(id1, Channel::kPose); + s1->forceSeen(id1, Channel::kEndFrame); auto set = mux->channels(id1); - REQUIRE( set.size() == 2 ); + REQUIRE( set.size() == 3 ); REQUIRE( mux->enable(id1, set) ); diff --git a/test/netstream_unit.cpp b/test/netstream_unit.cpp index 06d5429f7e03a7ea4ba23140985019ec8aff4326..e57f71b596cb2d0da335c469935c48ec2159c800 100644 --- a/test/netstream_unit.cpp +++ b/test/netstream_unit.cpp @@ -201,6 +201,9 @@ TEST_CASE("Net stream can see received data") { spkt.channel = Channel::kColour; writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); p->data(); + spkt.channel = Channel::kEndFrame; + writeNotification(0, "ftl://mystream", std::make_tuple(0, spkt, pkt)); + p->data(); sleep_for(milliseconds(50)); REQUIRE( seenReq );